WorkflowsからDataformワークフローをポーリングしてみた

WorkflowsからDataformを呼び出して、Dataformワークフローの実行状態をポーリングしてみました。
2024.05.08

データアナリティクス事業本部の根本です。今日はWorkflowsからDataformワークフローを起動して、ポーリングしてみました。DataformワークフローをWorkflowsから起動しただけで終わるのだと物足りないなと思っていた方はぜひ読んでみてください。

この記事の対象者

  • Dataformワークフローをポーリングしようと思っているひと

前提条件

  • Dataform、WorkflowsAPIが使用できること

検証の全体像

  • WorkflowsからDataformワークフローの実行状態を取得して、実行状態が完了になるまでポーリングする

検証用に実装するWorkflowsの処理の流れとしては、以下となります。
1. Dataformのソースをコンパイル
2. その結果を元にDataformワークフローを実行
3. Dataformワークフロー実行後は即時ワークフローの実行状態が返却される。その後実行状態を確認するAPIを数秒間隔で発行してDataformワークフローが実行完了するまでポーリング
イメージとしては以下となります。
それでは検証していきます!

やってみる

Dataform側の準備

まずはDataform側の準備を行います。Workflowsからのポーリングで実行中の状態を確認したいので10秒程度かかる処理を組みました。
以下の4つのSQLXファイルを作成しました。

  • test_1.sqlx
  • test_2.sqlx
  • test_3.sqlx
  • test_4.sqlx

各SQLXファイルの処理は単純なもので、まずtest_1テーブルを列の値1で作成して、それ以降は$refを用いて前のViewを参照して作成していくというものです。
test_1test_2のSQLXファイルだけ例に示します。

test_1.sqlx

config { 
    type: "view",
}
SELECT 1 AS test_1

test_2.sqlx

config { 
    type: "view",
}
SELECT * FROM ${ref("test_1")}

環境にもよりますが、試してみたら大体10秒前後の処理時間になっていたのでこのDataformワークフローを元にしてWorkflowsからポーリングしてみます。

Workflows側の準備

まずはWorkflowsのyamlを以下に示します。

main:
    steps:
    - init:
        assign:
        - repository: projects/プロジェクトID/locations/asia-northeast1/repositories/リポジトリ名
    - createCompilationResult:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
            auth:
                type: OAuth2
            body:
                gitCommitish: "main"
        result: compilationResult
    - createWorkflowInvocation:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
            auth:
                type: OAuth2
            body:
                compilationResult: ${compilationResult.body.name}
        result: createWorkflowInvocationResult
    - checkDataform:
        call: http.get
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + createWorkflowInvocationResult.body.name}
            auth:
                type: OAuth2
        result: checkStatus
    - checkIfDone:
        switch:
          - condition: ${checkStatus.body.state == "SUCCEEDED"}
            return: ${"ポーリングを終了します。 STATE:" + checkStatus.body.state}
    - printLog:
          call: sys.log
          args:
              text: ${"ポーリングを開始します。STATE:" + checkStatus.body.state}
              severity: INFO
    - wait:
        call: sys.sleep
        args:
            seconds: 2
        next: checkDataform

それではポーリング処理の箇所に関して説明していきます。
※コンパイル結果作成、ワークフロー呼び出しに関してはこちらの記事をご確認ください。

    - checkDataform:
        call: http.get
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + createWorkflowInvocationResult.body.name}
            auth:
                type: OAuth2
        result: jobStatus

上記処理にてDataformワークフロー実行状態を取得しています。APIとしては以下になります。
リファレンスはこちらです。

Method: projects.locations.repositories.workflowInvocations.get

本APIは、Dataformワークフロー実行結果のnameを元に呼び出すことができます。   createWorkflowInvocation:ステップにて呼び出し結果をcreateWorkflowInvocationResultに格納して、その値を用いて以下のようにしてURLを組み立てます。

url: ${"https://dataform.googleapis.com/v1beta1/" + createWorkflowInvocationResult.body.name}

Dataformワークフローの実行結果はリファレンスより以下となるので、ポーリングを終了する状態(state)を選んで設定します。今回はSUCCEEDEDになったらポーリングを終了するようにしました。

state 概要
STATE_UNSPECIFIED 初期値。使用されることはない
RUNNING ワークフローが実行中
SUCCEEDED 成功
CANCELLED キャンセル
FAILED 失敗
CANCELING キャンセル中

注意点としてポーリングする状態、Dataformの実装、Dataformワークフロー実行結果によっては意図せぬ待機時間、実行ステップ数になってしまう可能性があるのでご注意ください。

ワークフロー実行の最長時間(開始時刻から終了時刻まで)。この上限を超えると、ワークフローはタイムアウト エラーで終了します。

引用:リソースの上限
※今回の実装は検証用なので最低限の実装しかしていません。ポーリング最大時間を制限したり、起動後にWorkflowsが停止しない場合はWorkflowsの実行をキャンセルするなどして停止させてください。
それではポーリング処理を見ていきます。

    - checkIfDone:
        switch:
          - condition: ${checkStatus.body.state == "SUCCEEDED"}
            return: ${"ポーリングを終了します。 STATE:" + checkStatus.body.state}
    - printLog:
          call: sys.log
          args:
              text: ${"ポーリングを開始します。STATE:" + checkStatus.body.state}
              severity: INFO
    - wait:
        call: sys.sleep
        args:
            seconds: 2
        next: checkDataform

switchステートメントを用いてDataformワークフローの実行状態がSUCCEEDEDかどうかを判定しています。
SUCCEEDED以外であれば、後続のwait処理が実行されて指定秒数(今回は2秒)待機して、再度checkDataformステップを実行してDataformワークフローの実行状態を取得します。
wait処理で用いているsys.sleep関数はWorkflowsの指定秒数待機してくれる関数です。

動かしてみる

それでは実際にWorkflowsを動かしてみます。 実行に成功していました。"ポーリングを終了します。 STATE:SUCCEEDED"のメッセージより、DataformワークフローがSUCCEEDEDになったことも確認できました。
Workflowsのログも見てみます。 4回ポーリングをして、Dataformワークフローの状態がRUNNINGとなっていることが確認できました。検証成功です。

おわりに

WorkflowsからDataformワークフローのポーリングができてよかったです。今後Dataformワークフローを起動した後、状態を追いたい場合に使っていきたいと思います。ただ、ポーリングする際にはDataform側の状態に注意して意図せぬ待機時間にならないようにWorkflows側での実装に注意する必要があります。この点は注意ポイントだと思います。
この記事がどなたかのお役に立てば幸いです。それではまた。

参考

埋め込み switch ステートメントを使用してステップを実行する
Dataform API
Workflowsリソースの上限
switchステートメントのリファレンス