[AWS Step Functions] マップステートの各イテレーター内での例外処理をキャッチする方法はあるのか確認してみた(AWS CDK)
こんにちは、CX事業本部 Delivery部の若槻です。
分散型アプリケーションをサーバーレスで構築できる AWS Step Functions では、配列に対する繰り返し処理を実行するステートとして マップステート(Map state)が利用できます。
マップステートでは配列の各要素に対するイテレーターが並列に実行され、各イテレーター内でタスクによる処理が行われます。
今回は、このマップステートで、各イテレーター内での例外処理(Fail)をキャッチする方法はあるのか、確認してみました。
エラーキャッチを実装しない場合
まずエラーキャッチを実装しない場合の挙動を確認します。
AWS CDK(TypeScript)でステートマシンを実装します。
import { Construct } from 'constructs'; import { aws_stepfunctions, Stack, StackProps } from 'aws-cdk-lib'; export class CdkSampleStack extends Stack { constructor(scope: Construct, id: string, props: StackProps) { super(scope, id, props); // マップステート内のタスク const pass1 = new aws_stepfunctions.Pass(this, 'pass1'); const pass2 = new aws_stepfunctions.Pass(this, 'pass2'); const pass3 = new aws_stepfunctions.Pass(this, 'pass3', { parameters: { // 1と配列内の要素を加算 mathAdd: aws_stepfunctions.JsonPath.mathAdd( 1, aws_stepfunctions.JsonPath.numberAt('$.item') ), }, }); // マップステート const mapState = new aws_stepfunctions.Map(this, 'mapState', { itemsPath: aws_stepfunctions.JsonPath.stringAt('$.items'), parameters: { 'item.$': '$$.Map.Item.Value', }, }); mapState.iterator(pass1.next(pass2).next(pass3)); new aws_stepfunctions.StateMachine(this, 'StateMachine', { stateMachineName: 'StateMachine', definition: mapState, }); } }
マップステート内で3つのパスが実行されるようにし、3つ目のパスでは、配列内の要素を利用した計算(加算処理)を行うようにしています。
ステートマシングラフは次のようになります。
すべてのイテレーターが Success する場合
次のような数値のみから成る配列を入力としてステートマシンを実行してみます。
{ "items": [1,2,3] }
するとマップステートのすべてのイテレーターおよびステートマシンの実行が成功しました。
一部のイテレーターが Fail する場合
続いて、次のような数値以外の要素を含む配列を入力としてステートマシンを実行してみます。
{ "items": [1,"あああ",3] }
するとイテレーター#1内のタスクが Fail し、またそれによってイテレーター#1および#2が Abort しており、ステートマシンの実行も Fail してしまっています。
あるイテレーター内のタスクが Fail した場合に、そのイテレーターのみ Abort して、他のイテレーターは継続して実行させる方法として、マップステート内のタスクに addCatch を設定する方法があります。
しかし今回のようにマップステート内のタスクが複数ある場合、すべてのタスクに addCatch を設定するのはあまりスマートではありませんね。そこで他に方法が無いか確認してみます。
確認してみた
マップステートに addCatch を設定してみる
まず思いついたのが、マップステート自体に addCatch
を設定する方法です。
AWS CDK のコードを修正します。
new aws_stepfunctions.StateMachine(this, 'StateMachine', { stateMachineName: 'StateMachine', definition: mapState.addCatch( new aws_stepfunctions.Pass(this, 'catchPass') ), });
mapState
に addCatch
を設定すると、次のようなステートマシングラフとなりました。
数値以外の要素を含む配列を入力としてステートマシンを実行してみます。
{ "items": [1,"あああ",3] }
するとまたマップステートで処理が abort してしまいました。
マップステート自体に addCatch
を設定しても、 Fail したイテレーターのみ Abort させることはできませんでした。
パラレルステートに addCatch を設定してみる
調べてみるとパラレルステートに addCatch
を設定すれば良いとの情報がありました。
試してみます。AWS CDK のコードを次のように修正します。
import { Construct } from 'constructs'; import { aws_stepfunctions, Stack, StackProps } from 'aws-cdk-lib'; export class CdkSampleStack extends Stack { constructor(scope: Construct, id: string, props: StackProps) { super(scope, id, props); // マップステート内のタスク const pass1 = new aws_stepfunctions.Pass(this, 'pass1'); const pass2 = new aws_stepfunctions.Pass(this, 'pass2'); const pass3 = new aws_stepfunctions.Pass(this, 'pass3', { parameters: { // 1と配列内の要素を加算 mathAdd: aws_stepfunctions.JsonPath.mathAdd( 1, aws_stepfunctions.JsonPath.numberAt('$.item') ), }, }); // パラレルステート const parallel = new aws_stepfunctions.Parallel(this, 'parallel'); parallel.branch(pass1.next(pass2).next(pass3)); // パラレルステートのエラーキャッチ parallel.addCatch(new aws_stepfunctions.Succeed(this, 'succeed')); // マップステート const mapState = new aws_stepfunctions.Map(this, 'mapState', { itemsPath: aws_stepfunctions.JsonPath.stringAt('$.items'), parameters: { 'item.$': '$$.Map.Item.Value', }, }); mapState.iterator(parallel); new aws_stepfunctions.StateMachine(this, 'StateMachine', { stateMachineName: 'StateMachine', definition: mapState, }); } }
ステートマシングラフは次のようになりました。マップステート内の各タスクがパラレルステートの1ブランチ内で実行されるようになり、そのブランチに addCatch
が設定されています。
数値以外の要素を含む配列を入力としてステートマシンを実行してみます。
{ "items": [1,"あああ",3] }
しかしマップステートは引き続き abort されてしまいました。
parallel.addCatch(new aws_stepfunctions.Succeed(this, 'succeed'))
によりブランチ内でキャッチされた Fail が問答無用で Success になると期待していたのですが、そうはなりませんでした。
"Type": "Fail" ステートで Fail させる必要がありそう
なぜ参考記事ではキャッチできて、私の実装ではできなかったのでしょうか。
前述の私の実装と参考記事の ASL を見比べてみると、参考記事ではパラレルステートのブランチ内で "Type": "Fail"
のステートで Fail を発生させていました。
そこで切り分けとして、私の実装でもパラレルステートのブランチ内で Fail するタスクを "Type": "Fail"
のステートで置き換えてみます。
{ "StartAt": "mapState", "States": { "mapState": { "Type": "Map", "End": true, "Parameters": { "item.$": "$$.Map.Item.Value" }, "Iterator": { "StartAt": "parallel", "States": { "parallel": { "Type": "Parallel", "End": true, "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "succeed" } ], "Branches": [ { "StartAt": "pass1", "States": { "pass1": { "Type": "Pass", "Next": "pass2" }, "pass2": { "Type": "Pass", "Next": "pass3" }, "pass3": { + "Type": "Fail", - "Type": "Pass", - "Parameters": { - "mathAdd.$": "States.MathAdd(1, $.item)" }, "End": true } } } ] }, "succeed": { "Type": "Succeed" } } }, "ItemsPath": "$.items" } } }
ステートマシンを実行すると Fail がキャッチされて各イテレーターおよびステートマシン実行が Success となりました。
パラレルステートに addCatch
を設定する方法は、"Type": "Fail"
のステートで Fail した場合にのみ有効なようです。
おわりに
AWS Step Functions でマップステートの各イテレーター全体の Fail をキャッチする方法はあるのか確認してみました。
結論としては、そのような方法は確認できませんでした。よってマップステート内で Fail が発生した場合にも他のイテレーターやステートマシン本体の実行が Abort および Fail しないようにする場合は、マップステート内のすべてのタスクに addCatch を設定する必要がありそうです。
参考
以上