この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
AWS Step Functionsの中に(外部APIの呼び出しなどで)長時間かかることもある処理がある場合に、その処理の結果を待ち、重複を防ぐために処理中は新しい処理のリクエストをしないという仕組みが必要になり、実装してみました。
重複の発生する実装(SQS未使用の実装)
次の手順で1分毎に実行するStep Functionsを定義します。
$ sls create --template aws-python3 --path sample-app
$ cd sample-app
$ sls plugin install -n serverless-python-requirements
$ sls plugin install -n serverless-prune-plugin
$ sls plugin install -n serverless-step-functions
$ sls plugin install -n serverless-pseudo-parameters
serverless.yml
を次のように記述します。
service: sample-app
plugins:
- serverless-python-requirements
- serverless-prune-plugin
- serverless-step-functions
- serverless-pseudo-parameters
custom:
defaultStage: dev
defaultRegion: ap-northeast-1
defaultProfile: default
enableTrigger:
dev: true
stg: false
prod: false
prune:
automatic: true
number: 3
provider:
name: aws
runtime: python3.8
stage: ${opt:stage, self:custom.defaultStage}
profile: ${opt:profile, self:custom.defaultProfile}
memorySize: 128
region: ${opt:region, self:custom.defaultRegion}
functions:
CallSomeLongApi:
handler: handler.lambda_handler
name: "sample-app-${self:provider.stage}-call-some-long-api"
description: 'レスポンスが長時間得られないこともあるAPIの呼び出し'
environment:
STAGE: ${self:provider.stage}
package:
include:
- /**
timeout: 120
stepFunctions:
stateMachines:
CallSomeLongApiOrchestrator:
name: "sample-app-${self:provider.stage}-call-some-long-api-orchestrator"
events:
- schedule:
name: "some-app-${self:provider.stage}-call-some-long-api-orchestrator"
description: '1分ごとに処理を実行する'
rate: rate(1 minute)
enabled: ${self:custom.enableTrigger.${self:provider.stage}}
definition:
StartAt: Task Call Some Long Api
States:
Task Call Some Long Api:
Type: Task
Resource:
Fn::GetAtt: [CallSomeLongApi, Arn]
InputPath: "$"
ResultPath: "$.body"
OutputPath: "$"
End: true
ステートマシンは次の構造になります。
「Task Call Some Long Api」で実行する処理をhandler.py
に記述します。
handler.py
を次のように記述します。
import json
import time
import random
import logging
logging.basicConfig(
format="%(asctime)s %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s"
)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
body = {
"message": "Go Serverless v1.0! Your function executed successfully!",
"input": event
}
response = {
"statusCode": 200,
"body": json.dumps(body)
}
# 長時間かかる場合もあるAPIを呼び出す処理に見立てる
process_time = random.randint(40, 120)
logger.info("%d秒かかるAPIコールが発生しています。", process_time)
time.sleep(process_time)
return response
time.sleep
で40秒から120秒かかるAPI処理に見立てたhandler.lambda_handlerを1分単位に実行することで、次画像のように複数の処理が同時に走る状況を作りました。
SQSを使用して重複を発生させない実装
serverless.yml
を次のように記述します。
service: sample-app
plugins:
- serverless-python-requirements
- serverless-prune-plugin
- serverless-step-functions
- serverless-pseudo-parameters
custom:
defaultStage: dev
defaultRegion: ap-northeast-1
defaultProfile: default
enableTrigger:
dev: true
stg: false
prod: false
prune:
automatic: true
number: 3
provider:
name: aws
runtime: python3.8
stage: ${opt:stage, self:custom.defaultStage}
profile: ${opt:profile, self:custom.defaultProfile}
memorySize: 128
region: ${opt:region, self:custom.defaultRegion}
iamRoleStatements:
- Effect: Allow
Action:
- sqs:*
Resource:
- "*"
functions:
EnQueue:
handler: functions/enqueue/handler.lambda_handler
name: "sample-app-${self:provider.stage}-enqueue"
description: 'SQSへのメッセージの入力。まだ稼働中のメッセージがあったら削除する'
environment:
STAGE: ${self:provider.stage}
package:
include:
- /functions/enqueue/**
CallSomeLongApi:
handler: functions/call-some-long-api/handler.lambda_handler
name: "sample-app-${self:provider.stage}-call-some-long-api"
description: 'レスポンスが長時間得られないこともあるAPIの呼び出し'
environment:
STAGE: ${self:provider.stage}
package:
include:
- /functions/call-some-long-api/**
timeout: 120
events:
- sqs:
arn:
Fn::GetAtt: [SampleAppQueue, Arn]
batchSize: 1
stepFunctions:
stateMachines:
CallSomeLongApiOrchestrator:
name: "sample-app-${self:provider.stage}-call-some-long-api-orchestrator"
events:
- schedule:
name: "some-app-${self:provider.stage}-call-some-long-api-orchestrator"
description: '1分ごとに処理を実行する'
rate: rate(1 minute)
enabled: ${self:custom.enableTrigger.${self:provider.stage}}
definition:
StartAt: EnQueue
States:
EnQueue:
Type: Task
Resource:
Fn::GetAtt: [EnQueueLambdaFunction, Arn]
InputPath: "$"
ResultPath: "$.body"
OutputPath: "$"
End: true
resources:
Resources:
SampleAppQueue:
Type: "AWS::SQS::Queue"
Properties:
QueueName: "SampleAppQueue.fifo"
FifoQueue: true
VisibilityTimeout: 120
SQSのリソースとしてFIFOキューを定義して、このFIFOキューに1分単位にメッセージをエンキューするEnQueue関数を定義しました。
なお、キューのVisibilityTimeout
の値はLambdaファンクションのタイムアウト値より大きい必要があるので注意してください。ここでは共に120秒に設定しています。
resources:
Resources:
SampleAppQueue:
Type: "AWS::SQS::Queue"
Properties:
QueueName: "SampleAppQueue.fifo"
FifoQueue: true
VisibilityTimeout: 120
また、CallSomeLongApi関数はFIFOキューへのエンキューをトリガーに動作するように定義しています。
CallSomeLongApi:
:
events:
- sqs:
arn:
Fn::GetAtt: [SampleAppQueue, Arn]
batchSize: 1
ステートマシンは次の構造になります。
Producerの実装
EnQueue関数(Producer)の実装は次の通りです。
import logging
import boto3
import datetime
import uuid
logging.basicConfig(
format="%(asctime)s %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s"
)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
sqs = boto3.client('sqs')
sqsr = boto3.resource('sqs')
queue_name = 'SampleAppQueue.fifo'
def get_queue_name():
return queue_name
def get_queue_resource():
return sqsr.get_queue_by_name(QueueName=get_queue_name())
def get_queue_url(sqs):
response = sqs.get_queue_url(QueueName=get_queue_name())
return response['QueueUrl']
def lambda_handler(event, context):
qr = get_queue_resource()
count_of_message_processed = qr.attributes.get('ApproximateNumberOfMessagesNotVisible')
logger.info('count of message: %s' % count_of_message_processed)
date_now = datetime.datetime.now()
if int(count_of_message_processed) > 0:
# 処理中のメッセージがあったら何もしない
logger.info('処理中のメッセージがあります。処理を抜けます。 date: %s' % date_now)
return
else:
# 処理中のメッセージがなかったらエンキューする
logger.info('処理中のメッセージがありません。処理を続けます。 date: %s' % date_now)
response = sqs.send_message(
QueueUrl=get_queue_url(sqs),
MessageBody="message date : %s" % datetime.datetime.now(),
MessageGroupId="sampleApp",
MessageDeduplicationId=str(uuid.uuid4())
)
return response
SQSのGetQueueAttributesアクションで処理中のメッセージ数を取得しています。
qr = get_queue_resource()
count_of_message_processed = qr.attributes.get('ApproximateNumberOfMessagesNotVisible')
ApproximateNumberOfMessagesNotVisibleには処理中1のメッセージの数が入っています。 注意が必要なのは、FIFOキューでなければ、ApproximateNumberOfMessagesNotVisibleは「おおよそ」の数が入るということです。 処理中のメッセージ数を正確に調べたかったのでFIFOキューを採用しています。
Amazon SQS メッセージの処理に必要なリソース - Amazon Simple Queue Service
処理中のメッセージがあればFIFOキューに新たなメッセージをエンキューせず、処理中のメッセージがなければFIFOキューに新たなメッセージをエンキューしています。
Consumerの実装
CallSomeLongApi関数(Consumer)の実装は次の通りです。
import json
import logging
import boto3
import time
import random
logging.basicConfig(
format="%(asctime)s %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s"
)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
sqs = boto3.client('sqs')
queue_name = 'SampleAppQueue.fifo'
def get_queue_name():
return queue_name
def get_queue_url(sqs):
response = sqs.get_queue_url(QueueName=get_queue_name())
return response['QueueUrl']
def lambda_handler(event, context):
for record in event['Records']:
# 長時間かかる場合もあるAPIを呼び出す処理に見立てる
process_time = random.randint(40, 120)
logger.info("%d秒かかるAPIコールが発生しています。", process_time)
time.sleep(process_time)
# メッセージを削除
sqs.delete_message(QueueUrl=get_queue_url(sqs), ReceiptHandle=record["receiptHandle"])
body = {
"message": "Go Serverless v1.0! Your function executed successfully!",
"input": event
}
response = {
"statusCode": 200,
"body": json.dumps(body)
}
return response
SQSへのエンキューをトリガーに設定した場合、送信されたメッセージはevent['Records']
からとれます。
処理(長いAPIコール)を行った後、メッセージを削除しています。
結果
次図に示す通り、処理中のメッセージがある場合は新たにメッセージをエンキューしない動きが確認できます。
まとめ
Amazon SQSのFIFOキューを使うことで正確にメッセージの処理状態を把握することができました。 このことを活用することで処理が終わるのを待ってから次のメッセージの処理に移る実装ができます。 Step Functionsで外部APIの呼び出しなどで長時間の処理が含まれる場合にご参考になればと思います。
- メッセージがクライアントに送信されたが、まだ削除されていない場合、または表示期限に達していない場合、メッセージは処理中とみなされます。 ↩