WorkflowsでWorkflowsコネクタを用いて同じワークフローが多重起動できないようにする方法を考えてみた

WorkflowsでWorkflowsコネクタを用いて同じワークフローが多重起動できないようにする方法を考えてみた

Clock Icon2024.12.14

概要

Workflowsでは、同じワークフローを複数同時実行(=多重起動)することができます。大規模な処理をする場合には便利でありつつも、
データパイプライン構築などでWorkflowsを用いている場合は同じワークフローを
多重起動することを避けたい場合があると思います。多重起動を防止するためにはワークフローの実行状態をFirestoreなど何らかのDBに記録しておく方法がよくあると思いますが、
ワークロードによっては使用するリソースを増やしたくない時もあると思います。そこで今回はWorkflowsのWorkflowsコネクタを用いてワークフローの多重起動する方法を考えて試してみました。

やってみる

考えてみた結論としては、Workflowsのlistコネクタを用いることで多重起動を防止することが可能です。が、注意点もあります。

https://cloud.google.com/workflows/docs/reference/googleapis/workflowexecutions/v1/projects.locations.workflows.executions/list

listコネクタの仕様

listコネクタは引数に指定した名前のワークフローの実行履歴を返却します。また返却される値は実行時間の降順(新しい順)となっています。
以下が代表的な引数です。

引数 説明
parent 必須。projects/プロジェクトID/locations/リージョン/workflows/ワークフロー名の形式で指定する
pageSize 返却される実行履歴の最大数。pageSizeを設定しなかった場合は100。viewに設定した値で設定できる最大値は異なる
pageToken ページネーションを行う場合はpageTokenを用いる
view 返却されるフィールド値を制御する引数。EXECUTION_VIEW_UNSPECIFIEDBASICFULLから選択。省略した場合はBASICがデフォルト値。BASICの場合はnamestart_timeend_timestateworkflow_revision_idが返却される。

代表的なレスポンスパラメータを以下に示します。

パラメータ 説明
name 実行履歴名。projects/プロジェクトID/locations/リージョン/workflows/ワークフロー名/executions/ワークフロー実行IDの形式。ワークフロー実行IDとほぼ同義
state 実行状態。今回の肝なので返却値のパターンは別表
startTime ワークフロー実行開始時間
workflowRevisionId 実行されたワークフローのリビジョン

上表の実行状態(state)が返却する値は以下です。

state 説明
STATE_UNSPECIFIED 無効な状態
ACTIVE ワークフロー実行中
SUCCEEDED ワークフローが実行に成功して終了
FAILED ワークフローの実行に失敗
CANCELLED ワークフローの実行がキャンセル

このコネクタを用いることで多重起動を防止することができます。

どうやって防止するのか

listコネクタで取得した実行履歴の中で【stateACTIVEになっていて、かつ実行履歴名nameが自身のワークフローのnameではない場合】
そのワークフローはすでに実行されているということになります。
例えば、同一ワークフローが2つ起動されている場合、以下のイメージの実行履歴が取得できます。

name state
projects/プロジェクトID/locations/リージョン/workflows/ワークフロー名/executions/ABC ACTIVE
projects/プロジェクトID/locations/リージョン/workflows/ワークフロー名/executions/XYZ ACTIVE

実際にコードを見たほうが理解も早いと思うので見てみましょう。

コードを見てみましょう

とりあえず全文です。現在実行中のワークフローの中で、同じワークフローがすでにACTIVEの状態で実行されているかを確認し、もしそうであればエラーを発生させるロジックを実装しています。

- init:
    assign:
        - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
        - project_num: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
        - workflow_exec_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
        - workflow_name: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_ID")}
        - workflow_location: ${sys.get_env("GOOGLE_CLOUD_LOCATION")}
- readWorkflowsExecList:
    call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.list
    args:
        parent: ${"projects/" + project_id + "/locations/" + workflow_location + "/workflows/" + workflow_name}
        pageSize: 10
    result: listResult
- loopStep:
    for:
        in: ${listResult.executions}
        value: s
        steps:
            - checkActiveStep:
                switch:
                    - condition: ${s.state == "ACTIVE"  AND s.name != "projects/" + project_num + "/locations/" + workflow_location + "/workflows/" + workflow_name +"/executions/" + workflow_exec_id}
                      steps:
                        - errorStep:
                            raise: "Already running."
- sleepStep:
    call: sys.sleep
    args:
        seconds: 30
- endStep:
    return: "workflow done"

全体のフロー

  1. 組み込み環境変数を初期化して、ワークフローの実行に必要な情報を取得
  2. 現在のワークフローの実行履歴を取得(最新10件)
  3. 実行履歴をループ処理し、現在ACTIVEなワークフロー実行が存在するか確認
    • もしACTIVEな実行が見つかった場合、エラーを発生させて終了
  4. ACTIVEな実行がない場合、30秒スリープして終了

30秒スリープするのは、多重起動を防止できているか確認するためです。(スリープしている間に再度同じワークフローを実行します)

以下に、各ステップの詳細とその目的を解説します。


  1. init ステップ
  • 目的: Workflowsの組み込み環境変数を取得し初期化
  • 処理内容:
    • project_id: Google Cloud プロジェクトの ID を取得
    • project_num: Google Cloud プロジェクトの番号を取得
    • workflow_exec_id: 現在のワークフロー実行IDを取得
    • workflow_name: 現在実行しているワークフローの名前を取得
    • workflow_location: ワークフローが実行されているリージョンを取得
  1. readWorkflowsExecList ステップ
  • 目的: 現在のワークフローの実行履歴を取得
  • 処理内容:
    • listコネクタを使用して、現在実行しているワークフローの実行履歴を取得
    • 最大10件の実行履歴を取得するように pageSizeを指定
    • 結果はlistResult 変数に格納
  1. loopStep ステップ
  • 目的: 取得した実行リストをループ処理し、現在ACTIVEな実行が存在するか確認
  • 処理内容:
    • listResult.executions に含まれる各実行(s)をループで処理
    • 以下の条件で多重起動かどうか判定
      1. 実行の状態(s.state)が "ACTIVE"
      2. nameが現在のワークフローのname(実行I workflow_exec_id)と異なる
    • 条件が一致する場合(=多重起動)、errorStepへ進む
  1. checkActiveStep ステップ
  • 目的: 多重起動の場合例外を発生させてワークフローを終了
  • 処理内容:
    • 条件(s.state == "ACTIVE" かつ ACTIVEなワークフローの実行IDが現在の実行IDと異なる場合)が true の場合に errorStep を実行
    • errorStep 内では、エラーを発生させてワークフローを終了
  1. sleepStep ステップ
  • 目的: 多重起動を防止できているかテストするためにスリープ
  • 処理内容:
    • ワークフローを 30 秒間一時停止
  1. endStep ステップ
  • 目的: ワークフローを正常終了
  • 処理内容:
    • workflow doneを返却

試してみる

多重起動が防止できるか実際に動作させてみて確認します。確認方法としては以下となります。

  1. ワークフローを実行する
  2. 30秒以内に同一のワークフローを再実行

(2.)で起動したワークフローが失敗すれば多重起動が防止できていると考えられます。

というわけで実行してみました。以下が実行結果です。

スクリーンショット 2024-12-14 0.24.55

失敗になっている実行は後から実行したワークフロー(2.のもの)です。

ワークフロー実行ID 開始時間 終了時間
42dbedd8-880b-454b-bf6b-9839f0f64768 後続実行ワークフロー 2024/12/14 0:22:06 2024/12/14 0:22:07
5e2e58b1-8530-49d3-8646-090328c0c247 先行実行ワークフロー 2024/12/14 0:22:02 2024/12/14 0:22:35

先行実行ワークフローは30秒程度実行され成功しているのに対して、後続実行ワークフローは1秒程度しか実行されてされておらずまた失敗しております。
後続実行ワークフローの実行結果詳細を見てみます。
スクリーンショット 2024-12-14 0.30.41

RuntimeError: "Already running."と出力されていますね。Already runningは多重起動の判定を行った結果Trueになると実行されるステップで返却しているメッセージです。
このことから多重起動の判定が正しく行われたことが確認できました。

注意点

ほぼ同時(コンマ何秒の差など)にワークフローを実行した場合、正しく多重起動を防止できない可能性があります。
1つのワークフローに1分間隔で実行するスケジューラを2つ設定して動作確認してみましたが、正しく多重起動を防止できないケースを確認しました。
正しく多重起動防止できなかった原因としては、ほぼ同時に多重起動チェックを行ったためにどちらも多重起動判定がTrueになってしまったためです。
スクリーンショット 2024-12-14 0.47.50

以上よりほぼ同時の実行の場合に正しく多重起動を判定するには、startTimeを用いてどちらが先行処理か判定する、ランダムなスリープ処理を入れる、Firestoreなど外部リソースにワークフロー実行状態や開始時刻を記録して複数実行中の場合は先行ワークフローを時刻を元に判定して実行するなど
Workflowsのコネクタを用いただけの多重起動対策はシビアな多重起動タイミングの場合には何かしらの工夫がいると考えます。

所感

Workflowsのコネクタだけで多重起動対策ができるのは、とてもありがたいことだと考えます。例えば人がWorkflowsのコンソールから実行ボタンを押して実行するワークロードであれば、今回の実装でも十分多重起動対策になると考えます。
一方で、pub/subや並列処理しているFunctionsからWorkflowsを起動する場合などではほぼ同時にワークフローが実行される可能性があるため意図した動作をしない可能性があると考えられます。
こちらは十分注意する必要がある点と考えます。

今度はほぼ同時の実行タイミングの多重起動対策について考えてみたいと思います。
それではまた。ナマステー

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.