AWS Step FunctionsでParallelステートによる並列処理を実装してみた(AWS CDK v2)

2022.06.22

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 IoT事業部の若槻です。

AWS Step Functionsでは、ParallelStateを使用すればState Machineで並列処理を実装することができます。

今回は、Step FunctionsでのParallel Stateによる並列処理をAWS CDK v2で実装してみました。

やってみた

実装

AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。

lib/process-stack.ts

import { Construct } from 'constructs';
import { aws_stepfunctions, Duration, Stack, StackProps } from 'aws-cdk-lib';

export class ProcessStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // Pass1
    const pass1 = new aws_stepfunctions.Pass(this, 'pass1', {
      parameters: {
        val: 'クラス',
      },
      resultPath: '$.pass1Output',
    });

    // Pass2
    const pass2 = new aws_stepfunctions.Pass(this, 'pass2', {
      parameters: {
        val: 'メソッド',
      },
      resultPath: '$.pass2Output',
    });

    // Wait
    const wait = new aws_stepfunctions.Wait(this, 'wait', {
      time: aws_stepfunctions.WaitTime.duration(Duration.seconds(30)),
    });

    // Parallel
    const parallel = new aws_stepfunctions.Parallel(this, 'parallel');
    parallel.branch(pass1);
    parallel.branch(pass2.next(wait));

    // 各Branchの出力をconcat
    const concatPass = new aws_stepfunctions.Pass(this, 'concatPass', {
      parameters: {
        concat: aws_stepfunctions.JsonPath.format(
          '{}{}',
          aws_stepfunctions.JsonPath.stringAt('$[0].pass1Output.val'),
          aws_stepfunctions.JsonPath.stringAt('$[1].pass2Output.val'),
        ),
      },
    });

    // State Machine
    new aws_stepfunctions.StateMachine(this, 'stateMachine', {
      stateMachineName: 'stateMachine',
      definition: parallel.next(concatPass),
    });
  }
}
  • Parallelを使用して並列処理を実装しています。
    • Parallelでは並列化したいStateをBranchとして追加します。
    • Parallelでは各Branchの結果が配列形式で出力されます。concatPassでそれらの結果を文字列結合しています。

上記をCDK Deployしてスタックをデプロイします。これにより次のDefinitionのステートマシンが作成されます。

Definition

{
  "StartAt": "parallel",
  "States": {
    "parallel": {
      "Type": "Parallel",
      "Next": "concatPass",
      "Branches": [
        {
          "StartAt": "pass1",
          "States": {
            "pass1": {
              "Type": "Pass",
              "ResultPath": "$.pass1Output",
              "Parameters": {
                "val": "クラス"
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "pass2",
          "States": {
            "pass2": {
              "Type": "Pass",
              "ResultPath": "$.pass2Output",
              "Parameters": {
                "val": "メソッド"
              },
              "Next": "wait"
            },
            "wait": {
              "Type": "Wait",
              "Seconds": 30,
              "End": true
            }
          }
        }
      ]
    },
    "concatPass": {
      "Type": "Pass",
      "Parameters": {
        "concat.$": "States.Format('{}{}', $[0].pass1Output.val, $[1].pass2Output.val)"
      },
      "End": true
    }
  }
}

動作確認

State Machineの実行を開始します。

Graph viewをみると、Parallel内のすべてのBranchが完了するまでParalellの次のステートへの遷移は待機されています。

すべてのBranchが完了し、State Machineが完了しました。

concatPassのInputは次のような配列形式となっています。

concat input

[
  {
    "Comment": "Insert your JSON here",
    "pass1Output": {
      "val": "クラス"
    }
  },
  {
    "Comment": "Insert your JSON here",
    "pass2Output": {
      "val": "メソッド"
    }
  }
]

各Branchの結果が結合できています。

concat output

{
  "concat": "クラスメソッド"
}

Parallelの結果の並び順について

ここでParallel Stateの結果の各Branchの配列内の並び順の決まり方が気になりました。

そこで検証としてpass1から始まるBranchとpass2から始まるBranchの順番を入れ替えてみます。

Definition

{
  "StartAt": "parallel",
  "States": {
    "parallel": {
      "Type": "Parallel",
      "Next": "concatPass",
      "Branches": [
        {
          "StartAt": "pass2",
          "States": {
            "pass2": {
              "Type": "Pass",
              "ResultPath": "$.pass2Output",
              "Parameters": {
                "val": "メソッド"
              },
              "Next": "wait"
            },
            "wait": {
              "Type": "Wait",
              "Seconds": 30,
              "End": true
            }
          }
        },
        {
          "StartAt": "pass1",
          "States": {
            "pass1": {
              "Type": "Pass",
              "ResultPath": "$.pass1Output",
              "Parameters": {
                "val": "クラス"
              },
              "End": true
            }
          }
        }
      ]
    },
    "concatPass": {
      "Type": "Pass",
      "Parameters": {
        "concat.$": "States.Format('{}{}', $[0].pass1Output.val, $[1].pass2Output.val)"
      },
      "End": true
    }
  }
}

State Machineを実行すると、1番目がpass2、2番目がpass1となりました。結果の並び順はState Machine定義上でのBranchの並び順に依存するようです。(実行が完了した順ではない)

おわりに

AWS Step FunctionsでParallel Stateによる並列処理を実装してみました。

Parallel Stateを使うことにより依存関係に無いが同列な処理をまとめられ、また実行時間の短縮も図れます。使いこなしていきたいですね。

以上