Cloud workflowのワークフロー中で並列実行をしてみる

2023.04.24

はじめに

データアナリティクス事業本部のkobayashiです。

GoogleCloudのWorkflowsで並列でStepを実行できるParallel Stepsを使ってみたのでまとめます。

Workflows release notes  |  Google Cloud

Parallel Stepsとは

ワークフロー中で一つの処理をStepとして実行しますが、Parallel Stepsはその名の通りStepを同時実行するステップになります。 並列ステップを使う場面としては複数の独立したAPIへリクエストを送りそのレスポンスを取得したいがレスポンスまでの時間が長くかかかる場合において、逐次処理に比べ並列ステップを使い複数のAPIへリクエストを同時に行うことで全体の処理時間を圧縮できるため重宝します。

Parallel steps  |  Workflows  |  Google Cloud

記述の仕方としては以下のようになります。

並列ループ

- {ステップ名}:
    parallel:
      # exception_policy: continueAll
      shared: [ ret_val ]
      # concurrency_limit: 10
      for:
        value: test
        in: ${tests}

並列ブランチ

- {ステップ名}:
    parallel:
      # exception_policy: continueAll
      shared: [ ret_val ]
      # concurrency_limit: 10
      branches:
        - branch_1:
            ...
        - branch_2:
            ...

並列ループと並列ブランチを使うパターンで処理を行うステップの記述が異なりますがそれ以外の属性は同じです。

各属性はexception_policyは今日(2023/04/20)現在continueAllしか設定がないので設定する必要はありません。またsharedは並列ステップ内で書き込み可能な変数を指定します。concurrency_limitは同時に実行できる並列ステップ数になります。

実際の処理を行うステップは列ループの場合はforの配下に共通のステップを記述します。並列ブランチの場合はbranches配下に子ステップを続けて記述していきます。使い分けですが、並列ループはある程度共通化した処理を行う場合に使います。具体的にはAPIに対してリクエストを行うがそのパラメータが違う場合などに使用できるかと思います。一方、並列ブランチは子ステップの処理が異なる場合に使います。

Parallel Stepsを使ってみる

では実際にParallel Stepsを使ってみたいと思います。今回はAPIへリクエストを送るのではなく、単純にsys.logでログを書き出してみます。

並列ループ

はじめに並列ループを使ってみたいと思います。コードは以下になります。

parallel_for.yml

main:
  steps:
    - assign_value:
        assign:
          - str_val: "str_val"
          - int_val: 999
          - tests: [ "logging_1","logging_2","logging_3","logging_4","logging_5","logging_6","logging_7","logging_8","logging_9" ]
          - ret_val: [ ]
    - loggings:
        parallel:
          shared: [ ret_val ]
          for:
            value: test
            in: ${tests}
            steps:
              - logging_1:
                  call: sys.log
                  args:
                    text: ${test + " " + "str_val:" + str_val + ", int_val:" + int_val }
                    severity: INFO
              - append:
                  assign:
                    - ret_val: ${list.concat(ret_val,test)}
    - logging_end:
        call: sys.log
        args:
          text: ${ret_val}
          severity: INFO

行っていることは、loggingsを並列ステップとしてその配下で並列ループでtestsの中身をループ処理します。子ステップではステップ名の書き出しとステップ名を並列ステップのshared変数に追記します。すべての子ステップが完了した後にlogging_endステップでshared変数の中身をログに書き出します。

上記のコードをデプロイ&実行してみます。

$ gcloud workflows deploy test-parallel_for --source=parallel_for.yml --service-account ${SERVICE_ACCOUNT} --location ${DEFAULT_REGION}
$ gcloud workflows run test-parallel_for --location asia-northeast1

WorkflowsのコンソールでソースからデプロイしたWorkflowを可視化したものを確認してみます。

実行ログを確認すると意図した通りtestsの中身をループ処理して子ステップ内でステップ名の書き出しとステップ名をshared変数に追記し、すべての子ステップが完了した後にlogging_endステップでshared変数の中身をログに書き出しています。

注意点として並列ステップなので必ずしもtestsに記述した順序で実行されることは無いという点です。実行順序を気にする場合は並列ステップではなく逐次にステップを記述する必要があります。

2023-04-20 14:36:07.551 JST
 {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2023-04-20T05:36:07Z, start: {…}, state: ACTIVE}
2023-04-20 14:36:07.733 JST
 logging_7 str_val:str_val, int_val:999
2023-04-20 14:36:07.767 JST
 logging_1 str_val:str_val, int_val:999
2023-04-20 14:36:07.800 JST
 logging_2 str_val:str_val, int_val:999
2023-04-20 14:36:07.861 JST
 logging_3 str_val:str_val, int_val:999
2023-04-20 14:36:07.887 JST
 logging_4 str_val:str_val, int_val:999
2023-04-20 14:36:07.979 JST
 logging_9 str_val:str_val, int_val:999
2023-04-20 14:36:08.008 JST
 logging_5 str_val:str_val, int_val:999
2023-04-20 14:36:08.391 JST
 ["logging_8","logging_2","logging_1","logging_7","logging_9","logging_5","logging_6","logging_4","logging_3"]
2023-04-20 14:36:08.560 JST
 {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2023-04-20T05:36:08Z, state: SUCCEEDED, success: {…}}

並列ブランチ

次に並列ブランチを使ってみたいと思います。コードは以下になります。

parallel_branch.yml

main:
  steps:
    - assign_value:
        assign:
          - str_val: "str_val"
          - int_val: 999
          - ret_val: []
    - loggings:
        parallel:
          shared: [ ret_val ]
          branches:
            - logging_1_branch:
                steps:
                  - logging_1:
                      call: sys.log
                      args:
                        text: ${"logging_1" + " " + "str_val:" + str_val + ", int_val:" + int_val }
                        severity: INFO
                  - logging_1_sleep:
                      call: sys.sleep
                      args:
                        seconds: 10
                  - logging_1_2:
                      call: sys.log
                      args:
                        text: ${"logging_1_2" + " " + "str_val:" + str_val + ", int_val:" + int_val }
                        severity: INFO
                  - logging_1_append:
                      assign:
                        - ret_val: ${list.concat(ret_val,"logging_1")}
            - logging_2_branch:
                steps:
                  - logging_2:
                      call: sys.log
                      args:
                        text: ${"logging_2" + " " + "str_val:" + str_val + ", int_val:" + int_val }
                        severity: INFO
                  - logging_2_append:
                      assign:
                        - ret_val: ${list.concat(ret_val,"logging_2")}
            - logging_3_branch:
                steps:
                  - logging_3:
                      call: sys.log
                      args:
                        text: ${"logging_3" + " " + "str_val:" + str_val + ", int_val:" + int_val }
                        severity: INFO
                  - logging_3_append:
                      assign:
                        - ret_val: ${list.concat(ret_val,"logging_3")}
            - logging_4_branch:
                steps:
                  - logging_4:
                      call: sys.log
                      args:
                        text: ${"logging_4" + " " + "str_val:" + str_val + ", int_val:" + int_val }
                        severity: INFO
                  - logging_4_append:
                      assign:
                        - ret_val: ${list.concat(ret_val,"logging_4")}
    - logging_end:
        call: sys.log
        args:
          text: ${ret_val}
          severity: INFO

行っていることは、loggingsを並列ステップとしてその配下で4つのステップを並列実行します。子ステップではステップ名の書き出しとステップ名を並列ステップのshared変数に追記します。すべての子ステップが完了した後にlogging_endステップでshared変数の中身をログに書き出します。

またlogging_1_branchでは途中でSleep処理を入れて10秒待機することで他のステップと処理を変えています。

上記のコードをデプロイ&実行してみます。

$ gcloud workflows deploy test-parallel_branch --source=parallel_branch.yml --service-account ${SERVICE_ACCOUNT} --location ${DEFAULT_REGION}
$ gcloud workflows run test-parallel_branch --location asia-northeast1

WorkflowsのコンソールでソースからデプロイしたWorkflowを可視化したものを確認してみます。

logging_1_branchステップのみ内容の異なった並列処理になっていることがわかります。

実行ログを確認すると意図した通りlogging_1_branchステップでは最初のログを書き出した後に10秒待機してから次のログを書き出し、すべての子ステップの完了を待ってからshared変数の中身をログに書き出されていることがわかります。

こちらも並列ループと同様に必ずしも子ステップが記述順に実行されるわけではないということがわかります。

2023-04-20 14:53:25.844 JST
 {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2023-04-20T05:53:25Z, start: {…}, state: ACTIVE}
2023-04-20 14:53:26.018 JST
 logging_4 str_val:str_val, int_val:999
2023-04-20 14:53:26.062 JST
 logging_2 str_val:str_val, int_val:999
2023-04-20 14:53:26.073 JST
 logging_1 str_val:str_val, int_val:999
2023-04-20 14:53:26.120 JST
 logging_3 str_val:str_val, int_val:999
2023-04-20 14:53:36.493 JST
 logging_1_2 str_val:str_val, int_val:999
2023-04-20 14:53:36.792 JST
 ["logging_4","logging_2","logging_3","logging_1"]
2023-04-20 14:53:36.966 JST
 {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2023-04-20T05:53:36Z, state: SUCCEEDED, success: {…}}

まとめ

GoogleCloudのWorkflowsで並列でStepを実行できるParallel Stepsを使ってみました。並列処理が行える処理はParallel Stepsを使うことで処理時間を短縮できるので使わない手はないと思います。ただし実行数、深度、メモリ等の上限があります(割り当てと上限  |  ワークフロー  |  Google Cloud )のでその点は注意して使いましょう。

最後まで読んで頂いてありがとうございました。