![[AWS Step Functions] マップステートの各イテレーター内での例外処理をキャッチする方法はあるのか確認してみた(AWS CDK)](https://devio2023-media.developers.io/wp-content/uploads/2022/08/aws-step-functions.png)
[AWS Step Functions] マップステートの各イテレーター内での例外処理をキャッチする方法はあるのか確認してみた(AWS CDK)
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、CX事業本部 Delivery部の若槻です。
分散型アプリケーションをサーバーレスで構築できる AWS Step Functions では、配列に対する繰り返し処理を実行するステートとして マップステート(Map state)が利用できます。
マップステートでは配列の各要素に対するイテレーターが並列に実行され、各イテレーター内でタスクによる処理が行われます。
今回は、このマップステートで、各イテレーター内での例外処理(Fail)をキャッチする方法はあるのか、確認してみました。
エラーキャッチを実装しない場合
まずエラーキャッチを実装しない場合の挙動を確認します。
AWS CDK(TypeScript)でステートマシンを実装します。
import { Construct } from 'constructs';
import { aws_stepfunctions, Stack, StackProps } from 'aws-cdk-lib';
export class CdkSampleStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);
    // マップステート内のタスク
    const pass1 = new aws_stepfunctions.Pass(this, 'pass1');
    const pass2 = new aws_stepfunctions.Pass(this, 'pass2');
    const pass3 = new aws_stepfunctions.Pass(this, 'pass3', {
      parameters: {
        // 1と配列内の要素を加算
        mathAdd: aws_stepfunctions.JsonPath.mathAdd(
          1,
          aws_stepfunctions.JsonPath.numberAt('$.item')
        ),
      },
    });
    // マップステート
    const mapState = new aws_stepfunctions.Map(this, 'mapState', {
      itemsPath: aws_stepfunctions.JsonPath.stringAt('$.items'),
      parameters: {
        'item.$': '$$.Map.Item.Value',
      },
    });
    mapState.iterator(pass1.next(pass2).next(pass3));
    new aws_stepfunctions.StateMachine(this, 'StateMachine', {
      stateMachineName: 'StateMachine',
      definition: mapState,
    });
  }
}
マップステート内で3つのパスが実行されるようにし、3つ目のパスでは、配列内の要素を利用した計算(加算処理)を行うようにしています。
ステートマシングラフは次のようになります。

すべてのイテレーターが Success する場合
次のような数値のみから成る配列を入力としてステートマシンを実行してみます。
{
    "items": [1,2,3]
}

するとマップステートのすべてのイテレーターおよびステートマシンの実行が成功しました。

一部のイテレーターが Fail する場合
続いて、次のような数値以外の要素を含む配列を入力としてステートマシンを実行してみます。
{
    "items": [1,"あああ",3]
}

するとイテレーター#1内のタスクが Fail し、またそれによってイテレーター#1および#2が Abort しており、ステートマシンの実行も Fail してしまっています。


あるイテレーター内のタスクが Fail した場合に、そのイテレーターのみ Abort して、他のイテレーターは継続して実行させる方法として、マップステート内のタスクに addCatch を設定する方法があります。
しかし今回のようにマップステート内のタスクが複数ある場合、すべてのタスクに addCatch を設定するのはあまりスマートではありませんね。そこで他に方法が無いか確認してみます。
確認してみた
マップステートに addCatch を設定してみる
まず思いついたのが、マップステート自体に addCatch を設定する方法です。
AWS CDK のコードを修正します。
    new aws_stepfunctions.StateMachine(this, 'StateMachine', {
      stateMachineName: 'StateMachine',
      definition: mapState.addCatch(
        new aws_stepfunctions.Pass(this, 'catchPass')
      ),
    });
mapState に addCatch を設定すると、次のようなステートマシングラフとなりました。

数値以外の要素を含む配列を入力としてステートマシンを実行してみます。
{
    "items": [1,"あああ",3]
}

するとまたマップステートで処理が abort してしまいました。
 

マップステート自体に addCatch を設定しても、 Fail したイテレーターのみ Abort させることはできませんでした。
パラレルステートに addCatch を設定してみる
調べてみるとパラレルステートに addCatch を設定すれば良いとの情報がありました。
試してみます。AWS CDK のコードを次のように修正します。
import { Construct } from 'constructs';
import { aws_stepfunctions, Stack, StackProps } from 'aws-cdk-lib';
export class CdkSampleStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);
    // マップステート内のタスク
    const pass1 = new aws_stepfunctions.Pass(this, 'pass1');
    const pass2 = new aws_stepfunctions.Pass(this, 'pass2');
    const pass3 = new aws_stepfunctions.Pass(this, 'pass3', {
      parameters: {
        // 1と配列内の要素を加算
        mathAdd: aws_stepfunctions.JsonPath.mathAdd(
          1,
          aws_stepfunctions.JsonPath.numberAt('$.item')
        ),
      },
    });
    // パラレルステート
    const parallel = new aws_stepfunctions.Parallel(this, 'parallel');
    parallel.branch(pass1.next(pass2).next(pass3));
    // パラレルステートのエラーキャッチ
    parallel.addCatch(new aws_stepfunctions.Succeed(this, 'succeed'));
    // マップステート
    const mapState = new aws_stepfunctions.Map(this, 'mapState', {
      itemsPath: aws_stepfunctions.JsonPath.stringAt('$.items'),
      parameters: {
        'item.$': '$$.Map.Item.Value',
      },
    });
    mapState.iterator(parallel);
    new aws_stepfunctions.StateMachine(this, 'StateMachine', {
      stateMachineName: 'StateMachine',
      definition: mapState,
    });
  }
}
ステートマシングラフは次のようになりました。マップステート内の各タスクがパラレルステートの1ブランチ内で実行されるようになり、そのブランチに addCatch が設定されています。

数値以外の要素を含む配列を入力としてステートマシンを実行してみます。
{
    "items": [1,"あああ",3]
}

しかしマップステートは引き続き abort されてしまいました。

parallel.addCatch(new aws_stepfunctions.Succeed(this, 'succeed')) によりブランチ内でキャッチされた Fail が問答無用で Success になると期待していたのですが、そうはなりませんでした。
"Type": "Fail" ステートで Fail させる必要がありそう
なぜ参考記事ではキャッチできて、私の実装ではできなかったのでしょうか。
前述の私の実装と参考記事の ASL を見比べてみると、参考記事ではパラレルステートのブランチ内で "Type": "Fail" のステートで Fail を発生させていました。
そこで切り分けとして、私の実装でもパラレルステートのブランチ内で Fail するタスクを "Type": "Fail" のステートで置き換えてみます。
{
  "StartAt": "mapState",
  "States": {
    "mapState": {
      "Type": "Map",
      "End": true,
      "Parameters": {
        "item.$": "$$.Map.Item.Value"
      },
      "Iterator": {
        "StartAt": "parallel",
        "States": {
          "parallel": {
            "Type": "Parallel",
            "End": true,
            "Catch": [
              {
                "ErrorEquals": [
                  "States.ALL"
                ],
                "Next": "succeed"
              }
            ],
            "Branches": [
              {
                "StartAt": "pass1",
                "States": {
                  "pass1": {
                    "Type": "Pass",
                    "Next": "pass2"
                  },
                  "pass2": {
                    "Type": "Pass",
                    "Next": "pass3"
                  },
                  "pass3": {
+                    "Type": "Fail",
-                    "Type": "Pass",
-                    "Parameters": {
-                      "mathAdd.$": "States.MathAdd(1, $.item)"
                    },
                    "End": true
                  }
                }
              }
            ]
          },
          "succeed": {
            "Type": "Succeed"
          }
        }
      },
      "ItemsPath": "$.items"
    }
  }
}
ステートマシンを実行すると Fail がキャッチされて各イテレーターおよびステートマシン実行が Success となりました。
 

パラレルステートに addCatch を設定する方法は、"Type": "Fail" のステートで Fail した場合にのみ有効なようです。
おわりに
AWS Step Functions でマップステートの各イテレーター全体の Fail をキャッチする方法はあるのか確認してみました。
結論としては、そのような方法は確認できませんでした。よってマップステート内で Fail が発生した場合にも他のイテレーターやステートマシン本体の実行が Abort および Fail しないようにする場合は、マップステート内のすべてのタスクに addCatch を設定する必要がありそうです。
参考
以上










