LambdaとSQSによる非同期分散処理

2016.06.16

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

AWS Lambdaで処理時間が掛かることが想定される場合、どのような対応策があるでしょうか?Lambdaを諦める?(これも1つの選択肢だと思います)、一回のデータ処理量を減らして処理時間を短くする、など色々な方法が考えつくと思います。

今回は1つの対応策として、処理時間が掛かる処理を複数に分割して別Lambda関数として動かす事を考えてみました。処理を分割してQueueに登録し、Queue毎に別Lambda関数を起動するサンプルを作成してみたので、それを中心に書きたいと思います。尚、実装言語はPythonとなります。

処理概要

今回の処理概要を図にすると以下のようになります。

lambda-sqs-asynchronous-distributed-processing

Pythonにより実装したLambda関数、SQS、DynamoDBを組み合わせました。一番左の「main.py」がタスクをQueueに分割してSQSに登録します。定期的に動いている「consumer.py」がSQSよりQueueを取得し、タスク毎に別Lambda関数「worker.py」を起動して処理を実行します。SQSはQueueを重複して取得する可能性があるため、一度取得したQueueのIDをDynamoDBに保存し、「worker.py」起動時にDynamoDBに登録済みか(つまり重複して取得したのか)をチェックするようにしました。

実装について

SQSとDynamoDB

SQSについては「sqs_workloads_queue」という名前でQueueを作成しました。DynamoDBについてはテーブル名「sqs_workloads_ids」、プライマリーキーは文字列型の「id」としました。

requirements.txt

以降はPythonのソースについての説明となります。先ずは今回のプロジェクトに必要なモジュールを定義する「requirements.txt」についてです。以下のようにしました。

requests
boto3

PythonのAWS SDKである「boto3」を使用します。プロジェクトのローカルにモジュールをインストールするため、以下のコマンドを実行します。

$ pip install -r requirements.txt -t .

main.py

タスクをQueueに分割してSQSに登録する「main.py」のソースです。

import logging
import json
from queue import Queue

logging.basicConfig(format='%(asctime)s - %(threadName)s - %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    try:
        logger.info('main start.')

        data = json.loads(json.dumps(event))
        queue = Queue()

        for attr in data.get('names'):
            queue.enqueue(attr.get('name'))

        logger.info('main end.')
        return 'main success.'
    except Exception as e:
        logger.error('error raise.')
        logger.error(e)
        return 'main error.'

if __name__ == "__main__":
    lambda_handler({"names":[{"name":"NameA"},{"name":"NameB"},{"name":"NameC"}]}, {})

今回は受信したjsonに複数件含まれている「name」属性を、1つずつのタスクに分割し、その値をQueueに渡すこととしました。16・17行目でこれを行っています。「queue.enqueue」でQueueに値を設定していますが、このメソッドについては後述することとします。

consumer.py

Queueを取得し、各タスク毎に別Lambda関数を起動する「consumer.py」です。

import logging
import json
from queue import Queue
from lambda_function import LambdaFunction
from dynamodb import DynamoDB

logging.basicConfig(format='%(asctime)s - %(threadName)s - %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    try:
        logger.info('consumer start.')

        dynamodb = DynamoDB()
        lambda_fnc = LambdaFunction()

        queue = Queue()
        queues = queue.dequeue()
        for queue in queues:
            if dynamodb.is_exists(queue['MessageId']) == False:
                json = queue['Body']
                lambda_fnc.invoke_worker(json)
                print 'worker invoke. json = ' + json
                dynamodb.save(queue['MessageId'])

        logger.info('consumer end.')
        return 'consumer success.'
    except Exception as e:
        logger.error('error raise.')
        logger.error(e)
        return 'consumer error.'

if __name__ == "__main__":
    lambda_handler({}, {})

19行目でSQSよりQueueを取得します。20行目以下のループで、DynamoDBにID(MessageID)が存在するかのチェック(21行目)・Queueより値を取得して別Lambda関数の起動(23行目)・処理済みのIDをDynamoDBに登録(25行目)を行っています。各処理で呼び出したLambdaFunction・DynamoDBクラスについては後述します。

worker.py

タスク毎に起動されるLambda関数「worker.py」です。

import logging
import json
from queue import Queue

logging.basicConfig(format='%(asctime)s - %(threadName)s - %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    try:
        logger.info('worker start.')

        data = json.loads(json.dumps(event))
        print('worker received name = ' + data['name'])
        
        logger.info('worker end.')
        return 'worker success.'
    except Exception as e:
        logger.error('error raise.')
        logger.error(e)
        return 'worker error.'

if __name__ == "__main__":
    lambda_handler({"name":"host1"}, {})

この関数は受け取ったjsonを解析し、nameの値を出力するだけのものです。13・14行目で行っています。

queue.py

Queueへの登録、Queueからの取得を行うメソッドを記述した「queue.py」です。

import boto3

class Queue:
    QUEUE_NAME = 'sqs_workloads_queue'
    MAX_QUEUE_COUNT = 10
    REGION = 'ap-northeast-1'

    def enqueue(self, name):
        client = self.__client()

        response = client.send_message(
            QueueUrl=self.__queue_url(),
            MessageBody='{"name": "%s"}' % name
        )

        return response['ResponseMetadata']['HTTPStatusCode'] == 200

    def dequeue(self):
        client = self.__client()

        response = client.receive_message(
            QueueUrl=self.__queue_url(),
            MaxNumberOfMessages=self.MAX_QUEUE_COUNT
        )

        messages = []
        if 'Messages' in response:
            for message in response['Messages']:
                messages.append(message)
                client.delete_message(
                    QueueUrl=self.__queue_url(),
                    ReceiptHandle=message['ReceiptHandle']
                )

        return messages

    def __client(self):
        return boto3.client('sqs', region_name=self.REGION)

    def __queue_url(self):
        return self.__client().get_queue_url(QueueName=self.QUEUE_NAME)['QueueUrl']

Queueへの登録が「enqueue」メソッド、Queueからの取得が「dequeue」メソッドとなります。「dequeue」メソッドについてはQueueから取得後に削除も行っています。

dynamodb.py

DynamoDBへの登録、存在チェックを行う「dynamodb.py」です。

import boto3

class DynamoDB:
    REGION = 'ap-northeast-1'

    def save(self, id):
        client = self.__client()

        response = client.update_item(
            TableName='sqs_workloads_ids',
            Key={
                'id': {
                    'S': id
                }
            }
        )

        return True

    def is_exists(self, id):
        client = self.__client()

        response = client.get_item(
            TableName='sqs_workloads_ids',
            Key={
                'id': {
                    'S': id
                }
            },
            ConsistentRead=True
        )

        return 'Item' in response

    def __client(self):
        return boto3.client('dynamodb', region_name=self.REGION)

DynamoDBへの登録が「save」メソッド、存在チェックが「exists」メソッドとなります。

lambda_function.py

Lambda関数を起動する「lambda_function.py」となります。

import boto3
import base64

class LambdaFunction:
    REGION = 'ap-northeast-1'
    WORKER_FUNCTION = 'sqs_workers_worker'

    def invoke_worker(self, json):
        client = boto3.client('lambda', region_name=self.REGION)

        response = client.invoke(
            FunctionName=self.WORKER_FUNCTION,
            InvocationType='Event',
            LogType='Tail',
            Payload=json
        )

        return True

実行について

プロジェクトをLambdaに配置するためのzipを作成します。作成したzipを使い、「main.py」「consumer.py」「worker.py」を実行する3つのLambda関数を作成します。各Lambda関数には適切なロールを付与するのを忘れないで下さい。

「main.py」を実行するLambda関数の起動引数に、以下の様なjsonを指定します。

{
    "names":[
        {"name":"Name1"},
        {"name":"Name2"},
        {"name":"Name3"},
        {"name":"Name4"},
        {"name":"Name5"}
    ]
}

「main.py」のLambda関数を起動すると、SQSにQueueが登録されます。「consumer.py」のLambda関数をスケジューリングによる起動 or 手動による起動をすると、タスクとして「worker.py」が呼び出されて以下の様なログがCloudWatch Logに出力されるはずです(出力されるメッセージのName自体はランダムで異なります)。

[INFO]	2016-06-14T03:24:06.79Z	7319afca-31df-11e6-9449-e3fd9e0e0473	worker start. 
worker received name = Name4 
[INFO]	2016-06-14T03:24:06.79Z	7319afca-31df-11e6-9449-e3fd9e0e0473	worker end. 
END RequestId: 7319afca-31df-11e6-9449-e3fd9e0e0473 
REPORT RequestId: 7319afca-31df-11e6-9449-e3fd9e0e0473	Duration: 0.50 ms	Billed Duration: 100 ms Memory Size: 128 MB	Max Memory Used: 21 MB	 
START RequestId: 7384588d-31df-11e6-887e-29a6d11c36e7 Version: $LATEST 
[INFO]	2016-06-14T03:24:06.804Z	7384588d-31df-11e6-887e-29a6d11c36e7	worker start. 
worker received name = Name5 
[INFO]	2016-06-14T03:24:06.804Z	7384588d-31df-11e6-887e-29a6d11c36e7	worker end.

参考サイト

Integrate SQS and Lambda: Serverless Architecture for Asynchronous Workloads