
Cloud FunctionsからDataformワークフローを起動してみた
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
データアナリティクス事業本部の根本です。最近DataformのAPIが好きすぎて暇な時はリファレンスを見たり脳内でAPI実行して遊んでいます。そんなわけで、今回はCloud FunctionsからDataformワークフローをAPIで起動してみました。よかったら読んでみてください。
この記事の対象者
- DataformワークフローをCloud Functionsから起動したいひと
前提条件
- Dataform,Cloud Functions APIが使用できること
検証の全体像
- Cloud FunctionsからDataformAPIを起動できること
- 起動するDataformAPIはコンパイル実行、ワークフロー実行API
 以下のイメージです。
  
DataformのAPIを叩いたことがない人はこの記事を先に見ていただけるとイメージが湧きやすいと思います。
それでは早速やってみます!
やってみる
まずはCloud Functions関数を準備します。(Dataformワークフロー実行のためだけの最低限の実装で例外処理などはしません)
Cloud Functions関数の処理フローは以下のイメージです。

具体的な実装は以下となります。
import functions_framework
import requests
import json
ENDPOINT = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token"
COMPILE_ENDPOINT = "https://dataform.googleapis.com/v1beta1/projects/プロジェクトID/locations/asia-northeast1/repositories/リポジトリ名/compilationResults"
EXECUTE_ENDPOINT = "https://dataform.googleapis.com/v1beta1/projects/プロジェクトID/locations/asia-northeast1/repositories/リポジトリ名/workflowInvocations"
def fetch_token():
    '''
    Token取得用関数
    '''
    token_header = {"Metadata-Flavor": "Google"}
    token_response = requests.get(ENDPOINT, headers=token_header)
    token_json_data = token_response.json()
    token = token_json_data["access_token"]
    return token
def execute_compile(auth_header):
    '''
    コンパイル実行関数
    '''
    compile_data = {"gitCommitish" : "main"}
    compile_response = requests.post(COMPILE_ENDPOINT, headers=auth_header, data=json.dumps(compile_data))
    compile_json = compile_response.json()
    print(compile_json)
    return compile_json["name"]
def execute_workflow(auth_header, result):
    '''
    ワークフロー実行関数
    '''
    exec_data = {"compilationResult" : result}
    execute_response = requests.post(EXECUTE_ENDPOINT, headers=auth_header, data=json.dumps(exec_data))
    print(execute_response.json())
    return "Dataformワークフローを実行しました"
@functions_framework.http
def dataform_call(request):
    '''
    メイン関数
    '''
    access_token = fetch_token()
    header = {"Authorization": f"Bearer {access_token}"}
    compile_result = execute_compile(header)
    response_message = execute_workflow(header, compile_result)
    return response_message
処理の流れをそれぞれ見ていきます。
認証
まずはDataformのREST APIを叩くので、認証をしないといけません。
今回はCLIで叩くときと同様に、Bearer認証にて行いますがそのためにはトークンが必要となります。ローカル環境であればgcloudコマンドでトークンの取得やそのトークンを
用いてAPIの実行ができるのですがCloud Functionsではgcloudコマンドでトークンの所得を行うことができません。そこで今回はメタデータサーバからトークンを取得してBearer認証を行いました。リファレンス
※メタデータサーバはローカル環境からではアクセスできません。Google Cloud環境上のリソース(Cloud FunctionsやComput Engineなど)からアクセスできます
メタデータサーバからのトークン取得は以下コードです。
def fetch_token():
    '''
    Token取得用関数
    '''
    token_header = {"Metadata-Flavor": "Google"}
    token_response = requests.get(ENDPOINT, headers=token_header)
    token_json_data = token_response.json()
    token = token_json_data["access_token"]
    return token
requestsライブラリを用いて、メタデータサーバへリクエストを送りレスポンスのaccess_tokenを取得して返却する関数となります。この関数を用いて後続のDataformAPIを叩くときのBearer認証トークンを取得します。
コンパイル実行
Bearer認証トークンを入手してしまえさえすれば、あとはもうDataform APIを叩いていくだけです。Dataformワークフローを実行する場合は
1. コンパイルの実行
2. コンパイル結果を元にDataformワークフロー実行
という流れになるので、順番にAPIを叩いていきます。まずはコンパイルを行います。
コンパイルは以下のコードで行います。
def execute_compile(auth_header):
    '''
    コンパイル実行関数
    '''
    compile_data = {"gitCommitish" : "main"}
    compile_response = requests.post(COMPILE_ENDPOINT, headers=auth_header, data=json.dumps(compile_data))
    compile_json = compile_response.json()
    print(compile_json)
    return compile_json["name"]
COMPILE_ENDPOINTでDataformのコンパイル用APIを指定しています。リファレンスはこちらです。
コンパイルAPIではコンパイル対象としては以下が指定できます。
| key | 概要 | 
|---|---|
| gitCommitish | GitコミットID、ブランチ名、Gitタグ名 | 
| workspace | ワークスペース名 | 
| releaseConfig | リリース構成の名称(リリースとスケジュールで設定した名称) | 
今回はgitCommitishにてmainブランチを指定しています。
Dataformワークフロー実行
コンパイル結果を元に後続のDataformワークフロー実行処理を行なっていきます。
def execute_workflow(auth_header, result):
    '''
    ワークフロー実行関数
    '''
    exec_data = {"compilationResult" : result}
    execute_response = requests.post(EXECUTE_ENDPOINT, headers=auth_header, data=json.dumps(exec_data))
    print(execute_response.json())
    return "Dataformワークフローを実行しました"
EXECUTE_ENDPOINTでDataformのワークフロー実行APIを指定しています。リファレンスはこちら
コンパイル結果を{"compilationResult" : result}にて作成して、リクエストボディに格納してAPIを叩きます。このAPIはワークフローを起動するのみで、実行後はワークフローの状態を返却します。
状態(state)として返却される値はリファレンスより以下のどれかになります。
| state | 概要 | 
|---|---|
| STATE_UNSPECIFIED | 初期値。使用されることはない | 
| RUNNING | ワークフローが実行中 | 
| SUCCEEDED | 成功 | 
| CANCELLED | キャンセル | 
| FAILED | 失敗 | 
| CANCELING | キャンセル中 | 
実行状態を取得して、後続の処理を行うなどする場合は状態(state)をAPIでポーリングする実装を組む必要があります。今回は検証なので実行した後は画面で実行結果を確認するのでこれ以上の実装はしません(今度やりたいと思ってます)。
実装ができたので関数をデプロイします。デプロイは以下のgcloudコマンドにて行いました(今回関数に付与したサービスアカウントは検証用なので編集者権限を付与しています)。
gcloud functions deploy dataform_call \ --gen2 \ --region=asia-northeast1 \ --runtime=python311 \ --source=. \ --entry-point=dataform_call \ --trigger-http \ --service-account=サービスアカウント
実行
それではコマンドラインよりcurlコマンドでCloud Functions関数を起動します。
curl https://asia-northeast1-プロジェクトID.cloudfunctions.net/エンドポイント \ -H "Authorization: bearer $(gcloud auth print-identity-token)"
問題なく実行されたので、Dataformの画面から実行結果を確認してみます。

無事実行されていました。Cloud FunctionsからDataformのAPIを実行することができました!DataformのAPIを叩いて成功するとやっぱり嬉しいですね。最高です。
おわりに
Cloud FunctionsからDataformワークフローを実行することができると、例えばCloud Storageへのファイル保存トリガーでDataform起動!なんてイベントドリブンなDataform起動もできると思います。Workflowsから起動するのも良いですが、簡単な処理フローだったらCloud Functions→Dataform起動、なんてのもありなのかなと思いました。選択肢の一つとして心の片隅に持っておきたいと思います。
この記事がどなたかのお役に立てば嬉しいです。それではまた。












