Lambdaから起動するバッチ処理の作成

2016.07.20

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

定型的な処理をミス無くこなす事が苦手なt.hondaです。そんな私の元に、毎月一回手動でスクリプトを実行しなければならない運用がありました。今回はこれをLambdaとEC2の組み合わせで自動起動するバッチ処理化してみました。

実装言語はPythonとなります。

処理概要

LambdaとEC2を組み合わせについて

以下のメリットを考慮し、LambdaとEC2を組み合わせることにしました。

  • EC2内で既存のスクリプトを実行することでスクリプト自体は改修することなく流用できる(スクリプトの対応OSによるが)
  • EC2内でスクリプトを実行することで、処理時間を考慮する必要がなくなる(Lambda単体ではタイムアウトがある)
  • 実行時のみLambdaよりEC2を起動することで利用料金を押さえる

処理フロー

処理としては単純で

  • 実行するスクリプトをEC2内で実行するようにする
  • そのEC2はLambdaの定期実行にて起動するようにする
  • Lambdaより起動する際、スクリプトにパラメータを渡せるようにする
  • スクリプトの実行後、EC2を終了する

としました。

図にすると以下のようになります。

lambda-start-batch-flow

実装について

では実装についてです。

1.Lambda 起動処理

上記図の「1.定期的に起動し、SNSにパラメータを送信する」「3.EC2を起動する」を実行するLambda Functionです。このLambdaがスケジュールで定期的に起動すること(今回は月一回)で、バッチ処理全体が起動します。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import boto3
import logging
import datetime
from dateutil import relativedelta

TOPIC_ARN = 'arn:aws:sns:ap-northeast-1:xxxxxx:topic-name'
SUBJECT = 'start'
EC2_INSTANCE_ID = 'i-xxxxxxxx'

logging.basicConfig(format='%(asctime)s - %(threadName)s - %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

def get_target_month():
    return (datetime.datetime.now() - relativedelta.relativedelta(months=1))\
            .strftime("%Y%m")

def send_message():
    client = boto3.client('sns')
    
    request = {
        'TopicArn': TOPIC_ARN,
        'Message': get_target_month(),
        'Subject': SUBJECT
    }

    resp = client.publish(**request)

    if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
        raise Exception('sns send message status error', resp)

def ec2_start():
    client = boto3.client('ec2')

    resp = client.start_instances(
        InstanceIds=[
            EC2_INSTANCE_ID,
        ]
    )

    if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
        raise Exception('ec2 start error', resp)
        
def start_lambda_handler(event, context):
    try:
        logger.info('start.')

        send_message()
        ec2_start()

        logger.info('start success.')
        return 'success'
    except Exception as e:
        logger.error(e)
        return 'error'

if __name__ == '__main__':
    start_lambda_handler(None, None)

このプログラムの「start_lambda_handler」メソッドをLambdaのイベントハンドラーとしてスケジュールに登録します。処理としては「send_message」メソッドでSNSへのメッセージを送信、「ec2_start」メソッドでEC2を起動しています。SNSへのメッセージはスクリプトへ渡すパラメータとしていますが、今回は処理日の年月としました。

Lambdaのスケジュール登録については以下のリンクを参考にしてください。
スケジュールされたイベントでの AWS Lambda の使用

2.SNSからSQSへの通知

上記図の「2.SNSからSQSにパラメータを伝達する」についてです。LambdaからSQSへ直接書き込まずにSNSを経由した理由は以下のブログの内容を考慮したためです。
【AWS】SQSキューの前には難しいこと考えずにSNSトピックを挟むと良いよ、という話

3.既存スクリプト 起動処理

上記図の「4.SQSより取得したパラメータを渡し、スクリプトを起動する」「5.SNSに終了メッセージを送信する」についてです。以下の「starter.py」をEC2の起動時に実行するようにcronで設定します。

starter.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import datetime
from message import Message

def current_datetime():
    return datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')

def print_log(message):
    print('[%s] %s' % (current_datetime(), message))

def main():
    try:
        print_log('starter start.')

        message = Message()
        target_month = message.get_target_month()

        if target_month == '':
            print_log('starter cannot get target_month.')
            return

        # 既存スクリプトの起動処理
        # target_monthを渡す
        
        message.send_stop_ec2()

        print_log('starter end.')
    except Exception as e:
        print(e)

if __name__ == '__main__':
    main()

Messageの「get_target_month」メソッドを呼び出し、SQSよりパラメータの年月を取得しています。そして既存スクリプトを起動後、Messageの「send_stop_ec2」メソッドを呼び出してSNSへEC2を終了するメッセージを送信しています。

既存スクリプトの起動については業務ロジックなのでここには書きませんが、SQSより取得したパラメータを渡す事、処理の終了と同期できるように呼び出すことを考慮してください。Messageについては以下のようになります。

message.py
import boto3
import json

config = {}
execfile("application.conf", config)

class Message(object):
    def get_target_month(self):
        client = boto3.client('sqs')

        response = {}

        cnt = 0
        while 'Messages' not in response:
            cnt += 1
            response = client.receive_message(
                QueueUrl=config['QUEUE_URL'],
                MaxNumberOfMessages=1
            )
            if cnt > 1000:
                return ''
            
        for message in response['Messages']:
            client.delete_message(
                QueueUrl=config['QUEUE_URL'],
                ReceiptHandle=message['ReceiptHandle']
            )
            return json.loads(message['Body'])['Message']

    def send_stop_ec2(self):
        client = boto3.client('sns')
    
        request = {
            'TopicArn': config['TOPIC_ARN'],
            'Message': config['MESSAGE'],
            'Subject': config['SUBJECT']
        }

        resp = client.publish(**request)

        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise Exception('sns send message status error', resp)

SQSよりパラメータを取得する「get_target_month」メソッドと、EC2終了のためSNSにメッセージを送信する「send_stop_ec2」メソッドです。こちらは定義値をapplication.confに記述していますが、以下のようになります。

application.conf
QUEUE_URL = 'https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxx/queue-nane'
TOPIC_ARN = 'arn:aws:sns:ap-northeast-1:xxxxxxxxxx:end-topic-name'
SUBJECT = 'stop'
MESSAGE = 'ec2 stop.'

4.EC2終了処理

上記図の「6.SNSよりLambdaを発火する」「7.EC2を終了する」についてです。今回はSNSとLambdaを組み合わせてEC2を終了していますが、EC2内から(上記「starter.py」)直接終了してもいいかもしれません。

SNSからLambdaを呼び出す設定については割愛します。以下、EC2を終了するLambda Functionです。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import boto3
import logging
import datetime
from dateutil import relativedelta

EC2_INSTANCE_ID = 'i-xxxxxxxx'

logging.basicConfig(format='%(asctime)s - %(threadName)s - %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

def ec2_stop():
    client = boto3.client('ec2')

    resp = client.stop_instances(
        InstanceIds=[
            EC2_INSTANCE_ID,
        ]
    )

    if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
        raise Exception('ec2 start error', resp)
        
def stop_lambda_handler(event, context):
    try:
        logger.info('stop report.')

        ec2_stop()

        logger.info('stop success.')
        return 'success'
    except Exception as e:
        logger.error(e)
        return 'error'

if __name__ == '__main__':
    stop_lambda_handler(None, None)

このプログラムの「stop_lambda_handler」メソッドをLambdaのイベントハンドラーとしてスケジュールに登録します。

追記。EC2の起動に失敗するケースがありました。以下、そのケースについてです。

具体的には、マネージメントコンソール上でステータスチェックが全て有効にならない状態のままとなってしまう事象です。これが発生すると

  • 上記の処理フロー図の「3.EC2を起動する」の状態で止まり続け、次の実行が行われない。
  • EC2も(ステータスチェックがエラーのまま)起動し続ける。
  • SQSにもキューが残り続ける。


ということになります。これを防ぐために以下の設定を行いました。

  • EC2のステータスチェックアラームで、EC2を終了するLambdaを呼ぶ。
  • SQSにキューが残るのを防ぐため、キューの有効時間を設定する。


これらの設定により、EC2のステータスがエラーの場合はシャットダウンされ、またキューも残らないため、次の定期的な実行にて再トライすることになります(条件によっては同じ結果が出力されるとは限らないですが)。

まとめ

以上で定期的に実行するスクリプトを、実行時のみEC2内で処理するバッチ化することができました。今まで手動で定期的に実行していたスクリプトを自動化することで、日常の業務が捗ることなどもあるかと思います。そのような時の参考になれば幸いです。