Workflows で並列実行ステップのエラーをキャッチした際の挙動を確認してみた #cm_google_cloud_adcal_2024
こんにちは!エノカワです。
この記事はクラスメソッドの Google Cloud アドベントカレンダー2024 の 10日目の記事です。
今回は、Workflows で並列実行ステップのエラーをキャッチした際の挙動を確認してみましたので、その内容をご紹介します。
Workflows の並列処理と例外処理
Workflows は、複数のタスクを順序立てて実行したり、並列に実行したりすることができるサービスです。
並列処理には parallel
ステップを使用し、複数のタスクを同時に処理できます。
また、エラーが発生した際には、try
ブロックと except
ブロックを使用してエラーハンドリングを行うことが可能です。
試したこと
Workflows の並列処理で以下のような疑問がありました。
- 並列実行中に一部のステップでエラーが発生した場合、他のステップやワークフロー全体にどのような影響があるのか?
- ステップやワークフロー全体に
try ~ except
を配置した場合、エラーハンドリングの挙動はどう変わるのか?
これらの疑問を解決するために、以下の3つのケースに分けて検証を行いました。
- ステップが
try ~ except
で囲まれている場合 - ステップが
try ~ except
で囲まれていない場合 - ステップが
try ~ except
で囲まれており、例外を再スローする場合
try ~ except
で囲まれている場合
1. ステップが 以下は、並列処理内で個々のステップがエラーをキャッチするコードです。
main:
steps:
- parallel_step:
parallel:
for:
value: i
range: [0, 9] # 0から9の範囲で並列処理を実行
steps:
- try_step:
try:
steps:
- log_start:
call: sys.log
args:
text: '${"Processing item: " + i}'
severity: "INFO"
- http_get:
call: http.get
args:
url: ${"https://jsonplaceholder.typicode.com/posts/" + i} # JSONPlaceholder API にリクエストを送信
result: response
- log_success:
call: sys.log
args:
text: '${"Success for item: " + i}'
severity: "INFO"
except:
as: error
steps:
- log_error:
call: sys.log
args:
text: '${"Error occurred for item: " + i + ". Error message: " + error.message}'
severity: "ERROR"
以下は、実行時に出力されたログです。
2024-12-09 16:40:59.534 JST {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2024-12-09T07:40:59Z, start: {…}, state: ACTIVE}
2024-12-09 16:40:59.651 JST Processing item: 7
2024-12-09 16:40:59.680 JST Processing item: 3
2024-12-09 16:40:59.697 JST Processing item: 6
2024-12-09 16:40:59.721 JST Processing item: 0
2024-12-09 16:40:59.744 JST Processing item: 8
2024-12-09 16:40:59.771 JST Processing item: 5
2024-12-09 16:40:59.804 JST Processing item: 2
2024-12-09 16:40:59.831 JST Processing item: 1
2024-12-09 16:40:59.854 JST Processing item: 9
2024-12-09 16:40:59.883 JST Processing item: 4
2024-12-09 16:41:00.244 JST Success for item: 8
2024-12-09 16:41:00.428 JST Success for item: 9
2024-12-09 16:41:00.500 JST Success for item: 4
2024-12-09 16:41:00.527 JST Success for item: 7
2024-12-09 16:41:00.558 JST Success for item: 3
2024-12-09 16:41:00.613 JST Success for item: 6
2024-12-09 16:41:00.639 JST Error occurred for item: 0. Error message: HTTP server responded with error code 404
2024-12-09 16:41:00.730 JST Success for item: 2
2024-12-09 16:41:00.771 JST Success for item: 5
2024-12-09 16:41:00.806 JST Success for item: 1
2024-12-09 16:41:01.025 JST {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2024-12-09T07:41:01Z, state: SUCCEEDED, success: {…}}
挙動:
- ログを見ると、
Processing item: X
が順不同で出力されており、並列ステップが非同期で実行されていることが確認できます。 item: 0
のリクエストが失敗し、エラーログが記録されています。- エラーが発生したステップのみが
except
ブロックで処理され、他のステップには影響がありません。
try ~ except
で囲まれていない場合
2. ステップが 以下のコードでは、並列処理内でエラーはキャッチせず、ワークフロー全体でエラーをキャッチするコードです。
- 各ステップでエラーをキャッチせず、try ~ except はワークフロー全体でのみ使用しています。
main:
steps:
- main_workflow:
try:
steps:
- parallel_step:
parallel:
for:
value: i
range: [0, 9] # 0から9の範囲で並列処理を実行
steps:
- main_step:
steps:
- log_start:
call: sys.log
args:
text: '${"Processing item: " + i}'
severity: "INFO"
- http_get:
call: http.get
args:
url: ${"https://jsonplaceholder.typicode.com/posts/" + i} # JSONPlaceholder API にリクエストを送信
result: response
- log_success:
call: sys.log
args:
text: '${"Success for item: " + i}'
severity: "INFO"
except:
as: global_error
steps:
- global_log_error:
call: sys.log
args:
text: '${"Error occurred during workflow execution. Error message: " + global_error.message}'
severity: "ERROR"
以下は、実行時に出力されたログです。
2024-12-09 16:49:03.184 JST {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2024-12-09T07:49:03Z, start: {…}, state: ACTIVE}
2024-12-09 16:49:03.324 JST Processing item: 3
2024-12-09 16:49:03.339 JST Processing item: 8
2024-12-09 16:49:03.371 JST Processing item: 4
2024-12-09 16:49:03.402 JST Processing item: 2
2024-12-09 16:49:03.440 JST Processing item: 9
2024-12-09 16:49:03.474 JST Processing item: 7
2024-12-09 16:49:03.511 JST Processing item: 0
2024-12-09 16:49:03.530 JST Processing item: 5
2024-12-09 16:49:03.556 JST Processing item: 6
2024-12-09 16:49:03.587 JST Processing item: 1
2024-12-09 16:49:03.807 JST Success for item: 2
2024-12-09 16:49:03.851 JST Success for item: 9
2024-12-09 16:49:04.046 JST Success for item: 8
2024-12-09 16:49:04.106 JST Success for item: 3
2024-12-09 16:49:04.151 JST Success for item: 4
2024-12-09 16:49:04.282 JST Success for item: 7
2024-12-09 16:49:04.377 JST Success for item: 5
2024-12-09 16:49:04.439 JST Success for item: 6
2024-12-09 16:49:04.631 JST Success for item: 1
2024-12-09 16:49:04.933 JST Error occurred during workflow execution. Error message: UnhandledBranchError: One or more branches or iterations encountered an unhandled runtime error
2024-12-09 16:49:05.030 JST {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2024-12-09T07:49:05Z, state: SUCCEEDED, success: {…}}
挙動:
- 並列処理内で
item: 0
のリクエストが失敗しましたが、他のステップには影響を与えず、正常に処理が続行されました。 - 並列処理がすべて終了した後、例外処理が実行され、ワークフロー全体の
except
ブロックでエラーが処理されました。
try ~ except
で囲まれており、例外を再スローする場合
3. ステップが 以下は、ステップ内でエラーをキャッチした後、例外を再スローするコードです。
また、ワークフロー全体でエラーをキャッチしています。
main:
steps:
- main_workflow:
try:
steps:
- parallel_step:
parallel:
for:
value: i
range: [0, 9] # 0から9の範囲で並列処理を実行
steps:
- try_step:
try:
steps:
- log_start:
call: sys.log
args:
text: '${"Processing item: " + i}'
severity: "INFO"
- http_get:
call: http.get
args:
url: ${"https://jsonplaceholder.typicode.com/posts/" + i} # JSONPlaceholder API にリクエストを送信
result: response
- log_success:
call: sys.log
args:
text: '${"Success for item: " + i}'
severity: "INFO"
except:
as: error
steps:
- log_error:
call: sys.log
args:
text: '${"Error occurred for item: " + i + ". Error message: " + error.message}'
severity: "ERROR"
- raise_error:
raise: ${error} # 例外を再スロー
except:
as: global_error
steps:
- global_log_error:
call: sys.log
args:
text: '${"Error occurred during workflow execution. Error message: " + global_error.message}'
severity: "ERROR"
以下は、実行時に出力されたログです。
2024-12-09 17:03:25.607 JST {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2024-12-09T08:03:25Z, start: {…}, state: ACTIVE}
2024-12-09 17:03:25.726 JST Processing item: 9
2024-12-09 17:03:25.746 JST Processing item: 0
2024-12-09 17:03:25.764 JST Processing item: 2
2024-12-09 17:03:25.785 JST Processing item: 4
2024-12-09 17:03:25.812 JST Processing item: 7
2024-12-09 17:03:25.829 JST Processing item: 6
2024-12-09 17:03:25.848 JST Processing item: 3
2024-12-09 17:03:25.878 JST Processing item: 8
2024-12-09 17:03:25.898 JST Processing item: 1
2024-12-09 17:03:25.920 JST Processing item: 5
2024-12-09 17:03:26.221 JST Success for item: 3
2024-12-09 17:03:26.292 JST Error occurred for item: 0. Error message: HTTP server responded with error code 404
2024-12-09 17:03:26.329 JST Success for item: 9
2024-12-09 17:03:26.563 JST Success for item: 5
2024-12-09 17:03:26.597 JST Success for item: 4
2024-12-09 17:03:26.628 JST Success for item: 2
2024-12-09 17:03:26.659 JST Success for item: 6
2024-12-09 17:03:26.722 JST Success for item: 7
2024-12-09 17:03:26.812 JST Success for item: 1
2024-12-09 17:03:26.841 JST Success for item: 8
2024-12-09 17:03:27.094 JST Error occurred during workflow execution. Error message: UnhandledBranchError: One or more branches or iterations encountered an unhandled runtime error
2024-12-09 17:03:27.245 JST {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2024-12-09T08:03:27Z, state: SUCCEEDED, success: {…}}
挙動:
- ステップ内でエラーがキャッチされ、エラーログが記録されました。
- その後、
raise
によって例外が再スローされ、ワークフロー全体のtry ~ except
ブロックでキャッチされました。 - 並列処理内で失敗したステップがあっても、他のステップには影響がなく正常に実行されました。
まとめ
以上、Workflows で並列実行ステップのエラーをキャッチした際の挙動を確認しました。
今回の検証を通じて、Workflows の並列処理と例外処理の挙動について以下のことが分かりました。
- ステップごとに
try ~ except
を使用することで、個別のエラーハンドリングが可能 - ワークフロー全体でエラーをキャッチすることで、統一的なエラーハンドリングが実現できる
- 並列処理内で失敗したステップがあっても、他のステップには影響がない
以下に、検証結果を表形式でまとめてみました。
ケース | エラーハンドリングの範囲 | 他のステップへの影響 | 並列処理後の例外処理 |
---|---|---|---|
ステップごとに try ~ except を使用 |
ステップごと | 影響なし | 実行されない |
ステップに try ~ except を使用しない |
ワークフロー全体 | 影響なし | 実行される |
例外を再スローする | ステップごと + ワークフロー全体 | 影響なし | 実行される |
今回の内容が、同じような疑問を持つ方や、Workflows を活用したワークフロー設計に取り組む方の参考になれば幸いです!
明日 12/11 は hanzawa.yuya さんです。よろしくお願いします!