AWS CDK+StepFunctionsでエラー時に実行するタスクへエラー元タスクのイベントを渡す方法

AWS CDK+StepFunctionsでエラー時に実行するタスクへエラー元のイベントを渡す方法を紹介します。CDK+DynamoDBの記述方法も少し紹介しています。
2020.07.20

はじめに

CX事業本部の佐藤智樹です。

今回はAWS CDK+StepFunctionsでエラー時に実行するタスクへエラー元のイベントを渡す方法を紹介します。具体的には以下のようなイメージです。

初めてStepFunctionsを使ったところ、どこに設定を入れれば良いのか分からず詰まったので記事にしました。

本記事は上記の内容がCDKで行いたい方に参考になるかと思います。またStepFunctionsのASLも記載するので、ASLで上記の実装をやりたい方にも参考になるかと思います。

AWS CDK+StepFunctionsの実装については以下の動画を参考にさせてもらいました。本記事の環境は以下の動画と同様の基本的な構成をベースとしています。

環境情報

名称 バージョン
cdk 1.46.0

事前準備

実例を紹介するために、CDKのスタックでエラーを起こすためのタスクとエラー時のタスクを準備します。何でも良いのですが今回はDynamoDBへの書き込みタスクと削除タスクを作って書き込みタスクでエラーを起こします。先に書き込むためのDynamoDBテーブルを用意します。

lib/cdk-datastore.ts

import * as cdk from '@aws-cdk/core';

import * as dynamodb from '@aws-cdk/aws-dynamodb';

export class DataStoreStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    new dynamodb.Table(this, `DynamodbStore`, {
      partitionKey: {
        name: 'id',
        type: dynamodb.AttributeType.STRING,
      },
      tableName: `Store`,
      readCapacity: 1,
      writeCapacity: 1
    });
  }
}

次に上記のDynamoDBに書き込むタスクや削除するタスクなどを組み込むStepFunctionsを定義します。StepFunctionsのFlowを定義する部分で、resultPathを設定することでエラー時の処理にeventを渡すことができます。実際に渡されているか確認するために、同じidでDynamoDBへ削除処理を行います。

lib/cdk-stepfunctions-stack

import * as cdk from '@aws-cdk/core';

import * as dynamodb from '@aws-cdk/aws-dynamodb';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';

export class StepfunctionsStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // DynamoDBへの書き込み処理、データが重複すればエラーとなる()
    const storeDynamoWrite = new tasks.DynamoPutItem(this, 'StoreTable',{
      item: {id: tasks.DynamoAttributeValue.fromString(sfn.JsonPath.stringAt('$.id')),},
      table: dynamodb.Table.fromTableArn(this,'StoreTableArn',`arn:aws:dynamodb:*:*:table/Store`,),
      expressionAttributeNames:{
        '#id':'id'
      },
      conditionExpression: 'attribute_not_exists(#id)',
    })

    // DynamoDBへの削除処理
    const storeRecoveryState =new tasks.DynamoDeleteItem(this, 'StoreTableRecovery',{
      key: { id: tasks.DynamoAttributeValue.fromString(sfn.JsonPath.stringAt('$.id')) },
      table: dynamodb.Table.fromTableArn(this,'StoreTableRecoveryArn',`arn:aws:dynamodb:*:*:table/Store`,)
    })

    // StepFunctionsのFlowを定義する
    // resultPathを設定するとstoreDynamoWriteに渡されたeventがstoreRecoveryStateに渡る。
    const definition = storeDynamoWrite.addCatch(storeRecoveryState,{resultPath:'$.error'});

    new sfn.StateMachine(this, 'stateMachine',{
      definition: definition,
    })
  }
}

上記のスタックを呼び出すため、packeage.jsonで指定したbin配下のファイルを編集します。

bin/cdk-stepfunctions.ts

#!/usr/bin/env node
import 'source-map-support/register';
import * as cdk from '@aws-cdk/core';
import { StepfunctionsStack } from '../lib/cdk-stepfunctions-stack';
import { DataStoreStack } from '../lib/cdk-datastore-stack'

const app = new cdk.App();
new StepfunctionsStack(app, 'StepfunctionsStack');
new DataStoreStack(app, 'DataStoreStack');

上記のスタック設定でデプロイします。デプロイ方法などについてはCDKの公式や紹介した動画などを参考にしてください。

最終的にデプロイされたASLのJSONは以下のようになります。

{
    "StartAt": "StoreTable",
    "States": {
        "StoreTable": {
            "End": true,
            "Catch": [
                {
                    "ErrorEquals": [
                        "States.ALL"
                    ],
                    "ResultPath": "$.error",
                    "Next": "StoreTableRecovery"
                }
            ],
            "Type": "Task",
            "Resource": "arn:aws:states:::dynamodb:putItem",
            "Parameters": {
                "Item": {
                    "id": {
                        "S.$": "$.id"
                    }
                },
                "TableName": "Store",
                "ConditionExpression": "attribute_not_exists(#id)",
                "ExpressionAttributeNames": {
                    "#id": "id"
                }
            }
        },
        "StoreTableRecovery": {
            "End": true,
            "Type": "Task",
            "Resource": "arn:aws:states:::dynamodb:deleteItem",
            "Parameters": {
                "Key": {
                    "id": {
                        "S.$": "$.id"
                    }
                },
                "TableName": "Store"
            }
        }
    }
}

実施

先ほどの設定でデプロイ後に実行してみます。実行時には以下のようにidカラムを含んだイベントを含んだWeb画面から実行してみます。

初回実行時はDynamoDBのテーブルにデータがないので最初のタスクが成功してエラー時のタスクは呼び出されません。

次にもう一度同じ設定でStepFunctionsを実行してみます。今度はエラー時のタスクが実行されていることを確認できます。

エラー時のタスクをクリックして入力を確認すると「StoreTable」タスクに渡したイベントがそのまま取得できていることが確認できます。以上で正しくイベントを渡せることが確認できました。

参考資料

aws-stepfunctions module · AWS CDK

Package - @aws-cdk/aws-stepfunctions-tasks