WorkflowsでWorkflowsコネクタを用いて同じワークフローが多重起動できないようにする方法を考えてみた
概要
Workflowsでは、同じワークフローを複数同時実行(=多重起動)することができます。大規模な処理をする場合には便利でありつつも、
データパイプライン構築などでWorkflowsを用いている場合は同じワークフローを
多重起動することを避けたい場合があると思います。多重起動を防止するためにはワークフローの実行状態をFirestoreなど何らかのDBに記録しておく方法がよくあると思いますが、
ワークロードによっては使用するリソースを増やしたくない時もあると思います。そこで今回はWorkflowsのWorkflowsコネクタを用いてワークフローの多重起動する方法を考えて試してみました。
やってみる
考えてみた結論としては、Workflowsのlistコネクタを用いることで多重起動を防止することが可能です。が、注意点もあります。
listコネクタの仕様
listコネクタは引数に指定した名前のワークフローの実行履歴を返却します。また返却される値は実行時間の降順(新しい順)となっています。
以下が代表的な引数です。
引数 | 説明 |
---|---|
parent | 必須。projects/プロジェクトID/locations/リージョン/workflows/ワークフロー名 の形式で指定する |
pageSize | 返却される実行履歴の最大数。pageSizeを設定しなかった場合は100。view に設定した値で設定できる最大値は異なる |
pageToken | ページネーションを行う場合はpageTokenを用いる |
view | 返却されるフィールド値を制御する引数。EXECUTION_VIEW_UNSPECIFIED ・BASIC ・FULL から選択。省略した場合はBASIC がデフォルト値。BASIC の場合はname ・start_time 、end_time 、state 、workflow_revision_id が返却される。 |
代表的なレスポンスパラメータを以下に示します。
パラメータ | 説明 |
---|---|
name | 実行履歴名。projects/プロジェクトID/locations/リージョン/workflows/ワークフロー名/executions/ワークフロー実行ID の形式。ワークフロー実行IDとほぼ同義 |
state | 実行状態。今回の肝なので返却値のパターンは別表 |
startTime | ワークフロー実行開始時間 |
workflowRevisionId | 実行されたワークフローのリビジョン |
上表の実行状態(state
)が返却する値は以下です。
state | 説明 |
---|---|
STATE_UNSPECIFIED | 無効な状態 |
ACTIVE | ワークフロー実行中 |
SUCCEEDED | ワークフローが実行に成功して終了 |
FAILED | ワークフローの実行に失敗 |
CANCELLED | ワークフローの実行がキャンセル |
このコネクタを用いることで多重起動を防止することができます。
どうやって防止するのか
listコネクタで取得した実行履歴の中で【state
がACTIVE
になっていて、かつ実行履歴名name
が自身のワークフローのname
ではない場合】
そのワークフローはすでに実行されているということになります。
例えば、同一ワークフローが2つ起動されている場合、以下のイメージの実行履歴が取得できます。
name | state |
---|---|
projects/プロジェクトID/locations/リージョン/workflows/ワークフロー名/executions/ABC |
ACTIVE |
projects/プロジェクトID/locations/リージョン/workflows/ワークフロー名/executions/XYZ |
ACTIVE |
実際にコードを見たほうが理解も早いと思うので見てみましょう。
コードを見てみましょう
とりあえず全文です。現在実行中のワークフローの中で、同じワークフローがすでにACTIVE
の状態で実行されているかを確認し、もしそうであればエラーを発生させるロジックを実装しています。
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- project_num: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
- workflow_exec_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
- workflow_name: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_ID")}
- workflow_location: ${sys.get_env("GOOGLE_CLOUD_LOCATION")}
- readWorkflowsExecList:
call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.list
args:
parent: ${"projects/" + project_id + "/locations/" + workflow_location + "/workflows/" + workflow_name}
pageSize: 10
result: listResult
- loopStep:
for:
in: ${listResult.executions}
value: s
steps:
- checkActiveStep:
switch:
- condition: ${s.state == "ACTIVE" AND s.name != "projects/" + project_num + "/locations/" + workflow_location + "/workflows/" + workflow_name +"/executions/" + workflow_exec_id}
steps:
- errorStep:
raise: "Already running."
- sleepStep:
call: sys.sleep
args:
seconds: 30
- endStep:
return: "workflow done"
全体のフロー
- 組み込み環境変数を初期化して、ワークフローの実行に必要な情報を取得
- 現在のワークフローの実行履歴を取得(最新10件)
- 実行履歴をループ処理し、現在ACTIVEなワークフロー実行が存在するか確認
- もしACTIVEな実行が見つかった場合、エラーを発生させて終了
- ACTIVEな実行がない場合、30秒スリープして終了
30秒スリープするのは、多重起動を防止できているか確認するためです。(スリープしている間に再度同じワークフローを実行します)
以下に、各ステップの詳細とその目的を解説します。
init
ステップ
- 目的: Workflowsの組み込み環境変数を取得し初期化
- 処理内容:
project_id
: Google Cloud プロジェクトの ID を取得project_num
: Google Cloud プロジェクトの番号を取得workflow_exec_id
: 現在のワークフロー実行IDを取得workflow_name
: 現在実行しているワークフローの名前を取得workflow_location
: ワークフローが実行されているリージョンを取得
readWorkflowsExecList
ステップ
- 目的: 現在のワークフローの実行履歴を取得
- 処理内容:
- listコネクタを使用して、現在実行しているワークフローの実行履歴を取得
- 最大10件の実行履歴を取得するように
pageSize
を指定 - 結果は
listResult
変数に格納
loopStep
ステップ
- 目的: 取得した実行リストをループ処理し、現在ACTIVEな実行が存在するか確認
- 処理内容:
listResult.executions
に含まれる各実行(s
)をループで処理- 以下の条件で多重起動かどうか判定
- 実行の状態(
s.state
)が"ACTIVE"
name
が現在のワークフローのname
(実行Iworkflow_exec_id
)と異なる
- 実行の状態(
- 条件が一致する場合(=多重起動)、
errorStep
へ進む
checkActiveStep
ステップ
- 目的: 多重起動の場合例外を発生させてワークフローを終了
- 処理内容:
- 条件(
s.state == "ACTIVE"
かつACTIVEなワークフローの実行IDが現在の実行IDと異なる場合
)がtrue
の場合にerrorStep
を実行 errorStep
内では、エラーを発生させてワークフローを終了
- 条件(
sleepStep
ステップ
- 目的: 多重起動を防止できているかテストするためにスリープ
- 処理内容:
- ワークフローを 30 秒間一時停止
endStep
ステップ
- 目的: ワークフローを正常終了
- 処理内容:
workflow done
を返却
試してみる
多重起動が防止できるか実際に動作させてみて確認します。確認方法としては以下となります。
- ワークフローを実行する
- 30秒以内に同一のワークフローを再実行
(2.)で起動したワークフローが失敗すれば多重起動が防止できていると考えられます。
というわけで実行してみました。以下が実行結果です。
失敗になっている実行は後から実行したワークフロー(2.のもの)です。
ワークフロー実行ID | 開始時間 | 終了時間 | |
---|---|---|---|
42dbedd8-880b-454b-bf6b-9839f0f64768 | 後続実行ワークフロー | 2024/12/14 0:22:06 | 2024/12/14 0:22:07 |
5e2e58b1-8530-49d3-8646-090328c0c247 | 先行実行ワークフロー | 2024/12/14 0:22:02 | 2024/12/14 0:22:35 |
先行実行ワークフローは30秒程度実行され成功しているのに対して、後続実行ワークフローは1秒程度しか実行されてされておらずまた失敗しております。
後続実行ワークフローの実行結果詳細を見てみます。
RuntimeError: "Already running."
と出力されていますね。Already running
は多重起動の判定を行った結果Trueになると実行されるステップで返却しているメッセージです。
このことから多重起動の判定が正しく行われたことが確認できました。
注意点
ほぼ同時(コンマ何秒の差など)にワークフローを実行した場合、正しく多重起動を防止できない可能性があります。
1つのワークフローに1分間隔で実行するスケジューラを2つ設定して動作確認してみましたが、正しく多重起動を防止できないケースを確認しました。
正しく多重起動防止できなかった原因としては、ほぼ同時に多重起動チェックを行ったためにどちらも多重起動判定がTrueになってしまったためです。
以上よりほぼ同時の実行の場合に正しく多重起動を判定するには、startTime
を用いてどちらが先行処理か判定する、ランダムなスリープ処理を入れる、Firestoreなど外部リソースにワークフロー実行状態や開始時刻を記録して複数実行中の場合は先行ワークフローを時刻を元に判定して実行するなど
Workflowsのコネクタを用いただけの多重起動対策はシビアな多重起動タイミングの場合には何かしらの工夫がいると考えます。
所感
Workflowsのコネクタだけで多重起動対策ができるのは、とてもありがたいことだと考えます。例えば人がWorkflowsのコンソールから実行ボタンを押して実行するワークロードであれば、今回の実装でも十分多重起動対策になると考えます。
一方で、pub/subや並列処理しているFunctionsからWorkflowsを起動する場合などではほぼ同時にワークフローが実行される可能性があるため意図した動作をしない可能性があると考えられます。
こちらは十分注意する必要がある点と考えます。
今度はほぼ同時の実行タイミングの多重起動対策について考えてみたいと思います。
それではまた。ナマステー