こんにちは、CX事業本部 Delivery部の若槻です。
分散型アプリケーションをサーバーレスで構築できる AWS Step Functions では、配列に対する繰り返し処理を実行するステートとして マップステート(Map state)が利用できます。
マップステートでは配列の各要素に対するイテレーターが並列に実行され、各イテレーター内でタスクによる処理が行われます。
今回は、このマップステートで、各イテレーター内での例外処理(Fail)をキャッチする方法はあるのか、確認してみました。
エラーキャッチを実装しない場合
まずエラーキャッチを実装しない場合の挙動を確認します。
AWS CDK(TypeScript)でステートマシンを実装します。
lib/cdk-sample-app.ts
import { Construct } from 'constructs';
import { aws_stepfunctions, Stack, StackProps } from 'aws-cdk-lib';
export class CdkSampleStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// マップステート内のタスク
const pass1 = new aws_stepfunctions.Pass(this, 'pass1');
const pass2 = new aws_stepfunctions.Pass(this, 'pass2');
const pass3 = new aws_stepfunctions.Pass(this, 'pass3', {
parameters: {
// 1と配列内の要素を加算
mathAdd: aws_stepfunctions.JsonPath.mathAdd(
1,
aws_stepfunctions.JsonPath.numberAt('$.item')
),
},
});
// マップステート
const mapState = new aws_stepfunctions.Map(this, 'mapState', {
itemsPath: aws_stepfunctions.JsonPath.stringAt('$.items'),
parameters: {
'item.$': '$$.Map.Item.Value',
},
});
mapState.iterator(pass1.next(pass2).next(pass3));
new aws_stepfunctions.StateMachine(this, 'StateMachine', {
stateMachineName: 'StateMachine',
definition: mapState,
});
}
}
マップステート内で3つのパスが実行されるようにし、3つ目のパスでは、配列内の要素を利用した計算(加算処理)を行うようにしています。
ステートマシングラフは次のようになります。
すべてのイテレーターが Success する場合
次のような数値のみから成る配列を入力としてステートマシンを実行してみます。
Input
{
"items": [1,2,3]
}
するとマップステートのすべてのイテレーターおよびステートマシンの実行が成功しました。
一部のイテレーターが Fail する場合
続いて、次のような数値以外の要素を含む配列を入力としてステートマシンを実行してみます。
Input
{
"items": [1,"あああ",3]
}
するとイテレーター#1内のタスクが Fail し、またそれによってイテレーター#1および#2が Abort しており、ステートマシンの実行も Fail してしまっています。
あるイテレーター内のタスクが Fail した場合に、そのイテレーターのみ Abort して、他のイテレーターは継続して実行させる方法として、マップステート内のタスクに addCatch を設定する方法があります。
しかし今回のようにマップステート内のタスクが複数ある場合、すべてのタスクに addCatch を設定するのはあまりスマートではありませんね。そこで他に方法が無いか確認してみます。
確認してみた
マップステートに addCatch を設定してみる
まず思いついたのが、マップステート自体に addCatch
を設定する方法です。
AWS CDK のコードを修正します。
lib/cdk-sample-app.ts
new aws_stepfunctions.StateMachine(this, 'StateMachine', {
stateMachineName: 'StateMachine',
definition: mapState.addCatch(
new aws_stepfunctions.Pass(this, 'catchPass')
),
});
mapState
に addCatch
を設定すると、次のようなステートマシングラフとなりました。
数値以外の要素を含む配列を入力としてステートマシンを実行してみます。
Input
{
"items": [1,"あああ",3]
}
するとまたマップステートで処理が abort してしまいました。
マップステート自体に addCatch
を設定しても、 Fail したイテレーターのみ Abort させることはできませんでした。
パラレルステートに addCatch を設定してみる
調べてみるとパラレルステートに addCatch
を設定すれば良いとの情報がありました。
試してみます。AWS CDK のコードを次のように修正します。
lib/cdk-sample-app.ts
import { Construct } from 'constructs';
import { aws_stepfunctions, Stack, StackProps } from 'aws-cdk-lib';
export class CdkSampleStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// マップステート内のタスク
const pass1 = new aws_stepfunctions.Pass(this, 'pass1');
const pass2 = new aws_stepfunctions.Pass(this, 'pass2');
const pass3 = new aws_stepfunctions.Pass(this, 'pass3', {
parameters: {
// 1と配列内の要素を加算
mathAdd: aws_stepfunctions.JsonPath.mathAdd(
1,
aws_stepfunctions.JsonPath.numberAt('$.item')
),
},
});
// パラレルステート
const parallel = new aws_stepfunctions.Parallel(this, 'parallel');
parallel.branch(pass1.next(pass2).next(pass3));
// パラレルステートのエラーキャッチ
parallel.addCatch(new aws_stepfunctions.Succeed(this, 'succeed'));
// マップステート
const mapState = new aws_stepfunctions.Map(this, 'mapState', {
itemsPath: aws_stepfunctions.JsonPath.stringAt('$.items'),
parameters: {
'item.$': '$$.Map.Item.Value',
},
});
mapState.iterator(parallel);
new aws_stepfunctions.StateMachine(this, 'StateMachine', {
stateMachineName: 'StateMachine',
definition: mapState,
});
}
}
ステートマシングラフは次のようになりました。マップステート内の各タスクがパラレルステートの1ブランチ内で実行されるようになり、そのブランチに addCatch
が設定されています。
数値以外の要素を含む配列を入力としてステートマシンを実行してみます。
Input
{
"items": [1,"あああ",3]
}
しかしマップステートは引き続き abort されてしまいました。
parallel.addCatch(new aws_stepfunctions.Succeed(this, 'succeed'))
によりブランチ内でキャッチされた Fail が問答無用で Success になると期待していたのですが、そうはなりませんでした。
"Type": "Fail" ステートで Fail させる必要がありそう
なぜ参考記事ではキャッチできて、私の実装ではできなかったのでしょうか。
前述の私の実装と参考記事の ASL を見比べてみると、参考記事ではパラレルステートのブランチ内で "Type": "Fail"
のステートで Fail を発生させていました。
そこで切り分けとして、私の実装でもパラレルステートのブランチ内で Fail するタスクを "Type": "Fail"
のステートで置き換えてみます。
ASL
{
"StartAt": "mapState",
"States": {
"mapState": {
"Type": "Map",
"End": true,
"Parameters": {
"item.$": "$$.Map.Item.Value"
},
"Iterator": {
"StartAt": "parallel",
"States": {
"parallel": {
"Type": "Parallel",
"End": true,
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "succeed"
}
],
"Branches": [
{
"StartAt": "pass1",
"States": {
"pass1": {
"Type": "Pass",
"Next": "pass2"
},
"pass2": {
"Type": "Pass",
"Next": "pass3"
},
"pass3": {
+ "Type": "Fail",
- "Type": "Pass",
- "Parameters": {
- "mathAdd.$": "States.MathAdd(1, $.item)"
},
"End": true
}
}
}
]
},
"succeed": {
"Type": "Succeed"
}
}
},
"ItemsPath": "$.items"
}
}
}
ステートマシンを実行すると Fail がキャッチされて各イテレーターおよびステートマシン実行が Success となりました。
パラレルステートに addCatch
を設定する方法は、"Type": "Fail"
のステートで Fail した場合にのみ有効なようです。
おわりに
AWS Step Functions でマップステートの各イテレーター全体の Fail をキャッチする方法はあるのか確認してみました。
結論としては、そのような方法は確認できませんでした。よってマップステート内で Fail が発生した場合にも他のイテレーターやステートマシン本体の実行が Abort および Fail しないようにする場合は、マップステート内のすべてのタスクに addCatch を設定する必要がありそうです。
参考
以上