重複実行を許容しないステートマシンを構築してみた

アプリケーションの処理内容を考慮せず、Step Functionsで重複実行を抑止するなら...といった内容です。
2020.06.25

何らかのトリガーによって定期的に処理が呼びだしされるものの、実行中の場合はスキップしたい...EventBridge(CloudWatch Events)から複数回起動される可能性をなくしたい...そんなことがあると思います。

そうした要件には、冪等性を確保するなど処理を行うアプリケーション側で対応していることが多いかと思います。

今回は、何らかの理由でアプリケーションの改修が行えないかつ、該当の処理はStep Functionsにて呼び出しされていることを前提として、重複実行を許容しないステートマシンを作成してみました。

制限のある前提のため、利用できるケースは限られるかと思いますが、こういうやり方もあるか〜程度に、参考にしていただけれと思います。

実装

Lambda FunctionとStep Functionsを作成します。今回は、AWS SAMテンプレートを用意しましたので、構築についての詳細は割愛しています。処理内容を中心に説明していきます。

ちなみに、AWS SAMテンプレートでStep Functionsを扱う際は、以下を参考にしてください。

Lambda Function作成

ここでは、重複実行のチェックはLambda Functionで行います。こちらのLambda Functionは後ほど構築するStep Functionsから呼び出しされるものとなります。

import os
import boto3
import logging

sfn_client = boto3.client("stepfunctions")
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    sfn_arn = event["StateMachineId"]                 #呼び出し元ステートマシンARN(SAM側でステートマシンのARNを設定すると循環参照となる)
    sfn_current_exec_name = event["ExecutionName"]    #呼び出し元ステートマシンの実行名取得
    # イベントデータロギング
    logger.info(F" EventData : {event}")

    # 実行中のステートマシン取得
    response = sfn_client.list_executions(
        stateMachineArn = sfn_arn,
        statusFilter = "RUNNING", #実行中のステートマシンでフィルタ(一致する実行を取得)
    )

    # 実行中のステートマシンフラグ
    running_flg = False

    # 実行中のステートマシンがあるかチェック
    for sfn_exec_name in response["executions"]:
      # 呼び出し元ステートマシンの実行名ロギング
      logger.info(F" Execution name of the running Statemachine : {sfn_exec_name['name']}")

      # 呼び出し元以外の実行があるかどうか
      if sfn_current_exec_name != sfn_exec_name["name"] :
        running_flg = True
        logger.error("There is a running state machine")
        break

    return {
        "running_flg": running_flg
    }

呼び出し元のステートマシンから、「ステートマシンのArn」と「実行名」をハンドラーで受け取ります。該当のステートマシンの実行状況を確認し、受け取った「実行名」以外の実行があるかチェックします。実行有無のチェック結果を、呼び出し元のステートマシンに返すような処理です。

Step Functions作成

先程のLambda Functionにイベントデータ(「ステートマシンのArn」と「実行名」)を渡し、重複実行の有無によって、処理を分岐するステートマシンを作成しました。

{
  "StartAt": "get execution id",
  "States": {
    "get execution id": {
      "Type": "Pass",
      "Parameters": {
        "StateMachineId.$" : "$$.StateMachine.Id",
        "ExecutionName.$" : "$$.Execution.Name"
      },
      "Next": "check running statemachine"
    },
    "check running statemachine": {
      "Type": "Task",
      "Resource": "${LambdaFunction}",
      "Next": "choice"
    },
    "choice": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.running_flg",
          "BooleanEquals": true,
          "Next": "fail"
        }
      ],
      "Default": "processing"
    },
    "processing": {
      "Type": "Wait",
      "Seconds": 300,
      "Next": "succeed"
    },
    "succeed": {
      "Type": "Succeed"
    },
    "fail": {
      "Type": "Fail",
      "Error": "ErrorCode",
      "Cause": "There is a running state machine."
    }
  }
}

処理内容について簡単に説明します。

  • get execution idステート

ステートタイプPassのため、処理はなくデータを生成しています。生成するデータは「ステートマシンのArn」と「実行名」で、Context オブジェクトより取得しています。

  • check running statemachineステート

ステートタイプTaskで、先程のLambda Functionを呼び出しています。先のステートで取得したデータはLambda Functionに渡され、重複実行の有無がステートマシンに返されます。

  • choiceステート

ステートタイプChoiceで、check running statemachineで取得した、重複実行の有無を判定して処理を分岐しています。重複実行がない場合に限り、実施したい処理に進みます。

  • processingステート

こちらのステートをアプリケーションの処理に見立てています。本来実施したい処理をここのステートで実施します。ここでは、ステートタイプWaitで、指定時間待機するだけとなっています。

動作確認

今回は、重複実行が行われないことが確認できればいいので、今回は手動でステートマシンを実行します。

まずは、実行に必要なステートマシンのARNを取得します。ステートマシン名でフィルタリングしてARNを取得しています。

$ SFN_NAME=duplication_control_sfn
$ SFN_ARN=`aws stepfunctions list-state-machines \
    --query "stateMachines[?name=='$SFN_NAME'].stateMachineArn" \
    --output text`

ステートマシンを実行します。

$ aws stepfunctions start-execution \
    --state-machine-arn ${SFN_ARN}
{
    "executionArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:execution:duplication_control_sfn:7fcc7494-6d19-471b-8911-f9ac6cc25fe8",
    "startDate": "2020-06-25T16:12:29.849000+09:00"
}

コンソールを確認すると該当のステートマシンが、実行名7fcc7494-6d19-471b-8911-f9ac6cc25fe8で実行されていることがわかります。

実行状況を確認します。

ステートマシン実行時にインプットデータは渡していませんが、get execution idステートで現在の実行名7fcc7494-6d19-471b-8911-f9ac6cc25fe8等が取得できていることがわかります。

check running statemachineステートに実行名などが渡されています。今回は、重複実行がないため、Lambda Functionが重複実行ナシの値(false)を返しています。

その値をchoiceステートで判定し処理が分岐され、実際の処理を行うprocessingステートが開始されていることが確認できました。

この状態で、再度ステートマシンを実行をします。

$ aws stepfunctions start-execution \
  --state-machine-arn ${SFN_ARN}
{
    "executionArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:execution:duplication_control_sfn:2c2ebce1-796b-4e49-84d9-254ce00467ab",
    "startDate": "2020-06-25T16:14:17.365000+09:00"
}

ステートマシンの実行状態を確認すると、実行名2c2ebce1-796b-4e49-84d9-254ce00467abは失敗していることがわかります。

実行中のステートマシンが存在しているので、check running statemachineが重複実行アリの値(true)を返し、その結果をchoiceステートが判定し重複実行を抑止しました。

という感じで、重複実行を許可しないステートマシンの完成です。

さいごに

作り込みをしていますが、制限のある前提の中で重複実行を抑止することができました。

Step FunctionsはDynamoDBと統合されているので、アプリケーションの内容によっては、DynamoDBからステートを取得するようなことで、今回のような作り込みもなくせるかもしれません。

今回はアプリケーションの処理内容を考慮せず、Step Functionsで実装するなら...といった内容になっておりますので、重複実行を抑止したいといった際は、処理内容を鑑みて実装方法を検討いただければと思います。