BigQuery へのデータ INSERT をトリガに、Cloud Functions を実行してみた。

2022.11.23

こんにちは、みかみです。

沖縄もそろそろ肌寒い毎日になってきました。

やりたいこと

  • BigQuery へのデータ追加を検知して、後続処理を実行したい
  • BigQuery へのデータ追加の監査ログをトリガに、Cloud Functions を実行したい

図にしてみると、こんな感じです。

前提

Google Cloud SDK(gcloud コマンド)の実行環境は準備済みであるものとします。 本エントリでは、Cloud Shell を使用しました。

Eventarc API や、本エントリで利用している BigQuery, Cloud Functions, Cloud Build などの API は有効化済みです。

また、Compute Engine サービスアカウント( [PROJECT_NUMBER]-compute@developer.gserviceaccount.com )に、「Eventarc イベント受信者」と 「Eventarc サービスエージェント」ロールを付与済みです。

Cloud Functions をデプロイ

以下の python コードを、main.py というファイル名で保存しました。

import functions_framework

@functions_framework.cloud_event
def fetch_insert(cloudevent):
    print(f"Event type: {cloudevent['type']}")

    event_data = cloudevent.data
    protoPayload = event_data['protoPayload']

    metadata = protoPayload['metadata']
    tableDataChange = metadata.get('tableDataChange', None)
    if tableDataChange is None:
        return 'OK', 200

    insertedRowsCount = tableDataChange.get('insertedRowsCount', None)
    if insertedRowsCount is None:
        return 'OK', 200

    resourceName = protoPayload['resourceName']
    datasetId = resourceName.split('/')[3]
    tableId = resourceName.split('/')[5]

    print(f'{insertedRowsCount} rows inserted to {datasetId}.{tableId}')

    return 'OK', 200

イベント( Audit Log )データから metadata を取得し、テーブルデータが変更されていない場合や、insert された行数の情報がない場合はそのまま終了。 insert 行数がある場合はロード先データセットとテーブル名を取得してログ出力します。

以下のコマンドで Cloud Functions をデプロイします。 BigQuery の InsertJob 監査ログ出力をトリガに Cloud Functions を起動するように、--trigger-event-filters で指定しました。

gcloud functions deploy fetch-insert \
    --gen2 \
    --region asia-northeast1 \
    --runtime python310 \
    --entry-point=fetch_insert \
    --trigger-event-filters="type=google.cloud.audit.log.v1.written" \
    --trigger-event-filters="serviceName=bigquery.googleapis.com" \
    --trigger-event-filters="methodName=google.cloud.bigquery.v2.JobService.InsertJob" \
    --trigger-location=asia-northeast1

デプロイ時に、「全てのデータアクセスログが有効になってません」な WARNING が表示されましたが、

Enable all Data Access audit logs for [bigquery.googleapis.com]? (Y/n)?  n

WARNING: Manual enablement of Data Access audit logs may be necessary.

今回使用する InsertJob のログ出力はデフォルトで有効になっているので、無視して( n + Enter で)デプロイします。

無事、デプロイできました。

Cloud Functions を実行

BigQuery のクエリエディタから insert クエリを実行して、既存テーブルに2行のレコードを追加しました。

少し待ってから、Cloud Functions のログを確認してみます。

期待通り、BigQuery へのでデータ insert をトリガに、CLoud Functions を起動することができました。

はまったところ

権限周りで、ちょっとはまりました。。

Pub/Sub サービスアカウント

はじめ、いくら待っても Cloud Functions が起動しない。。

Eventarc の「トリガーの詳細」確認したら、権限エラーになってました。。

Pub/Sub デフォルトサービスアカウントに「サービスアカウントトークン作成者」ロールを付与して解決しました。

gcloud projects add-iam-policy-binding [PROJECT_ID] \
    --member="serviceAccount:service-[PROJECT_NUMBER]@gcp-sa-pubsub.iam.gserviceaccount.com" \
    --role='roles/iam.serviceAccountTokenCreator'

ドキュメントにもちゃんと書いてありますね。。

Cloud Run のサービス間認証

Pub/Sub サービスアカウントに権限追加して、これで大丈夫だろうと思ったら、今度は 403 エラー発生。。

Compute Engine サービスアカウントに、対象サービス( Cloud Functions 関数)を実行する権限を付与して解決しました。

gcloud run services add-iam-policy-binding fetch-insert \
  --region=asia-northeast1 \
  --member='serviceAccount:[PROJECT_NUMBER]-compute@developer.gserviceaccount.com' \
  --role='roles/run.invoker'

補足:データロードの場合

load job( bq load コマンド使用)で --autodetect を指定して BigQuery にデータを追加した場合には、Audit Log 出力内容に追加行数の情報がなく、本エントリで使用しているサンプルコードでは insert 行数を検出することはできませんでした。(テーブル再作成になるためでしょうか?

ですが、--autodetect 指定なしの場合は、期待通り Cloud Functions で追加行数を検出することができました。

また、最近 GA になった LOAD DATA 構文を使用して GCS から BigQuery にデータをロードする場合には、query job で実行されるものの、ログ出力内容にはテーブルデータ更新情報は含まれず、こちらも本エントリサンプルコードの Cloud Functions では検出することはできませんでした。

処理に合わせてログ出力内容をご確認ください。

まとめ(所感)

はじめ、Eventarc のトリガー諸々を作成した後に Cloud Functions デプロイしないといけないのかと思っていたのですが、Cloud Functions デプロイ( gcloud functions deploy )コマンドだけで Eventarc トリガーも Pub/Sub トピックも自動で作成してくれるので、思ったより簡単に Cloud Functions v2 デプロイできました。 Eventarc を使えるようになったことで、Cloud Functions 起動トリガが増えたことは嬉しい限りです!

BigQuery へのデータロード後に、スケジュール実行でマート作成などの後続処理を行っているケースも多いのではないかと思いますが、Eventarc 経由で Cloud Functions や Cloud Run、Workflows を実行すれば、イベントドリブンで後続処理を実行できます。 先行処理実行後すぐに後続処理を実行したい場合や、先行処理の実行結果を判定後に後続処理を実行したい場合など、スケジュール駆動よりイベント駆動の方が望ましい場合には、Eventarc 経由の実装を是非ご検討ください。

参考