重複実行を許容しないステートマシンを構築してみた
何らかのトリガーによって定期的に処理が呼びだしされるものの、実行中の場合はスキップしたい...EventBridge(CloudWatch Events)から複数回起動される可能性をなくしたい...そんなことがあると思います。
そうした要件には、冪等性を確保するなど処理を行うアプリケーション側で対応していることが多いかと思います。
今回は、何らかの理由でアプリケーションの改修が行えないかつ、該当の処理はStep Functionsにて呼び出しされていることを前提として、重複実行を許容しないステートマシンを作成してみました。
制限のある前提のため、利用できるケースは限られるかと思いますが、こういうやり方もあるか〜程度に、参考にしていただけれと思います。
実装
Lambda FunctionとStep Functionsを作成します。今回は、AWS SAMテンプレートを用意しましたので、構築についての詳細は割愛しています。処理内容を中心に説明していきます。
ちなみに、AWS SAMテンプレートでStep Functionsを扱う際は、以下を参考にしてください。
Lambda Function作成
ここでは、重複実行のチェックはLambda Functionで行います。こちらのLambda Functionは後ほど構築するStep Functionsから呼び出しされるものとなります。
import os import boto3 import logging sfn_client = boto3.client("stepfunctions") logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): sfn_arn = event["StateMachineId"] #呼び出し元ステートマシンARN(SAM側でステートマシンのARNを設定すると循環参照となる) sfn_current_exec_name = event["ExecutionName"] #呼び出し元ステートマシンの実行名取得 # イベントデータロギング logger.info(F" EventData : {event}") # 実行中のステートマシン取得 response = sfn_client.list_executions( stateMachineArn = sfn_arn, statusFilter = "RUNNING", #実行中のステートマシンでフィルタ(一致する実行を取得) ) # 実行中のステートマシンフラグ running_flg = False # 実行中のステートマシンがあるかチェック for sfn_exec_name in response["executions"]: # 呼び出し元ステートマシンの実行名ロギング logger.info(F" Execution name of the running Statemachine : {sfn_exec_name['name']}") # 呼び出し元以外の実行があるかどうか if sfn_current_exec_name != sfn_exec_name["name"] : running_flg = True logger.error("There is a running state machine") break return { "running_flg": running_flg }
呼び出し元のステートマシンから、「ステートマシンのArn」と「実行名」をハンドラーで受け取ります。該当のステートマシンの実行状況を確認し、受け取った「実行名」以外の実行があるかチェックします。実行有無のチェック結果を、呼び出し元のステートマシンに返すような処理です。
Step Functions作成
先程のLambda Functionにイベントデータ(「ステートマシンのArn」と「実行名」)を渡し、重複実行の有無によって、処理を分岐するステートマシンを作成しました。
{ "StartAt": "get execution id", "States": { "get execution id": { "Type": "Pass", "Parameters": { "StateMachineId.$" : "$$.StateMachine.Id", "ExecutionName.$" : "$$.Execution.Name" }, "Next": "check running statemachine" }, "check running statemachine": { "Type": "Task", "Resource": "${LambdaFunction}", "Next": "choice" }, "choice": { "Type": "Choice", "Choices": [ { "Variable": "$.running_flg", "BooleanEquals": true, "Next": "fail" } ], "Default": "processing" }, "processing": { "Type": "Wait", "Seconds": 300, "Next": "succeed" }, "succeed": { "Type": "Succeed" }, "fail": { "Type": "Fail", "Error": "ErrorCode", "Cause": "There is a running state machine." } } }
処理内容について簡単に説明します。
get execution id
ステート
ステートタイプPassのため、処理はなくデータを生成しています。生成するデータは「ステートマシンのArn」と「実行名」で、Context オブジェクトより取得しています。
check running statemachine
ステート
ステートタイプTaskで、先程のLambda Functionを呼び出しています。先のステートで取得したデータはLambda Functionに渡され、重複実行の有無がステートマシンに返されます。
choice
ステート
ステートタイプChoiceで、check running statemachine
で取得した、重複実行の有無を判定して処理を分岐しています。重複実行がない場合に限り、実施したい処理に進みます。
processing
ステート
こちらのステートをアプリケーションの処理に見立てています。本来実施したい処理をここのステートで実施します。ここでは、ステートタイプWaitで、指定時間待機するだけとなっています。
動作確認
今回は、重複実行が行われないことが確認できればいいので、今回は手動でステートマシンを実行します。
まずは、実行に必要なステートマシンのARNを取得します。ステートマシン名でフィルタリングしてARNを取得しています。
$ SFN_NAME=duplication_control_sfn $ SFN_ARN=`aws stepfunctions list-state-machines \ --query "stateMachines[?name=='$SFN_NAME'].stateMachineArn" \ --output text`
ステートマシンを実行します。
$ aws stepfunctions start-execution \ --state-machine-arn ${SFN_ARN} { "executionArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:execution:duplication_control_sfn:7fcc7494-6d19-471b-8911-f9ac6cc25fe8", "startDate": "2020-06-25T16:12:29.849000+09:00" }
コンソールを確認すると該当のステートマシンが、実行名7fcc7494-6d19-471b-8911-f9ac6cc25fe8
で実行されていることがわかります。
実行状況を確認します。
ステートマシン実行時にインプットデータは渡していませんが、get execution id
ステートで現在の実行名7fcc7494-6d19-471b-8911-f9ac6cc25fe8
等が取得できていることがわかります。
check running statemachine
ステートに実行名などが渡されています。今回は、重複実行がないため、Lambda Functionが重複実行ナシの値(false)を返しています。
その値をchoice
ステートで判定し処理が分岐され、実際の処理を行うprocessing
ステートが開始されていることが確認できました。
この状態で、再度ステートマシンを実行をします。
$ aws stepfunctions start-execution \ --state-machine-arn ${SFN_ARN} { "executionArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:execution:duplication_control_sfn:2c2ebce1-796b-4e49-84d9-254ce00467ab", "startDate": "2020-06-25T16:14:17.365000+09:00" }
ステートマシンの実行状態を確認すると、実行名2c2ebce1-796b-4e49-84d9-254ce00467ab
は失敗していることがわかります。
実行中のステートマシンが存在しているので、check running statemachine
が重複実行アリの値(true)を返し、その結果をchoice
ステートが判定し重複実行を抑止しました。
という感じで、重複実行を許可しないステートマシンの完成です。
さいごに
作り込みをしていますが、制限のある前提の中で重複実行を抑止することができました。
Step FunctionsはDynamoDBと統合されているので、アプリケーションの内容によっては、DynamoDBからステートを取得するようなことで、今回のような作り込みもなくせるかもしれません。
今回はアプリケーションの処理内容を考慮せず、Step Functionsで実装するなら...といった内容になっておりますので、重複実行を抑止したいといった際は、処理内容を鑑みて実装方法を検討いただければと思います。