[AWS Step Functions] マップステートの各イテレーター内での例外処理をキャッチする方法はあるのか確認してみた(AWS CDK)

[AWS Step Functions] マップステートの各イテレーター内での例外処理をキャッチする方法はあるのか確認してみた(AWS CDK)

見つけられませんでした
Clock Icon2023.05.11

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 Delivery部の若槻です。

分散型アプリケーションをサーバーレスで構築できる AWS Step Functions では、配列に対する繰り返し処理を実行するステートとして マップステート(Map state)が利用できます。

マップステートでは配列の各要素に対するイテレーターが並列に実行され、各イテレーター内でタスクによる処理が行われます。

今回は、このマップステートで、各イテレーター内での例外処理(Fail)をキャッチする方法はあるのか、確認してみました。

エラーキャッチを実装しない場合

まずエラーキャッチを実装しない場合の挙動を確認します。

AWS CDK(TypeScript)でステートマシンを実装します。

import { Construct } from 'constructs';
import { aws_stepfunctions, Stack, StackProps } from 'aws-cdk-lib';

export class CdkSampleStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // マップステート内のタスク
    const pass1 = new aws_stepfunctions.Pass(this, 'pass1');
    const pass2 = new aws_stepfunctions.Pass(this, 'pass2');
    const pass3 = new aws_stepfunctions.Pass(this, 'pass3', {
      parameters: {
        // 1と配列内の要素を加算
        mathAdd: aws_stepfunctions.JsonPath.mathAdd(
          1,
          aws_stepfunctions.JsonPath.numberAt('$.item')
        ),
      },
    });

    // マップステート
    const mapState = new aws_stepfunctions.Map(this, 'mapState', {
      itemsPath: aws_stepfunctions.JsonPath.stringAt('$.items'),
      parameters: {
        'item.$': '$$.Map.Item.Value',
      },
    });
    mapState.iterator(pass1.next(pass2).next(pass3));

    new aws_stepfunctions.StateMachine(this, 'StateMachine', {
      stateMachineName: 'StateMachine',
      definition: mapState,
    });
  }
}

マップステート内で3つのパスが実行されるようにし、3つ目のパスでは、配列内の要素を利用した計算(加算処理)を行うようにしています。

ステートマシングラフは次のようになります。

すべてのイテレーターが Success する場合

次のような数値のみから成る配列を入力としてステートマシンを実行してみます。

{
    "items": [1,2,3]
}

するとマップステートのすべてのイテレーターおよびステートマシンの実行が成功しました。

一部のイテレーターが Fail する場合

続いて、次のような数値以外の要素を含む配列を入力としてステートマシンを実行してみます。

{
    "items": [1,"あああ",3]
}

するとイテレーター#1内のタスクが Fail し、またそれによってイテレーター#1および#2が Abort しており、ステートマシンの実行も Fail してしまっています。

あるイテレーター内のタスクが Fail した場合に、そのイテレーターのみ Abort して、他のイテレーターは継続して実行させる方法として、マップステート内のタスクに addCatch を設定する方法があります。

しかし今回のようにマップステート内のタスクが複数ある場合、すべてのタスクに addCatch を設定するのはあまりスマートではありませんね。そこで他に方法が無いか確認してみます。

確認してみた

マップステートに addCatch を設定してみる

まず思いついたのが、マップステート自体に addCatch を設定する方法です。

AWS CDK のコードを修正します。

    new aws_stepfunctions.StateMachine(this, 'StateMachine', {
      stateMachineName: 'StateMachine',
      definition: mapState.addCatch(
        new aws_stepfunctions.Pass(this, 'catchPass')
      ),
    });

mapStateaddCatch を設定すると、次のようなステートマシングラフとなりました。

数値以外の要素を含む配列を入力としてステートマシンを実行してみます。

{
    "items": [1,"あああ",3]
}

するとまたマップステートで処理が abort してしまいました。

マップステート自体に addCatch を設定しても、 Fail したイテレーターのみ Abort させることはできませんでした。

パラレルステートに addCatch を設定してみる

調べてみるとパラレルステートに addCatch を設定すれば良いとの情報がありました。

試してみます。AWS CDK のコードを次のように修正します。

import { Construct } from 'constructs';
import { aws_stepfunctions, Stack, StackProps } from 'aws-cdk-lib';

export class CdkSampleStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // マップステート内のタスク
    const pass1 = new aws_stepfunctions.Pass(this, 'pass1');
    const pass2 = new aws_stepfunctions.Pass(this, 'pass2');
    const pass3 = new aws_stepfunctions.Pass(this, 'pass3', {
      parameters: {
        // 1と配列内の要素を加算
        mathAdd: aws_stepfunctions.JsonPath.mathAdd(
          1,
          aws_stepfunctions.JsonPath.numberAt('$.item')
        ),
      },
    });

    // パラレルステート
    const parallel = new aws_stepfunctions.Parallel(this, 'parallel');
    parallel.branch(pass1.next(pass2).next(pass3));

    // パラレルステートのエラーキャッチ
    parallel.addCatch(new aws_stepfunctions.Succeed(this, 'succeed'));

    // マップステート
    const mapState = new aws_stepfunctions.Map(this, 'mapState', {
      itemsPath: aws_stepfunctions.JsonPath.stringAt('$.items'),
      parameters: {
        'item.$': '$$.Map.Item.Value',
      },
    });
    mapState.iterator(parallel);

    new aws_stepfunctions.StateMachine(this, 'StateMachine', {
      stateMachineName: 'StateMachine',
      definition: mapState,
    });
  }
}

ステートマシングラフは次のようになりました。マップステート内の各タスクがパラレルステートの1ブランチ内で実行されるようになり、そのブランチに addCatch が設定されています。

数値以外の要素を含む配列を入力としてステートマシンを実行してみます。

{
    "items": [1,"あああ",3]
}

しかしマップステートは引き続き abort されてしまいました。

parallel.addCatch(new aws_stepfunctions.Succeed(this, 'succeed')) によりブランチ内でキャッチされた Fail が問答無用で Success になると期待していたのですが、そうはなりませんでした。

"Type": "Fail" ステートで Fail させる必要がありそう

なぜ参考記事ではキャッチできて、私の実装ではできなかったのでしょうか。

前述の私の実装と参考記事の ASL を見比べてみると、参考記事ではパラレルステートのブランチ内で "Type": "Fail" のステートで Fail を発生させていました。

そこで切り分けとして、私の実装でもパラレルステートのブランチ内で Fail するタスクを "Type": "Fail" のステートで置き換えてみます。

{
  "StartAt": "mapState",
  "States": {
    "mapState": {
      "Type": "Map",
      "End": true,
      "Parameters": {
        "item.$": "$$.Map.Item.Value"
      },
      "Iterator": {
        "StartAt": "parallel",
        "States": {
          "parallel": {
            "Type": "Parallel",
            "End": true,
            "Catch": [
              {
                "ErrorEquals": [
                  "States.ALL"
                ],
                "Next": "succeed"
              }
            ],
            "Branches": [
              {
                "StartAt": "pass1",
                "States": {
                  "pass1": {
                    "Type": "Pass",
                    "Next": "pass2"
                  },
                  "pass2": {
                    "Type": "Pass",
                    "Next": "pass3"
                  },
                  "pass3": {
+                    "Type": "Fail",
-                    "Type": "Pass",
-                    "Parameters": {
-                      "mathAdd.$": "States.MathAdd(1, $.item)"
                    },
                    "End": true
                  }
                }
              }
            ]
          },
          "succeed": {
            "Type": "Succeed"
          }
        }
      },
      "ItemsPath": "$.items"
    }
  }
}

ステートマシンを実行すると Fail がキャッチされて各イテレーターおよびステートマシン実行が Success となりました。

パラレルステートに addCatch を設定する方法は、"Type": "Fail" のステートで Fail した場合にのみ有効なようです。

おわりに

AWS Step Functions でマップステートの各イテレーター全体の Fail をキャッチする方法はあるのか確認してみました。

結論としては、そのような方法は確認できませんでした。よってマップステート内で Fail が発生した場合にも他のイテレーターやステートマシン本体の実行が Abort および Fail しないようにする場合は、マップステート内のすべてのタスクに addCatch を設定する必要がありそうです。

参考

以上

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.