この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、CX事業本部 IoT事業部の若槻です。
AWS Step Functionsでは、ParallelStateを使用すればState Machineで並列処理を実装することができます。
今回は、Step FunctionsでのParallel Stateによる並列処理をAWS CDK v2で実装してみました。
やってみた
実装
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
lib/process-stack.ts
import { Construct } from 'constructs';
import { aws_stepfunctions, Duration, Stack, StackProps } from 'aws-cdk-lib';
export class ProcessStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// Pass1
const pass1 = new aws_stepfunctions.Pass(this, 'pass1', {
parameters: {
val: 'クラス',
},
resultPath: '$.pass1Output',
});
// Pass2
const pass2 = new aws_stepfunctions.Pass(this, 'pass2', {
parameters: {
val: 'メソッド',
},
resultPath: '$.pass2Output',
});
// Wait
const wait = new aws_stepfunctions.Wait(this, 'wait', {
time: aws_stepfunctions.WaitTime.duration(Duration.seconds(30)),
});
// Parallel
const parallel = new aws_stepfunctions.Parallel(this, 'parallel');
parallel.branch(pass1);
parallel.branch(pass2.next(wait));
// 各Branchの出力をconcat
const concatPass = new aws_stepfunctions.Pass(this, 'concatPass', {
parameters: {
concat: aws_stepfunctions.JsonPath.format(
'{}{}',
aws_stepfunctions.JsonPath.stringAt('$[0].pass1Output.val'),
aws_stepfunctions.JsonPath.stringAt('$[1].pass2Output.val'),
),
},
});
// State Machine
new aws_stepfunctions.StateMachine(this, 'stateMachine', {
stateMachineName: 'stateMachine',
definition: parallel.next(concatPass),
});
}
}
- Parallelを使用して並列処理を実装しています。
- Parallelでは並列化したいStateをBranchとして追加します。
- Parallelでは各Branchの結果が配列形式で出力されます。
concatPass
でそれらの結果を文字列結合しています。
上記をCDK Deployしてスタックをデプロイします。これにより次のDefinitionのステートマシンが作成されます。
Definition
{
"StartAt": "parallel",
"States": {
"parallel": {
"Type": "Parallel",
"Next": "concatPass",
"Branches": [
{
"StartAt": "pass1",
"States": {
"pass1": {
"Type": "Pass",
"ResultPath": "$.pass1Output",
"Parameters": {
"val": "クラス"
},
"End": true
}
}
},
{
"StartAt": "pass2",
"States": {
"pass2": {
"Type": "Pass",
"ResultPath": "$.pass2Output",
"Parameters": {
"val": "メソッド"
},
"Next": "wait"
},
"wait": {
"Type": "Wait",
"Seconds": 30,
"End": true
}
}
}
]
},
"concatPass": {
"Type": "Pass",
"Parameters": {
"concat.$": "States.Format('{}{}', $[0].pass1Output.val, $[1].pass2Output.val)"
},
"End": true
}
}
}
動作確認
State Machineの実行を開始します。
Graph viewをみると、Parallel内のすべてのBranchが完了するまでParalellの次のステートへの遷移は待機されています。
すべてのBranchが完了し、State Machineが完了しました。
concatPass
のInputは次のような配列形式となっています。
concat input
[
{
"Comment": "Insert your JSON here",
"pass1Output": {
"val": "クラス"
}
},
{
"Comment": "Insert your JSON here",
"pass2Output": {
"val": "メソッド"
}
}
]
各Branchの結果が結合できています。
concat output
{
"concat": "クラスメソッド"
}
Parallelの結果の並び順について
ここでParallel Stateの結果の各Branchの配列内の並び順の決まり方が気になりました。
そこで検証としてpass1
から始まるBranchとpass2
から始まるBranchの順番を入れ替えてみます。
Definition
{
"StartAt": "parallel",
"States": {
"parallel": {
"Type": "Parallel",
"Next": "concatPass",
"Branches": [
{
"StartAt": "pass2",
"States": {
"pass2": {
"Type": "Pass",
"ResultPath": "$.pass2Output",
"Parameters": {
"val": "メソッド"
},
"Next": "wait"
},
"wait": {
"Type": "Wait",
"Seconds": 30,
"End": true
}
}
},
{
"StartAt": "pass1",
"States": {
"pass1": {
"Type": "Pass",
"ResultPath": "$.pass1Output",
"Parameters": {
"val": "クラス"
},
"End": true
}
}
}
]
},
"concatPass": {
"Type": "Pass",
"Parameters": {
"concat.$": "States.Format('{}{}', $[0].pass1Output.val, $[1].pass2Output.val)"
},
"End": true
}
}
}
State Machineを実行すると、1番目がpass2
、2番目がpass1
となりました。結果の並び順はState Machine定義上でのBranchの並び順に依存するようです。(実行が完了した順ではない)
おわりに
AWS Step FunctionsでParallel Stateによる並列処理を実装してみました。
Parallel Stateを使うことにより依存関係に無いが同列な処理をまとめられ、また実行時間の短縮も図れます。使いこなしていきたいですね。
以上