この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
何らかのトリガーによって定期的に処理が呼びだしされるものの、実行中の場合はスキップしたい...EventBridge(CloudWatch Events)から複数回起動される可能性をなくしたい...そんなことがあると思います。
そうした要件には、冪等性を確保するなど処理を行うアプリケーション側で対応していることが多いかと思います。
今回は、何らかの理由でアプリケーションの改修が行えないかつ、該当の処理はStep Functionsにて呼び出しされていることを前提として、重複実行を許容しないステートマシンを作成してみました。
制限のある前提のため、利用できるケースは限られるかと思いますが、こういうやり方もあるか〜程度に、参考にしていただけれと思います。
実装
Lambda FunctionとStep Functionsを作成します。今回は、AWS SAMテンプレートを用意しましたので、構築についての詳細は割愛しています。処理内容を中心に説明していきます。
ちなみに、AWS SAMテンプレートでStep Functionsを扱う際は、以下を参考にしてください。
Lambda Function作成
ここでは、重複実行のチェックはLambda Functionで行います。こちらのLambda Functionは後ほど構築するStep Functionsから呼び出しされるものとなります。
import os
import boto3
import logging
sfn_client = boto3.client("stepfunctions")
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
sfn_arn = event["StateMachineId"] #呼び出し元ステートマシンARN(SAM側でステートマシンのARNを設定すると循環参照となる)
sfn_current_exec_name = event["ExecutionName"] #呼び出し元ステートマシンの実行名取得
# イベントデータロギング
logger.info(F" EventData : {event}")
# 実行中のステートマシン取得
response = sfn_client.list_executions(
stateMachineArn = sfn_arn,
statusFilter = "RUNNING", #実行中のステートマシンでフィルタ(一致する実行を取得)
)
# 実行中のステートマシンフラグ
running_flg = False
# 実行中のステートマシンがあるかチェック
for sfn_exec_name in response["executions"]:
# 呼び出し元ステートマシンの実行名ロギング
logger.info(F" Execution name of the running Statemachine : {sfn_exec_name['name']}")
# 呼び出し元以外の実行があるかどうか
if sfn_current_exec_name != sfn_exec_name["name"] :
running_flg = True
logger.error("There is a running state machine")
break
return {
"running_flg": running_flg
}
呼び出し元のステートマシンから、「ステートマシンのArn」と「実行名」をハンドラーで受け取ります。該当のステートマシンの実行状況を確認し、受け取った「実行名」以外の実行があるかチェックします。実行有無のチェック結果を、呼び出し元のステートマシンに返すような処理です。
Step Functions作成
先程のLambda Functionにイベントデータ(「ステートマシンのArn」と「実行名」)を渡し、重複実行の有無によって、処理を分岐するステートマシンを作成しました。
{
"StartAt": "get execution id",
"States": {
"get execution id": {
"Type": "Pass",
"Parameters": {
"StateMachineId.$" : "$$.StateMachine.Id",
"ExecutionName.$" : "$$.Execution.Name"
},
"Next": "check running statemachine"
},
"check running statemachine": {
"Type": "Task",
"Resource": "${LambdaFunction}",
"Next": "choice"
},
"choice": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.running_flg",
"BooleanEquals": true,
"Next": "fail"
}
],
"Default": "processing"
},
"processing": {
"Type": "Wait",
"Seconds": 300,
"Next": "succeed"
},
"succeed": {
"Type": "Succeed"
},
"fail": {
"Type": "Fail",
"Error": "ErrorCode",
"Cause": "There is a running state machine."
}
}
}
処理内容について簡単に説明します。
get execution id
ステート
ステートタイプPassのため、処理はなくデータを生成しています。生成するデータは「ステートマシンのArn」と「実行名」で、Context オブジェクトより取得しています。
check running statemachine
ステート
ステートタイプTaskで、先程のLambda Functionを呼び出しています。先のステートで取得したデータはLambda Functionに渡され、重複実行の有無がステートマシンに返されます。
choice
ステート
ステートタイプChoiceで、check running statemachine
で取得した、重複実行の有無を判定して処理を分岐しています。重複実行がない場合に限り、実施したい処理に進みます。
processing
ステート
こちらのステートをアプリケーションの処理に見立てています。本来実施したい処理をここのステートで実施します。ここでは、ステートタイプWaitで、指定時間待機するだけとなっています。
動作確認
今回は、重複実行が行われないことが確認できればいいので、今回は手動でステートマシンを実行します。
まずは、実行に必要なステートマシンのARNを取得します。ステートマシン名でフィルタリングしてARNを取得しています。
$ SFN_NAME=duplication_control_sfn
$ SFN_ARN=`aws stepfunctions list-state-machines \
--query "stateMachines[?name=='$SFN_NAME'].stateMachineArn" \
--output text`
ステートマシンを実行します。
$ aws stepfunctions start-execution \
--state-machine-arn ${SFN_ARN}
{
"executionArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:execution:duplication_control_sfn:7fcc7494-6d19-471b-8911-f9ac6cc25fe8",
"startDate": "2020-06-25T16:12:29.849000+09:00"
}
コンソールを確認すると該当のステートマシンが、実行名7fcc7494-6d19-471b-8911-f9ac6cc25fe8
で実行されていることがわかります。
実行状況を確認します。
ステートマシン実行時にインプットデータは渡していませんが、get execution id
ステートで現在の実行名7fcc7494-6d19-471b-8911-f9ac6cc25fe8
等が取得できていることがわかります。
check running statemachine
ステートに実行名などが渡されています。今回は、重複実行がないため、Lambda Functionが重複実行ナシの値(false)を返しています。
その値をchoice
ステートで判定し処理が分岐され、実際の処理を行うprocessing
ステートが開始されていることが確認できました。
この状態で、再度ステートマシンを実行をします。
$ aws stepfunctions start-execution \
--state-machine-arn ${SFN_ARN}
{
"executionArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:execution:duplication_control_sfn:2c2ebce1-796b-4e49-84d9-254ce00467ab",
"startDate": "2020-06-25T16:14:17.365000+09:00"
}
ステートマシンの実行状態を確認すると、実行名2c2ebce1-796b-4e49-84d9-254ce00467ab
は失敗していることがわかります。
実行中のステートマシンが存在しているので、check running statemachine
が重複実行アリの値(true)を返し、その結果をchoice
ステートが判定し重複実行を抑止しました。
という感じで、重複実行を許可しないステートマシンの完成です。
さいごに
作り込みをしていますが、制限のある前提の中で重複実行を抑止することができました。
Step FunctionsはDynamoDBと統合されているので、アプリケーションの内容によっては、DynamoDBからステートを取得するようなことで、今回のような作り込みもなくせるかもしれません。
今回はアプリケーションの処理内容を考慮せず、Step Functionsで実装するなら...といった内容になっておりますので、重複実行を抑止したいといった際は、処理内容を鑑みて実装方法を検討いただければと思います。