プロジェクト間でイベントルーティングして、別プロジェクトの BigQuery INSERT トリガで Cloud Functions を実行してみた。

2023.01.13

こんにちは、みかみです。

本年も、どうぞよろしくお願いいたします。

やりたいこと

  • BigQuery へのデータ追加を検知して、後続処理を実行したい
  • 別プロジェクトの BigQuery へのデータ INSERT を検知して、Cloud Functions を実行したい

以前、 BigQuery へのデータ INSERT を検知して Cloud Functions を実行してみましたが、この Audit Log トリガの Cloud Functions 実行は、ログが同じプロジェクトに出力される場合にしか使えません。

複数プロジェクトに跨がる BigQuery イベントをまとめて管理したい場合などには、Pub/Sub 経由のログシンクを作成することにより実現できるようです。

構成としてはこんなイメージです。

前提

Google Cloud SDK(gcloud コマンド)の実行環境は準備済みであるものとします。 本エントリでは、Cloud Shell を使用しました。

本エントリで利用している BigQuery, Cloud Functions, Eventarc, Cloud Build などの API は有効化済みで、操作するアカウントに各設定に必要な権限は付与済みです。 動作確認時には、両方のプロジェクトでプロジェクトオーナーロールを付与したアカウントを使用しています。

また、Compute Engine サービスアカウント( [PROJECT_NUMBER]-compute@developer.gserviceaccount.com )に、「Eventarc イベント受信者」と 「Eventarc サービスエージェント」ロールを付与済みです。

Pub/Sub トピックを作成

プロジェクト A のログを受け取るために、プロジェクト B に Pub/Sub トピックを作成します。

gcloud config set project でプロジェクト B を設定後、以下のコマンドを実行しました。

gcloud pubsub topics create audit_log_sync

ログシンクを作成

先ほど作成したプロジェクト B の Pub/Sub に対象のログをパブリッシュするログシンクを、プロジェクト A に作成します。

gcloud config のプロジェクトをプロジェクト A に切り替えてから、以下のコマンドを実行しました。 --log-filter 条件に一致するログを、プロジェクト B の audit_log_sync トピックにパブリッシュします。

gcloud logging sinks create cross-project-sink \
    pubsub.googleapis.com/projects/cm-da-mikami-yuki-xxxx/topics/audit_log_sync \
    --log-filter='protoPayload.methodName="google.cloud.bigquery.v2.JobService.InsertJob"'

※一部伏せ字に変更しました。

コマンド実行結果でログシンクのサービスエージェントが表示されるので、メモっておきます。

$ gcloud logging sinks create cross-project-sink \
    pubsub.googleapis.com/projects/cm-da-mikami-yuki-xxxx/topics/audit_log_sync \
    --log-filter='protoPayload.methodName="google.cloud.bigquery.v2.JobService.InsertJob"'
(省略)
Please remember to grant `serviceAccount:p[PROJECT_NUMBER]-445944@gcp-sa-logging.iam.gserviceaccount.com` the Pub/Sub Publisher role on the topic.
More information about sinks can be found at https://cloud.google.com/logging/docs/export/configure_export

※一部伏せ字に変更しました。

このサービスエージェントに、プロジェクト B の Pub/Sub にメッセージをパブリッシュするための権限を付与します。

以下のコマンドを実行しました。

gcloud projects add-iam-policy-binding cm-da-mikami-yuki-xxxx \
    --member serviceAccount:p[PROJECT_NUMBER]-445944@gcp-sa-logging.iam.gserviceaccount.com \
    --role roles/pubsub.publisher

※一部伏せ字に変更しました。

この Pub/Sub パブリッシャーロールをつけ忘れると、ログシンク時に権限エラーが発生します。

合わせて、メールでもシンクエラーが通知されました。。(プロジェクトオーナー宛てに通知?

Cloud Functions をデプロイ

以下の python コードを、main.py というファイル名で保存しました。

import base64
import json
import functions_framework

@functions_framework.cloud_event
def fetch_insert_log_sync(cloud_event):
   message = json.loads(base64.b64decode(cloud_event.data["message"]["data"]))
   print(message)
   protoPayload = message.get('protoPayload', None)
   metadata = protoPayload.get('metadata', None)
   tableDataChange = metadata.get('tableDataChange', None)
   if tableDataChange is None:
      print('no tableDataChange...')
      return
   insertedRowsCount = tableDataChange.get('insertedRowsCount', None)
   if insertedRowsCount is None:
      print('no INSERT data...')
      return

   resourceName = protoPayload.get('resourceName', None)
   project = resourceName.split('/')[1]
   dataset = resourceName.split('/')[3]
   table = resourceName.split('/')[5]
   print(f'**** insert data: {project}.{dataset}.{table}({insertedRowsCount} rows) ****')

Pub/Sub メッセージからシンクログの内容を参照して、BigQuery にデータが INSERT された場合に、対象のテーブル名と追加行数をログ出力します。

以下のコマンドで Cloud Functions をデプロイします。 audit_log_sync トピックへのパブリッシュイベントをトリガに Cloud Functions を起動するように、--trigger-topic でトピック名を指定します。

gcloud functions deploy fetch-insert-log-sync \
    --gen2 \
    --region asia-northeast1 \
    --runtime python310 \
    --entry-point=fetch_insert_log_sync \
    --trigger-topic="audit_log_sync" \
    --trigger-location=asia-northeast1

合わせて、以下のコマンドで Cloud Run サービス間認証も設定しました。

gcloud run services add-iam-policy-binding fetch-insert-log-sync \
  --region=asia-northeast1 \
  --member='serviceAccount:[PROJECT_NUMBER]-compute@developer.gserviceaccount.com' \
  --role='roles/run.invoker'

※一部伏せ字に変更しました。

動作確認

準備できたので、プロジェクト A の BigQuery テーブルにデータを INSERT して、プロジェクト B のCloud Functions が実行されるか確認してみます。

プロジェクト A の BigQuery には、動作確認用に以下の簡単なテーブルを作成済みです。

プロジェクト A の BigQuery クエリエディタから以下のクエリを実行して、テーブルにデータを INSERT しました。

insert into ds_sample.table_test values (1, 'test for cross project event trigger.')

先ほどデプロイした Cloud Functions のログを確認してみます。

意図した通り、プロジェクト A の BigQuery への データ INSERT イベントをトリガに、プロジェクト B で Cloud Functions を実行することができました。

まとめ(所感)

別プロジェクトの BigQuery からでもイベントドリブンで後続処理を実装できることが確認できました。 今回は動作確認目的だったため Cloud Functions ではログ出力しただけでしたが、ログ出力内容を参照して、例えば INSERT 行数を Slack などで通知したり、意図しないユーザーからのクエリ発行を検出したり、様々な後続処理を実装することが可能です。

本エントリでは別プロジェクトのイベント検知を試してみましたが、別リージョンのイベントルーティングも、同じように Pub/Sub 経由で実現できるそうです。 また、もちろん BigQuery 以外のイベントのルーティングも可能なので、用途に合わせていろいろと活用できそうです。

参考