EventBridgeによるStep Functionsの重複実行を排除する方法

EventBridgeは1回のイベントに対し、複数回ターゲットを実行する可能性があります。ターゲットにStep Functionsを利用する際、この重複実行を排除する方法を紹介します。
2022.08.31

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、ヤギです。

EventBridgeをトリガーにStep Functionsを実行することは多いのではないでしょうか。
この際、EventBridgeは1つのイベントに対して、複数回ターゲットを実行する可能性があります。1
Step Functions内の処理が冪等性を担保する設計になっていれば問題ないですが、そうでない場合は重複実行を排除する必要があります。
そこで今回は、EventBridgeにより発行されるイベントIDをDynamoDBに保存し、重複実行を防ぐ方法をご紹介します。

また、似た目的でLambdaを使った方法もあります。

この方法は今回紹介する方法と違い、1つのステートマシンの並列実行を禁止するという処理です。
今回紹介する方法は、1つのイベントに対する重複実行を排除する仕組みです。並列して複数のイベントが発生した場合は、それぞれのイベントに対してワークフローが実行されます。
「ステートマシンの並列実行を禁止する」のか、「イベントごとに1回の実行を保証する(複数のイベントの並列実行は可能)」のか、必要なユースケースに合わせて手法を選択してください。

本題

それでは本題に入ります。

EventBridgeで発行されるイベントは一意のIDを持ちます。2
このイベントIDをDynamoDBに保存し、Step Functionsの実行時にDBに同一のイベントIDが存在する場合は処理をスキップすることで、重複実行を排除します。
この処理を、DynamoDBへの条件付き書き込みを使って実現します。
PutItemリクエストの条件に「同一IDのレコードが存在しないこと」を指定し、書き込みを行います。
この際、条件に一致しない場合、つまり同一のイベントIDの処理が既に実行されている場合はConditionalCheckFailedException エラーが発生するため、重複実行を回避することができます。

やってみた

では実際にワークフローを作って検証してみます。

DynamoDBの作成

まずイベントIDを記録するDynamoDBテーブルを作成します。
ここではテーブル名を eventHistory 、パーティションキーを eventId (文字列型)としました。
また、散発的な書き込みを想定し、キャパシティーモードはオンデマンドにしました。

Step Functionsの作成

以下のAmazon State Language (AST) でステートマシンを作成します。

{
  "StartAt": "DynamoDB PutItem",
  "States": {
    "DynamoDB PutItem": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:putItem",
      "Parameters": {
        "TableName": "eventHistory",
        "Item": {
          "eventId": {
            "S.$": "$.id"
          }
        },
        "ConditionExpression": "attribute_not_exists(eventId)"
      },
      "Next": "Do Something",
      "Catch": [
        {
          "ErrorEquals": [
            "DynamoDB.ConditionalCheckFailedException"
          ],
          "Next": "Already Executed"
        },
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "Handle Error"
        }
      ],
      "ResultPath": null
    },
    "Do Something": {
      "Type": "Wait",
      "Seconds": 60,
      "End": true
    },
    "Handle Error": {
      "Type": "Fail"
    },
    "Already Executed": {
      "Type": "Fail"
    }
  }
}

最初のステート DynamoDB PutItem でDynamoDBテーブルに条件付きPutItemを実行し、イベントIDを保存しています。もし既にレコードが存在する場合は DynamoDB.ConditionalCheckFailedException が発生するため、エラーをキャッチして処理を終了させます。

以上のようにして、重複実行を排除する仕組みが完成しました。

検証

実際はEventBridgeを作成し、ターゲットにStep Functionsを設定します。しかしEventBridgeの重複実行はなかなか発生しません。そこで今回は、Step Functionsに手動で入力パラメータを渡し重複実行を再現します。
サンプルとして、スケジュールイベントのパラメータを使用します。

{
    "version": "0",
    "id": "53dc4d37-cffa-4f76-80c9-8b7d4a4d2eaa",
    "detail-type": "Scheduled Event",
    "source": "aws.events",
    "account": "123456789012",
    "time": "2015-10-08T16:53:06Z",
    "region": "us-east-1",
    "resources": [
        "arn:aws:events:us-east-1:123456789012:rule/my-scheduled-rule"
    ],
    "detail": {}
}

上記のJSONを入力として、ステートマシンを実行します。

最初の実行では、指定したイベントIDを持つイベントが過去に実行されていないため、処理が続行しています。

続いて新たな実行を、同じ入力を使って実行してみます。

既に同じイベントIDのレコードがDynamoDBに存在する、つまり過去に同じイベントが実行されているため、処理が終了しました。

これで想定通り、1つのイベントに対する重複実行を排除することが確認できました。

最後に

DynamoDBにイベントIDを保存することで、EventBridgeの重複実行を排除する仕組みを紹介しました。Step Functionsをターゲットにする場合、ワークフローにPutItemアクションを使用するだけで、比較的簡単に重複実行の排除を行うことができました。

繰り返しになりますが、この方法は1つのイベントに対する重複実行を排除する仕組みであり、ステートマシンの並列実行を防ぐ仕組みではありません。

目的に合わせて利用をご検討ください!

参考リンク

項目と属性の操作 条件付きの書き込み