AWS Step FunctionsステートマシンのMapステート内でエラーをキャッチする(AWS CDK v2)

2022.05.29

こんにちは、CX事業本部 IoT事業部の若槻です。

AWS Step FunctionsステートマシンのMapステートを使うと、アイテムの配列に対してMap処理をすることができます。しかし既定の動作ではMapステート内のいずれかの実行が失敗したら、Mapステート自体が失敗したとして扱われてしまいます。また、その際にまだ実行されていないアイテムに対する処理はAbortされてしまいます。

そこで今回は、AWS Step FunctionsステートマシンのMapステート内でエラーをキャッチする構成をAWS CDK v2で作ってみました。

やってみた

エラーキャッチを実装しない場合

まず、Mapステート内でエラーキャッチをしない場合の実装をCDKで行います。

Mapステート内で実行されるLambda関数では、入力された数値のべき乗を計算して出力します。もし数値以外が入力されたらエラーがスローされます。

lib/process-stack.ts

import { Construct } from 'constructs';
import {
  aws_lambda,
  aws_stepfunctions,
  aws_stepfunctions_tasks,
  Stack,
  StackProps,
} from 'aws-cdk-lib';

export class ProcessStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    //Mapステート
    const mapState = new aws_stepfunctions.Map(this, 'mapState', {
      itemsPath: aws_stepfunctions.JsonPath.stringAt('$.items'),
      parameters: {
        'item.$': '$$.Map.Item.Value',
      },
    });

    //Lambda実行タスク
    const culcTask = new aws_stepfunctions_tasks.LambdaInvoke(
      this,
      'culcTask',
      {
        lambdaFunction: new aws_lambda.Function(this, 'myFunc', {
          functionName: 'culcFunc',
          code: aws_lambda.Code.fromInline(`
            exports.handler = (event, context, callback) => {
              if (isNaN(event.item)){
                throw new Error()
              }
              callback(null, event.item * event.item);
            };
          `),
          runtime: aws_lambda.Runtime.NODEJS_14_X,
          handler: 'index.handler',
        }),
        payload: aws_stepfunctions.TaskInput.fromJsonPathAt('$'),
      }
    );

    //Mapステートのイテレータ
    mapState.iterator(culcTask);

    //Step Functionsステートマシン
    new aws_stepfunctions.StateMachine(this, 'stateMachine', {
      stateMachineName: 'myStateMachine',
      definition: mapState,
    });
  }
}

CDK Deployしてステートマシンを作成します。

全てのLambda実行が成功した場合

Inputとして数値の配列を指定してステートマシンを実行します。

{
    "items": [1,2,3]
}

実行が成功しました。

Mapステートの実行結果を見ると、べき乗された数値が出力されています。

一部のLambda実行が失敗した場合

2番目のアイテムに文字列とした入力で実行してみます。

{
    "items": [1,"あああ",3]
}

2番目のアイテムのMapステートの処理が失敗しました。また1および3番目のアイテムではAbortedとなり処理が実行されませんでした。

エラーキャッチを実装した場合

ステートマシンでエラーキャッチを実装したい場合はaddCatchを使用します。パッケージのソース上では次の部分で確認できます。

  /**
   * Add a recovery handler for this state
   *
   * When a particular error occurs, execution will continue at the error
   * handler instead of failing the state machine execution.
   */
  public addCatch(handler: IChainable, props: CatchProps = {}): TaskStateBase {
    super._addCatch(handler.startState, props);
    return this;
  }

先程のCDKコードを次のように変更します。

lib/process-stack.ts

    //Mapステートのイテレータ
    mapState.iterator(
      culcTask.addCatch(new aws_stepfunctions.Pass(this, 'culcTaskCatch'))
    );

CDK Deployしてステートマシンを変更します。

変更を反映できたら、先程と同じく2番目のアイテムに文字列とした入力で実行してみます。

{
    "items": [1,"あああ",3]
}

するとステートマシンの実行自体は成功しています。

しかしMapステートのアイテムごとの実行結果を見ると、1および3番目のアイテムでは実行が成功し、2番目のアイテムは失敗しています。

Graph Viewはそれぞれこんな感じです。

エラーとなったアイテムのタスク実行がエラーキャッチされることにより、Mapステートごと失敗はせず、Mapステート内の他のアイテムでの実行に影響が出ないようにすることができました!

おわりに

AWS Step FunctionsステートマシンのMapステート内でエラーをキャッチする構成をAWS CDK v2で作ってみました。

Mapステート内でアイテム毎に独立した処理を行う場合は、addCatchを使用したエラーキャッチの実装が必須だと思いました。またエラーキャッチが発生してもステートマシン自体は失敗とはならないので、タスクの失敗に気付けるように、エラーキャッチ時にメールやチャットへの通知を行うようにすると良さそうですね。

参考

以上