Workflows で並列実行ステップのエラーをキャッチした際の挙動を確認してみた #cm_google_cloud_adcal_2024

Workflows で並列実行ステップのエラーをキャッチした際の挙動を確認してみた #cm_google_cloud_adcal_2024

Clock Icon2024.12.10

こんにちは!エノカワです。

この記事はクラスメソッドの Google Cloud アドベントカレンダー2024 の 10日目の記事です。

今回は、Workflows で並列実行ステップのエラーをキャッチした際の挙動を確認してみましたので、その内容をご紹介します。

Workflows の並列処理と例外処理

Workflows は、複数のタスクを順序立てて実行したり、並列に実行したりすることができるサービスです。
並列処理には parallel ステップを使用し、複数のタスクを同時に処理できます。
また、エラーが発生した際には、try ブロックと except ブロックを使用してエラーハンドリングを行うことが可能です。

試したこと

Workflows の並列処理で以下のような疑問がありました。

  • 並列実行中に一部のステップでエラーが発生した場合、他のステップやワークフロー全体にどのような影響があるのか?
  • ステップやワークフロー全体に try ~ except を配置した場合、エラーハンドリングの挙動はどう変わるのか?

これらの疑問を解決するために、以下の3つのケースに分けて検証を行いました。

  1. ステップが try ~ except で囲まれている場合
  2. ステップが try ~ except で囲まれていない場合
  3. ステップが try ~ except で囲まれており、例外を再スローする場合

1. ステップが try ~ except で囲まれている場合

以下は、並列処理内で個々のステップがエラーをキャッチするコードです。

main:
  steps:
    - parallel_step:
        parallel:
          for:
            value: i
            range: [0, 9]  # 0から9の範囲で並列処理を実行
            steps:
              - try_step:
                  try:
                    steps:
                      - log_start:
                          call: sys.log
                          args:
                            text: '${"Processing item: " + i}'
                            severity: "INFO"
                      - http_get:
                          call: http.get
                          args:
                            url: ${"https://jsonplaceholder.typicode.com/posts/" + i}  # JSONPlaceholder API にリクエストを送信
                          result: response
                      - log_success:
                          call: sys.log
                          args:
                            text: '${"Success for item: " + i}'
                            severity: "INFO"
                  except:
                    as: error
                    steps:
                      - log_error:
                          call: sys.log
                          args:
                            text: '${"Error occurred for item: " + i + ". Error message: " + error.message}'
                            severity: "ERROR"

以下は、実行時に出力されたログです。

2024-12-09 16:40:59.534 JST {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2024-12-09T07:40:59Z, start: {}, state: ACTIVE}
2024-12-09 16:40:59.651 JST Processing item: 7
2024-12-09 16:40:59.680 JST Processing item: 3
2024-12-09 16:40:59.697 JST Processing item: 6
2024-12-09 16:40:59.721 JST Processing item: 0
2024-12-09 16:40:59.744 JST Processing item: 8
2024-12-09 16:40:59.771 JST Processing item: 5
2024-12-09 16:40:59.804 JST Processing item: 2
2024-12-09 16:40:59.831 JST Processing item: 1
2024-12-09 16:40:59.854 JST Processing item: 9
2024-12-09 16:40:59.883 JST Processing item: 4
2024-12-09 16:41:00.244 JST Success for item: 8
2024-12-09 16:41:00.428 JST Success for item: 9
2024-12-09 16:41:00.500 JST Success for item: 4
2024-12-09 16:41:00.527 JST Success for item: 7
2024-12-09 16:41:00.558 JST Success for item: 3
2024-12-09 16:41:00.613 JST Success for item: 6
2024-12-09 16:41:00.639 JST Error occurred for item: 0. Error message: HTTP server responded with error code 404
2024-12-09 16:41:00.730 JST Success for item: 2
2024-12-09 16:41:00.771 JST Success for item: 5
2024-12-09 16:41:00.806 JST Success for item: 1
2024-12-09 16:41:01.025 JST {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2024-12-09T07:41:01Z, state: SUCCEEDED, success: {}}

挙動:

  • ログを見ると、Processing item: X が順不同で出力されており、並列ステップが非同期で実行されていることが確認できます。
  • item: 0 のリクエストが失敗し、エラーログが記録されています。
  • エラーが発生したステップのみが except ブロックで処理され、他のステップには影響がありません。

2. ステップが try ~ except で囲まれていない場合

以下のコードでは、並列処理内でエラーはキャッチせず、ワークフロー全体でエラーをキャッチするコードです。

  • 各ステップでエラーをキャッチせず、try ~ except はワークフロー全体でのみ使用しています。
main:
  steps:
    - main_workflow:
        try:
          steps:
            - parallel_step:
                parallel:
                  for:
                    value: i
                    range: [0, 9]  # 0から9の範囲で並列処理を実行
                    steps:
                      - main_step:
                          steps:
                            - log_start:
                                call: sys.log
                                args:
                                  text: '${"Processing item: " + i}'
                                  severity: "INFO"
                            - http_get:
                                call: http.get
                                args:
                                  url: ${"https://jsonplaceholder.typicode.com/posts/" + i}  # JSONPlaceholder API にリクエストを送信
                                result: response
                            - log_success:
                                call: sys.log
                                args:
                                  text: '${"Success for item: " + i}'
                                  severity: "INFO"
        except:
          as: global_error
          steps:
            - global_log_error:
                call: sys.log
                args:
                  text: '${"Error occurred during workflow execution. Error message: " + global_error.message}'
                  severity: "ERROR"

以下は、実行時に出力されたログです。

2024-12-09 16:49:03.184 JST {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2024-12-09T07:49:03Z, start: {}, state: ACTIVE}
2024-12-09 16:49:03.324 JST Processing item: 3
2024-12-09 16:49:03.339 JST Processing item: 8
2024-12-09 16:49:03.371 JST Processing item: 4
2024-12-09 16:49:03.402 JST Processing item: 2
2024-12-09 16:49:03.440 JST Processing item: 9
2024-12-09 16:49:03.474 JST Processing item: 7
2024-12-09 16:49:03.511 JST Processing item: 0
2024-12-09 16:49:03.530 JST Processing item: 5
2024-12-09 16:49:03.556 JST Processing item: 6
2024-12-09 16:49:03.587 JST Processing item: 1
2024-12-09 16:49:03.807 JST Success for item: 2
2024-12-09 16:49:03.851 JST Success for item: 9
2024-12-09 16:49:04.046 JST Success for item: 8
2024-12-09 16:49:04.106 JST Success for item: 3
2024-12-09 16:49:04.151 JST Success for item: 4
2024-12-09 16:49:04.282 JST Success for item: 7
2024-12-09 16:49:04.377 JST Success for item: 5
2024-12-09 16:49:04.439 JST Success for item: 6
2024-12-09 16:49:04.631 JST Success for item: 1
2024-12-09 16:49:04.933 JST Error occurred during workflow execution. Error message: UnhandledBranchError: One or more branches or iterations encountered an unhandled runtime error
2024-12-09 16:49:05.030 JST {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2024-12-09T07:49:05Z, state: SUCCEEDED, success: {}}

挙動:

  • 並列処理内で item: 0 のリクエストが失敗しましたが、他のステップには影響を与えず、正常に処理が続行されました。
  • 並列処理がすべて終了した後、例外処理が実行され、ワークフロー全体の except ブロックでエラーが処理されました。

3. ステップが try ~ except で囲まれており、例外を再スローする場合

以下は、ステップ内でエラーをキャッチした後、例外を再スローするコードです。
また、ワークフロー全体でエラーをキャッチしています。

main:
  steps:
    - main_workflow:
        try:
          steps:
            - parallel_step:
                parallel:
                  for:
                    value: i
                    range: [0, 9]  # 0から9の範囲で並列処理を実行
                    steps:
                      - try_step:
                          try:
                            steps:
                              - log_start:
                                  call: sys.log
                                  args:
                                    text: '${"Processing item: " + i}'
                                    severity: "INFO"
                              - http_get:
                                  call: http.get
                                  args:
                                    url: ${"https://jsonplaceholder.typicode.com/posts/" + i}  # JSONPlaceholder API にリクエストを送信
                                  result: response
                              - log_success:
                                  call: sys.log
                                  args:
                                    text: '${"Success for item: " + i}'
                                    severity: "INFO"
                          except:
                            as: error
                            steps:
                              - log_error:
                                  call: sys.log
                                  args:
                                    text: '${"Error occurred for item: " + i + ". Error message: " + error.message}'
                                    severity: "ERROR"
                              - raise_error:
                                  raise: ${error}  # 例外を再スロー
        except:
          as: global_error
          steps:
            - global_log_error:
                call: sys.log
                args:
                  text: '${"Error occurred during workflow execution. Error message: " + global_error.message}'
                  severity: "ERROR"

以下は、実行時に出力されたログです。

2024-12-09 17:03:25.607 JST {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2024-12-09T08:03:25Z, start: {}, state: ACTIVE}
2024-12-09 17:03:25.726 JST Processing item: 9
2024-12-09 17:03:25.746 JST Processing item: 0
2024-12-09 17:03:25.764 JST Processing item: 2
2024-12-09 17:03:25.785 JST Processing item: 4
2024-12-09 17:03:25.812 JST Processing item: 7
2024-12-09 17:03:25.829 JST Processing item: 6
2024-12-09 17:03:25.848 JST Processing item: 3
2024-12-09 17:03:25.878 JST Processing item: 8
2024-12-09 17:03:25.898 JST Processing item: 1
2024-12-09 17:03:25.920 JST Processing item: 5
2024-12-09 17:03:26.221 JST Success for item: 3
2024-12-09 17:03:26.292 JST Error occurred for item: 0. Error message: HTTP server responded with error code 404
2024-12-09 17:03:26.329 JST Success for item: 9
2024-12-09 17:03:26.563 JST Success for item: 5
2024-12-09 17:03:26.597 JST Success for item: 4
2024-12-09 17:03:26.628 JST Success for item: 2
2024-12-09 17:03:26.659 JST Success for item: 6
2024-12-09 17:03:26.722 JST Success for item: 7
2024-12-09 17:03:26.812 JST Success for item: 1
2024-12-09 17:03:26.841 JST Success for item: 8
2024-12-09 17:03:27.094 JST Error occurred during workflow execution. Error message: UnhandledBranchError: One or more branches or iterations encountered an unhandled runtime error
2024-12-09 17:03:27.245 JST {@type: type.googleapis.com/google.cloud.workflows.type.ExecutionsSystemLog, activityTime: 2024-12-09T08:03:27Z, state: SUCCEEDED, success: {}}

挙動:

  • ステップ内でエラーがキャッチされ、エラーログが記録されました。
  • その後、raise によって例外が再スローされ、ワークフロー全体の try ~ except ブロックでキャッチされました。
  • 並列処理内で失敗したステップがあっても、他のステップには影響がなく正常に実行されました。

まとめ

以上、Workflows で並列実行ステップのエラーをキャッチした際の挙動を確認しました。

今回の検証を通じて、Workflows の並列処理と例外処理の挙動について以下のことが分かりました。

  • ステップごとに try ~ except を使用することで、個別のエラーハンドリングが可能
  • ワークフロー全体でエラーをキャッチすることで、統一的なエラーハンドリングが実現できる
  • 並列処理内で失敗したステップがあっても、他のステップには影響がない

以下に、検証結果を表形式でまとめてみました。

ケース エラーハンドリングの範囲 他のステップへの影響 並列処理後の例外処理
ステップごとに try ~ except を使用 ステップごと 影響なし 実行されない
ステップに try ~ except を使用しない ワークフロー全体 影響なし 実行される
例外を再スローする ステップごと + ワークフロー全体 影響なし 実行される

今回の内容が、同じような疑問を持つ方や、Workflows を活用したワークフロー設計に取り組む方の参考になれば幸いです!

明日 12/11 は hanzawa.yuya さんです。よろしくお願いします!

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.