はじめに
データアナリティクス事業本部のkobayashiです。
GoogleCloudのWorkflowsで並列でStepを実行できるParallel Stepsを使ってみたのでまとめます。
Workflows release notes | Google Cloud
Parallel Stepsとは
ワークフロー中で一つの処理をStepとして実行しますが、Parallel Stepsはその名の通りStepを同時実行するステップになります。 並列ステップを使う場面としては複数の独立したAPIへリクエストを送りそのレスポンスを取得したいがレスポンスまでの時間が長くかかかる場合において、逐次処理に比べ並列ステップを使い複数のAPIへリクエストを同時に行うことで全体の処理時間を圧縮できるため重宝します。
Parallel steps | Workflows | Google Cloud
記述の仕方としては以下のようになります。
並列ループ
- {ステップ名}:
parallel:
# exception_policy: continueAll
shared: [ ret_val ]
# concurrency_limit: 10
for:
value: test
in: ${tests}
並列ブランチ
- {ステップ名}:
parallel:
# exception_policy: continueAll
shared: [ ret_val ]
# concurrency_limit: 10
branches:
- branch_1:
...
- branch_2:
...
並列ループと並列ブランチを使うパターンで処理を行うステップの記述が異なりますがそれ以外の属性は同じです。
各属性はexception_policy
は今日(2023/04/20)現在continueAll
しか設定がないので設定する必要はありません。またshared
は並列ステップ内で書き込み可能な変数を指定します。concurrency_limit
は同時に実行できる並列ステップ数になります。
実際の処理を行うステップは列ループの場合はfor
の配下に共通のステップを記述します。並列ブランチの場合はbranches
配下に子ステップを続けて記述していきます。使い分けですが、並列ループはある程度共通化した処理を行う場合に使います。具体的にはAPIに対してリクエストを行うがそのパラメータが違う場合などに使用できるかと思います。一方、並列ブランチは子ステップの処理が異なる場合に使います。
Parallel Stepsを使ってみる
では実際にParallel Stepsを使ってみたいと思います。今回はAPIへリクエストを送るのではなく、単純にsys.log
でログを書き出してみます。
並列ループ
はじめに並列ループを使ってみたいと思います。コードは以下になります。
parallel_for.yml
main:
steps:
- assign_value:
assign:
- str_val: "str_val"
- int_val: 999
- tests: [ "logging_1","logging_2","logging_3","logging_4","logging_5","logging_6","logging_7","logging_8","logging_9" ]
- ret_val: [ ]
- loggings:
parallel:
shared: [ ret_val ]
for:
value: test
in: ${tests}
steps:
- logging_1:
call: sys.log
args:
text: ${test + " " + "str_val:" + str_val + ", int_val:" + int_val }
severity: INFO
- append:
assign:
- ret_val: ${list.concat(ret_val,test)}
- logging_end:
call: sys.log
args:
text: ${ret_val}
severity: INFO
行っていることは、loggings
を並列ステップとしてその配下で並列ループでtests
の中身をループ処理します。子ステップではステップ名の書き出しとステップ名を並列ステップのshared変数に追記します。すべての子ステップが完了した後にlogging_end
ステップでshared変数の中身をログに書き出します。
上記のコードをデプロイ&実行してみます。
$ gcloud workflows deploy test-parallel_for --source=parallel_for.yml --service-account ${SERVICE_ACCOUNT} --location ${DEFAULT_REGION}
$ gcloud workflows run test-parallel_for --location asia-northeast1
Workflowsのコンソールでソース
からデプロイしたWorkflowを可視化したものを確認してみます。
実行ログを確認すると意図した通りtests
の中身をループ処理して子ステップ内でステップ名の書き出しとステップ名をshared変数に追記し、すべての子ステップが完了した後にlogging_end
ステップでshared変数の中身をログに書き出しています。
注意点として並列ステップなので必ずしもtests
に記述した順序で実行されることは無いという点です。実行順序を気にする場合は並列ステップではなく逐次にステップを記述する必要があります。
2023-04-20 14:36:07.551 JST
{@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2023-04-20T05:36:07Z, start: {…}, state: ACTIVE}
2023-04-20 14:36:07.733 JST
logging_7 str_val:str_val, int_val:999
2023-04-20 14:36:07.767 JST
logging_1 str_val:str_val, int_val:999
2023-04-20 14:36:07.800 JST
logging_2 str_val:str_val, int_val:999
2023-04-20 14:36:07.861 JST
logging_3 str_val:str_val, int_val:999
2023-04-20 14:36:07.887 JST
logging_4 str_val:str_val, int_val:999
2023-04-20 14:36:07.979 JST
logging_9 str_val:str_val, int_val:999
2023-04-20 14:36:08.008 JST
logging_5 str_val:str_val, int_val:999
2023-04-20 14:36:08.391 JST
["logging_8","logging_2","logging_1","logging_7","logging_9","logging_5","logging_6","logging_4","logging_3"]
2023-04-20 14:36:08.560 JST
{@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2023-04-20T05:36:08Z, state: SUCCEEDED, success: {…}}
並列ブランチ
次に並列ブランチを使ってみたいと思います。コードは以下になります。
parallel_branch.yml
main:
steps:
- assign_value:
assign:
- str_val: "str_val"
- int_val: 999
- ret_val: []
- loggings:
parallel:
shared: [ ret_val ]
branches:
- logging_1_branch:
steps:
- logging_1:
call: sys.log
args:
text: ${"logging_1" + " " + "str_val:" + str_val + ", int_val:" + int_val }
severity: INFO
- logging_1_sleep:
call: sys.sleep
args:
seconds: 10
- logging_1_2:
call: sys.log
args:
text: ${"logging_1_2" + " " + "str_val:" + str_val + ", int_val:" + int_val }
severity: INFO
- logging_1_append:
assign:
- ret_val: ${list.concat(ret_val,"logging_1")}
- logging_2_branch:
steps:
- logging_2:
call: sys.log
args:
text: ${"logging_2" + " " + "str_val:" + str_val + ", int_val:" + int_val }
severity: INFO
- logging_2_append:
assign:
- ret_val: ${list.concat(ret_val,"logging_2")}
- logging_3_branch:
steps:
- logging_3:
call: sys.log
args:
text: ${"logging_3" + " " + "str_val:" + str_val + ", int_val:" + int_val }
severity: INFO
- logging_3_append:
assign:
- ret_val: ${list.concat(ret_val,"logging_3")}
- logging_4_branch:
steps:
- logging_4:
call: sys.log
args:
text: ${"logging_4" + " " + "str_val:" + str_val + ", int_val:" + int_val }
severity: INFO
- logging_4_append:
assign:
- ret_val: ${list.concat(ret_val,"logging_4")}
- logging_end:
call: sys.log
args:
text: ${ret_val}
severity: INFO
行っていることは、loggings
を並列ステップとしてその配下で4つのステップを並列実行します。子ステップではステップ名の書き出しとステップ名を並列ステップのshared変数に追記します。すべての子ステップが完了した後にlogging_end
ステップでshared変数の中身をログに書き出します。
またlogging_1_branch
では途中でSleep処理を入れて10秒待機することで他のステップと処理を変えています。
上記のコードをデプロイ&実行してみます。
$ gcloud workflows deploy test-parallel_branch --source=parallel_branch.yml --service-account ${SERVICE_ACCOUNT} --location ${DEFAULT_REGION}
$ gcloud workflows run test-parallel_branch --location asia-northeast1
Workflowsのコンソールでソース
からデプロイしたWorkflowを可視化したものを確認してみます。
logging_1_branch
ステップのみ内容の異なった並列処理になっていることがわかります。
実行ログを確認すると意図した通りlogging_1_branch
ステップでは最初のログを書き出した後に10秒待機してから次のログを書き出し、すべての子ステップの完了を待ってからshared変数の中身をログに書き出されていることがわかります。
こちらも並列ループと同様に必ずしも子ステップが記述順に実行されるわけではないということがわかります。
2023-04-20 14:53:25.844 JST
{@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2023-04-20T05:53:25Z, start: {…}, state: ACTIVE}
2023-04-20 14:53:26.018 JST
logging_4 str_val:str_val, int_val:999
2023-04-20 14:53:26.062 JST
logging_2 str_val:str_val, int_val:999
2023-04-20 14:53:26.073 JST
logging_1 str_val:str_val, int_val:999
2023-04-20 14:53:26.120 JST
logging_3 str_val:str_val, int_val:999
2023-04-20 14:53:36.493 JST
logging_1_2 str_val:str_val, int_val:999
2023-04-20 14:53:36.792 JST
["logging_4","logging_2","logging_3","logging_1"]
2023-04-20 14:53:36.966 JST
{@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2023-04-20T05:53:36Z, state: SUCCEEDED, success: {…}}
まとめ
GoogleCloudのWorkflowsで並列でStepを実行できるParallel Stepsを使ってみました。並列処理が行える処理はParallel Stepsを使うことで処理時間を短縮できるので使わない手はないと思います。ただし実行数、深度、メモリ等の上限があります(割り当てと上限 | ワークフロー | Google Cloud )のでその点は注意して使いましょう。
最後まで読んで頂いてありがとうございました。