AWS Step FunctionsでParallelステートによる並列処理を実装してみた(AWS CDK v2)
こんにちは、CX事業本部 IoT事業部の若槻です。
AWS Step Functionsでは、ParallelStateを使用すればState Machineで並列処理を実装することができます。
今回は、Step FunctionsでのParallel Stateによる並列処理をAWS CDK v2で実装してみました。
やってみた
実装
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
import { Construct } from 'constructs'; import { aws_stepfunctions, Duration, Stack, StackProps } from 'aws-cdk-lib'; export class ProcessStack extends Stack { constructor(scope: Construct, id: string, props: StackProps) { super(scope, id, props); // Pass1 const pass1 = new aws_stepfunctions.Pass(this, 'pass1', { parameters: { val: 'クラス', }, resultPath: '$.pass1Output', }); // Pass2 const pass2 = new aws_stepfunctions.Pass(this, 'pass2', { parameters: { val: 'メソッド', }, resultPath: '$.pass2Output', }); // Wait const wait = new aws_stepfunctions.Wait(this, 'wait', { time: aws_stepfunctions.WaitTime.duration(Duration.seconds(30)), }); // Parallel const parallel = new aws_stepfunctions.Parallel(this, 'parallel'); parallel.branch(pass1); parallel.branch(pass2.next(wait)); // 各Branchの出力をconcat const concatPass = new aws_stepfunctions.Pass(this, 'concatPass', { parameters: { concat: aws_stepfunctions.JsonPath.format( '{}{}', aws_stepfunctions.JsonPath.stringAt('$[0].pass1Output.val'), aws_stepfunctions.JsonPath.stringAt('$[1].pass2Output.val'), ), }, }); // State Machine new aws_stepfunctions.StateMachine(this, 'stateMachine', { stateMachineName: 'stateMachine', definition: parallel.next(concatPass), }); } }
- Parallelを使用して並列処理を実装しています。
- Parallelでは並列化したいStateをBranchとして追加します。
- Parallelでは各Branchの結果が配列形式で出力されます。
concatPass
でそれらの結果を文字列結合しています。
上記をCDK Deployしてスタックをデプロイします。これにより次のDefinitionのステートマシンが作成されます。
{ "StartAt": "parallel", "States": { "parallel": { "Type": "Parallel", "Next": "concatPass", "Branches": [ { "StartAt": "pass1", "States": { "pass1": { "Type": "Pass", "ResultPath": "$.pass1Output", "Parameters": { "val": "クラス" }, "End": true } } }, { "StartAt": "pass2", "States": { "pass2": { "Type": "Pass", "ResultPath": "$.pass2Output", "Parameters": { "val": "メソッド" }, "Next": "wait" }, "wait": { "Type": "Wait", "Seconds": 30, "End": true } } } ] }, "concatPass": { "Type": "Pass", "Parameters": { "concat.$": "States.Format('{}{}', $[0].pass1Output.val, $[1].pass2Output.val)" }, "End": true } } }
動作確認
State Machineの実行を開始します。
Graph viewをみると、Parallel内のすべてのBranchが完了するまでParalellの次のステートへの遷移は待機されています。
すべてのBranchが完了し、State Machineが完了しました。
concatPass
のInputは次のような配列形式となっています。
[ { "Comment": "Insert your JSON here", "pass1Output": { "val": "クラス" } }, { "Comment": "Insert your JSON here", "pass2Output": { "val": "メソッド" } } ]
各Branchの結果が結合できています。
{ "concat": "クラスメソッド" }
Parallelの結果の並び順について
ここでParallel Stateの結果の各Branchの配列内の並び順の決まり方が気になりました。
そこで検証としてpass1
から始まるBranchとpass2
から始まるBranchの順番を入れ替えてみます。
{ "StartAt": "parallel", "States": { "parallel": { "Type": "Parallel", "Next": "concatPass", "Branches": [ { "StartAt": "pass2", "States": { "pass2": { "Type": "Pass", "ResultPath": "$.pass2Output", "Parameters": { "val": "メソッド" }, "Next": "wait" }, "wait": { "Type": "Wait", "Seconds": 30, "End": true } } }, { "StartAt": "pass1", "States": { "pass1": { "Type": "Pass", "ResultPath": "$.pass1Output", "Parameters": { "val": "クラス" }, "End": true } } } ] }, "concatPass": { "Type": "Pass", "Parameters": { "concat.$": "States.Format('{}{}', $[0].pass1Output.val, $[1].pass2Output.val)" }, "End": true } } }
State Machineを実行すると、1番目がpass2
、2番目がpass1
となりました。結果の並び順はState Machine定義上でのBranchの並び順に依存するようです。(実行が完了した順ではない)
おわりに
AWS Step FunctionsでParallel Stateによる並列処理を実装してみました。
Parallel Stateを使うことにより依存関係に無いが同列な処理をまとめられ、また実行時間の短縮も図れます。使いこなしていきたいですね。
以上