LambdaとSQSによる非同期分散処理
はじめに
AWS Lambdaで処理時間が掛かることが想定される場合、どのような対応策があるでしょうか?Lambdaを諦める?(これも1つの選択肢だと思います)、一回のデータ処理量を減らして処理時間を短くする、など色々な方法が考えつくと思います。
今回は1つの対応策として、処理時間が掛かる処理を複数に分割して別Lambda関数として動かす事を考えてみました。処理を分割してQueueに登録し、Queue毎に別Lambda関数を起動するサンプルを作成してみたので、それを中心に書きたいと思います。尚、実装言語はPythonとなります。
処理概要
今回の処理概要を図にすると以下のようになります。
Pythonにより実装したLambda関数、SQS、DynamoDBを組み合わせました。一番左の「main.py」がタスクをQueueに分割してSQSに登録します。定期的に動いている「consumer.py」がSQSよりQueueを取得し、タスク毎に別Lambda関数「worker.py」を起動して処理を実行します。SQSはQueueを重複して取得する可能性があるため、一度取得したQueueのIDをDynamoDBに保存し、「worker.py」起動時にDynamoDBに登録済みか(つまり重複して取得したのか)をチェックするようにしました。
実装について
SQSとDynamoDB
SQSについては「sqs_workloads_queue」という名前でQueueを作成しました。DynamoDBについてはテーブル名「sqs_workloads_ids」、プライマリーキーは文字列型の「id」としました。
requirements.txt
以降はPythonのソースについての説明となります。先ずは今回のプロジェクトに必要なモジュールを定義する「requirements.txt」についてです。以下のようにしました。
requests boto3
PythonのAWS SDKである「boto3」を使用します。プロジェクトのローカルにモジュールをインストールするため、以下のコマンドを実行します。
$ pip install -r requirements.txt -t .
main.py
タスクをQueueに分割してSQSに登録する「main.py」のソースです。
import logging import json from queue import Queue logging.basicConfig(format='%(asctime)s - %(threadName)s - %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) def lambda_handler(event, context): try: logger.info('main start.') data = json.loads(json.dumps(event)) queue = Queue() for attr in data.get('names'): queue.enqueue(attr.get('name')) logger.info('main end.') return 'main success.' except Exception as e: logger.error('error raise.') logger.error(e) return 'main error.' if __name__ == "__main__": lambda_handler({"names":[{"name":"NameA"},{"name":"NameB"},{"name":"NameC"}]}, {})
今回は受信したjsonに複数件含まれている「name」属性を、1つずつのタスクに分割し、その値をQueueに渡すこととしました。16・17行目でこれを行っています。「queue.enqueue」でQueueに値を設定していますが、このメソッドについては後述することとします。
consumer.py
Queueを取得し、各タスク毎に別Lambda関数を起動する「consumer.py」です。
import logging import json from queue import Queue from lambda_function import LambdaFunction from dynamodb import DynamoDB logging.basicConfig(format='%(asctime)s - %(threadName)s - %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) def lambda_handler(event, context): try: logger.info('consumer start.') dynamodb = DynamoDB() lambda_fnc = LambdaFunction() queue = Queue() queues = queue.dequeue() for queue in queues: if dynamodb.is_exists(queue['MessageId']) == False: json = queue['Body'] lambda_fnc.invoke_worker(json) print 'worker invoke. json = ' + json dynamodb.save(queue['MessageId']) logger.info('consumer end.') return 'consumer success.' except Exception as e: logger.error('error raise.') logger.error(e) return 'consumer error.' if __name__ == "__main__": lambda_handler({}, {})
19行目でSQSよりQueueを取得します。20行目以下のループで、DynamoDBにID(MessageID)が存在するかのチェック(21行目)・Queueより値を取得して別Lambda関数の起動(23行目)・処理済みのIDをDynamoDBに登録(25行目)を行っています。各処理で呼び出したLambdaFunction・DynamoDBクラスについては後述します。
worker.py
タスク毎に起動されるLambda関数「worker.py」です。
import logging import json from queue import Queue logging.basicConfig(format='%(asctime)s - %(threadName)s - %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) def lambda_handler(event, context): try: logger.info('worker start.') data = json.loads(json.dumps(event)) print('worker received name = ' + data['name']) logger.info('worker end.') return 'worker success.' except Exception as e: logger.error('error raise.') logger.error(e) return 'worker error.' if __name__ == "__main__": lambda_handler({"name":"host1"}, {})
この関数は受け取ったjsonを解析し、nameの値を出力するだけのものです。13・14行目で行っています。
queue.py
Queueへの登録、Queueからの取得を行うメソッドを記述した「queue.py」です。
import boto3 class Queue: QUEUE_NAME = 'sqs_workloads_queue' MAX_QUEUE_COUNT = 10 REGION = 'ap-northeast-1' def enqueue(self, name): client = self.__client() response = client.send_message( QueueUrl=self.__queue_url(), MessageBody='{"name": "%s"}' % name ) return response['ResponseMetadata']['HTTPStatusCode'] == 200 def dequeue(self): client = self.__client() response = client.receive_message( QueueUrl=self.__queue_url(), MaxNumberOfMessages=self.MAX_QUEUE_COUNT ) messages = [] if 'Messages' in response: for message in response['Messages']: messages.append(message) client.delete_message( QueueUrl=self.__queue_url(), ReceiptHandle=message['ReceiptHandle'] ) return messages def __client(self): return boto3.client('sqs', region_name=self.REGION) def __queue_url(self): return self.__client().get_queue_url(QueueName=self.QUEUE_NAME)['QueueUrl']
Queueへの登録が「enqueue」メソッド、Queueからの取得が「dequeue」メソッドとなります。「dequeue」メソッドについてはQueueから取得後に削除も行っています。
dynamodb.py
DynamoDBへの登録、存在チェックを行う「dynamodb.py」です。
import boto3 class DynamoDB: REGION = 'ap-northeast-1' def save(self, id): client = self.__client() response = client.update_item( TableName='sqs_workloads_ids', Key={ 'id': { 'S': id } } ) return True def is_exists(self, id): client = self.__client() response = client.get_item( TableName='sqs_workloads_ids', Key={ 'id': { 'S': id } }, ConsistentRead=True ) return 'Item' in response def __client(self): return boto3.client('dynamodb', region_name=self.REGION)
DynamoDBへの登録が「save」メソッド、存在チェックが「exists」メソッドとなります。
lambda_function.py
Lambda関数を起動する「lambda_function.py」となります。
import boto3 import base64 class LambdaFunction: REGION = 'ap-northeast-1' WORKER_FUNCTION = 'sqs_workers_worker' def invoke_worker(self, json): client = boto3.client('lambda', region_name=self.REGION) response = client.invoke( FunctionName=self.WORKER_FUNCTION, InvocationType='Event', LogType='Tail', Payload=json ) return True
実行について
プロジェクトをLambdaに配置するためのzipを作成します。作成したzipを使い、「main.py」「consumer.py」「worker.py」を実行する3つのLambda関数を作成します。各Lambda関数には適切なロールを付与するのを忘れないで下さい。
「main.py」を実行するLambda関数の起動引数に、以下の様なjsonを指定します。
{ "names":[ {"name":"Name1"}, {"name":"Name2"}, {"name":"Name3"}, {"name":"Name4"}, {"name":"Name5"} ] }
「main.py」のLambda関数を起動すると、SQSにQueueが登録されます。「consumer.py」のLambda関数をスケジューリングによる起動 or 手動による起動をすると、タスクとして「worker.py」が呼び出されて以下の様なログがCloudWatch Logに出力されるはずです(出力されるメッセージのName自体はランダムで異なります)。
[INFO] 2016-06-14T03:24:06.79Z 7319afca-31df-11e6-9449-e3fd9e0e0473 worker start. worker received name = Name4 [INFO] 2016-06-14T03:24:06.79Z 7319afca-31df-11e6-9449-e3fd9e0e0473 worker end. END RequestId: 7319afca-31df-11e6-9449-e3fd9e0e0473 REPORT RequestId: 7319afca-31df-11e6-9449-e3fd9e0e0473 Duration: 0.50 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 21 MB START RequestId: 7384588d-31df-11e6-887e-29a6d11c36e7 Version: $LATEST [INFO] 2016-06-14T03:24:06.804Z 7384588d-31df-11e6-887e-29a6d11c36e7 worker start. worker received name = Name5 [INFO] 2016-06-14T03:24:06.804Z 7384588d-31df-11e6-887e-29a6d11c36e7 worker end.
参考サイト
Integrate SQS and Lambda: Serverless Architecture for Asynchronous Workloads