Amazon SQSを使ってAWS Step Functionsで長時間かかる処理が終わるのを待つ
AWS Step Functionsの中に(外部APIの呼び出しなどで)長時間かかることもある処理がある場合に、その処理の結果を待ち、重複を防ぐために処理中は新しい処理のリクエストをしないという仕組みが必要になり、実装してみました。
重複の発生する実装(SQS未使用の実装)
次の手順で1分毎に実行するStep Functionsを定義します。
$ sls create --template aws-python3 --path sample-app $ cd sample-app $ sls plugin install -n serverless-python-requirements $ sls plugin install -n serverless-prune-plugin $ sls plugin install -n serverless-step-functions $ sls plugin install -n serverless-pseudo-parameters
serverless.yml
を次のように記述します。
service: sample-app plugins: - serverless-python-requirements - serverless-prune-plugin - serverless-step-functions - serverless-pseudo-parameters custom: defaultStage: dev defaultRegion: ap-northeast-1 defaultProfile: default enableTrigger: dev: true stg: false prod: false prune: automatic: true number: 3 provider: name: aws runtime: python3.8 stage: ${opt:stage, self:custom.defaultStage} profile: ${opt:profile, self:custom.defaultProfile} memorySize: 128 region: ${opt:region, self:custom.defaultRegion} functions: CallSomeLongApi: handler: handler.lambda_handler name: "sample-app-${self:provider.stage}-call-some-long-api" description: 'レスポンスが長時間得られないこともあるAPIの呼び出し' environment: STAGE: ${self:provider.stage} package: include: - /** timeout: 120 stepFunctions: stateMachines: CallSomeLongApiOrchestrator: name: "sample-app-${self:provider.stage}-call-some-long-api-orchestrator" events: - schedule: name: "some-app-${self:provider.stage}-call-some-long-api-orchestrator" description: '1分ごとに処理を実行する' rate: rate(1 minute) enabled: ${self:custom.enableTrigger.${self:provider.stage}} definition: StartAt: Task Call Some Long Api States: Task Call Some Long Api: Type: Task Resource: Fn::GetAtt: [CallSomeLongApi, Arn] InputPath: "$" ResultPath: "$.body" OutputPath: "$" End: true
ステートマシンは次の構造になります。
「Task Call Some Long Api」で実行する処理をhandler.py
に記述します。
handler.py
を次のように記述します。
import json import time import random import logging logging.basicConfig( format="%(asctime)s %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s" ) logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): body = { "message": "Go Serverless v1.0! Your function executed successfully!", "input": event } response = { "statusCode": 200, "body": json.dumps(body) } # 長時間かかる場合もあるAPIを呼び出す処理に見立てる process_time = random.randint(40, 120) logger.info("%d秒かかるAPIコールが発生しています。", process_time) time.sleep(process_time) return response
time.sleep
で40秒から120秒かかるAPI処理に見立てたhandler.lambda_handlerを1分単位に実行することで、次画像のように複数の処理が同時に走る状況を作りました。
SQSを使用して重複を発生させない実装
serverless.yml
を次のように記述します。
service: sample-app plugins: - serverless-python-requirements - serverless-prune-plugin - serverless-step-functions - serverless-pseudo-parameters custom: defaultStage: dev defaultRegion: ap-northeast-1 defaultProfile: default enableTrigger: dev: true stg: false prod: false prune: automatic: true number: 3 provider: name: aws runtime: python3.8 stage: ${opt:stage, self:custom.defaultStage} profile: ${opt:profile, self:custom.defaultProfile} memorySize: 128 region: ${opt:region, self:custom.defaultRegion} iamRoleStatements: - Effect: Allow Action: - sqs:* Resource: - "*" functions: EnQueue: handler: functions/enqueue/handler.lambda_handler name: "sample-app-${self:provider.stage}-enqueue" description: 'SQSへのメッセージの入力。まだ稼働中のメッセージがあったら削除する' environment: STAGE: ${self:provider.stage} package: include: - /functions/enqueue/** CallSomeLongApi: handler: functions/call-some-long-api/handler.lambda_handler name: "sample-app-${self:provider.stage}-call-some-long-api" description: 'レスポンスが長時間得られないこともあるAPIの呼び出し' environment: STAGE: ${self:provider.stage} package: include: - /functions/call-some-long-api/** timeout: 120 events: - sqs: arn: Fn::GetAtt: [SampleAppQueue, Arn] batchSize: 1 stepFunctions: stateMachines: CallSomeLongApiOrchestrator: name: "sample-app-${self:provider.stage}-call-some-long-api-orchestrator" events: - schedule: name: "some-app-${self:provider.stage}-call-some-long-api-orchestrator" description: '1分ごとに処理を実行する' rate: rate(1 minute) enabled: ${self:custom.enableTrigger.${self:provider.stage}} definition: StartAt: EnQueue States: EnQueue: Type: Task Resource: Fn::GetAtt: [EnQueueLambdaFunction, Arn] InputPath: "$" ResultPath: "$.body" OutputPath: "$" End: true resources: Resources: SampleAppQueue: Type: "AWS::SQS::Queue" Properties: QueueName: "SampleAppQueue.fifo" FifoQueue: true VisibilityTimeout: 120
SQSのリソースとしてFIFOキューを定義して、このFIFOキューに1分単位にメッセージをエンキューするEnQueue関数を定義しました。
なお、キューのVisibilityTimeout
の値はLambdaファンクションのタイムアウト値より大きい必要があるので注意してください。ここでは共に120秒に設定しています。
resources: Resources: SampleAppQueue: Type: "AWS::SQS::Queue" Properties: QueueName: "SampleAppQueue.fifo" FifoQueue: true VisibilityTimeout: 120
また、CallSomeLongApi関数はFIFOキューへのエンキューをトリガーに動作するように定義しています。
CallSomeLongApi: : events: - sqs: arn: Fn::GetAtt: [SampleAppQueue, Arn] batchSize: 1
ステートマシンは次の構造になります。
Producerの実装
EnQueue関数(Producer)の実装は次の通りです。
import logging import boto3 import datetime import uuid logging.basicConfig( format="%(asctime)s %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s" ) logger = logging.getLogger() logger.setLevel(logging.INFO) sqs = boto3.client('sqs') sqsr = boto3.resource('sqs') queue_name = 'SampleAppQueue.fifo' def get_queue_name(): return queue_name def get_queue_resource(): return sqsr.get_queue_by_name(QueueName=get_queue_name()) def get_queue_url(sqs): response = sqs.get_queue_url(QueueName=get_queue_name()) return response['QueueUrl'] def lambda_handler(event, context): qr = get_queue_resource() count_of_message_processed = qr.attributes.get('ApproximateNumberOfMessagesNotVisible') logger.info('count of message: %s' % count_of_message_processed) date_now = datetime.datetime.now() if int(count_of_message_processed) > 0: # 処理中のメッセージがあったら何もしない logger.info('処理中のメッセージがあります。処理を抜けます。 date: %s' % date_now) return else: # 処理中のメッセージがなかったらエンキューする logger.info('処理中のメッセージがありません。処理を続けます。 date: %s' % date_now) response = sqs.send_message( QueueUrl=get_queue_url(sqs), MessageBody="message date : %s" % datetime.datetime.now(), MessageGroupId="sampleApp", MessageDeduplicationId=str(uuid.uuid4()) ) return response
SQSのGetQueueAttributesアクションで処理中のメッセージ数を取得しています。
qr = get_queue_resource() count_of_message_processed = qr.attributes.get('ApproximateNumberOfMessagesNotVisible')
ApproximateNumberOfMessagesNotVisibleには処理中1のメッセージの数が入っています。 注意が必要なのは、FIFOキューでなければ、ApproximateNumberOfMessagesNotVisibleは「おおよそ」の数が入るということです。 処理中のメッセージ数を正確に調べたかったのでFIFOキューを採用しています。
Amazon SQS メッセージの処理に必要なリソース - Amazon Simple Queue Service
処理中のメッセージがあればFIFOキューに新たなメッセージをエンキューせず、処理中のメッセージがなければFIFOキューに新たなメッセージをエンキューしています。
Consumerの実装
CallSomeLongApi関数(Consumer)の実装は次の通りです。
import json import logging import boto3 import time import random logging.basicConfig( format="%(asctime)s %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s" ) logger = logging.getLogger() logger.setLevel(logging.INFO) sqs = boto3.client('sqs') queue_name = 'SampleAppQueue.fifo' def get_queue_name(): return queue_name def get_queue_url(sqs): response = sqs.get_queue_url(QueueName=get_queue_name()) return response['QueueUrl'] def lambda_handler(event, context): for record in event['Records']: # 長時間かかる場合もあるAPIを呼び出す処理に見立てる process_time = random.randint(40, 120) logger.info("%d秒かかるAPIコールが発生しています。", process_time) time.sleep(process_time) # メッセージを削除 sqs.delete_message(QueueUrl=get_queue_url(sqs), ReceiptHandle=record["receiptHandle"]) body = { "message": "Go Serverless v1.0! Your function executed successfully!", "input": event } response = { "statusCode": 200, "body": json.dumps(body) } return response
SQSへのエンキューをトリガーに設定した場合、送信されたメッセージはevent['Records']
からとれます。
処理(長いAPIコール)を行った後、メッセージを削除しています。
結果
次図に示す通り、処理中のメッセージがある場合は新たにメッセージをエンキューしない動きが確認できます。
まとめ
Amazon SQSのFIFOキューを使うことで正確にメッセージの処理状態を把握することができました。 このことを活用することで処理が終わるのを待ってから次のメッセージの処理に移る実装ができます。 Step Functionsで外部APIの呼び出しなどで長時間の処理が含まれる場合にご参考になればと思います。
- メッセージがクライアントに送信されたが、まだ削除されていない場合、または表示期限に達していない場合、メッセージは処理中とみなされます。 ↩