BigQueryのデータ転送の結果を知りたい

2022.04.11

アマゾンのS3に溜めたデータをBigQueryで利用できるようにデータ転送を使っています。

ちゃんとデータが転送されたかどうかを知りたいという要望が出てきた時のため、データ転送サービスに用意されている通知オプションで通知を受け取る方法を試していきたいと思います。

参考: BigQuery Data Transfer Service の実行通知

データ転送にはメール通知とPub/Sub通知の2つが用意されており、どちらも有効にできます。

  • メール通知: 転送実行の失敗時に送信される、人が読んで理解できる通知
  • Pub/Sub通知: 転送実行の正常完了時または失敗時に送信される、機械で読み取り可能な通知

とドキュメントに定義されています。

メール通知は失敗時だけなので、Pub/Sub通知を使って人間が理解できる内容にして通知する方法を考えます。

Pub/Subとは

やってみる

前提

  • 権限

通知を受け取るトピックを作成でpubsub.topics.createが、新規トピックと既存トピックのいずれを使用するかにかかわりなく、pubsub.topics.setIamPolicyが必要です。

転送に通知設定を行う

新規作成の画面、既存の通知の編集画面の下部にある通知オプションまでスクロールし、Pub/Sub通知のトグルをオンにします

トピックを作成するをクリックします。

作成のダイアログが立ち上がるのでトピック IDを入力し,作成ボタンを押して作成します。

その他のオプションでの

  • スキーマを使用する
  • メッセージの保持期間を設定する(有料)
  • 顧客管理の暗号鍵(CMEK)を使用する

は何もチェックしなくても大丈夫です。

スキーマに関してですが、

Pub/Sub トピックの作成時にカスタム スキーマを指定しないでください

とドキュメントに書かれていますので注意書きに従います。

スキーマの作成と管理

トピックが作成されたら保存を押して完了です

コンソールのPub/Subのページに作成されたトピックが表示されていますね。

トピックにサブスクリプションを追加

トピックについてのメッセージを外部に送信するためにサブスクリプションを作成します。

サブスクリプションの作成 についてはリンクを参照。

配信タイプをPush型にして、エンドポイントのURLにメッセージを受信させるようにしました。

トピックのメッセージの中身を確認

エンドポイント側で受信したメッセージの中身は以下のようになっています

{
  message: {
    attributes: {
      eventType: 'TRANSFER_RUN_FINISHED',
      payloadFormat: 'JSON_API_V1'
    },
    data: 'ewogICJkYXRhU291cmNlSWQiOiAiYW1hem9uX<<snip>>',
    messageId: '4030107615706953',
    message_id: '4030107615706953',
    publishTime: '2022-04-08T07:13:15.933Z',
    publish_time: '2022-04-08T07:13:15.933Z'
  },
  subscription: 'projects/cm-alliance-prd/subscriptions/s3-transfer-topic-to-slack'
}

eventTypeTRANSFER_RUN_FINISHEDが固定となります。

message.dataの中に実際のメッセージが入っており、ここはBase64エンコードされています。

message.dataの中身をデコードして通知用のメッセージに使っていけます。

データソースがどこなのか、転送先がどこなのか、転送結果がどうだったか などが入っています。

{
  "dataSourceId": "amazon_s3",
  "destinationDatasetId": "xxxxxxxx",
  "emailPreferences": {
    "enableFailureEmail": true
  },
  "endTime": "2022-04-08T07:13:15.733904Z",
  "errorStatus": {

  },
  "name": "projects/xxxxxxxx",
  "notificationPubsubTopic": "projects/cm-alliance-prd/topics/s3-transfer-notification-topic",
  "params": {
    "access_key_id": "xxxxxxxx",
    "data_path": "s3://xxxxxxxx/*",
    "destination_table_name_template": "xxxxxxxx",
    "field_delimiter": ",",
    "file_format": "PARQUET",
    "max_bad_records": "0",
    "secret_access_key": "xxxxxxxx",
    "skip_leading_rows": "0"
  },
  "runTime": "2022-04-08T07:10:43.241Z",
  "scheduleTime": "2022-04-08T07:10:45.489952Z",
  "startTime": "2022-04-08T07:10:45.98042Z",
  "state": "SUCCEEDED",
  "updateTime": "2022-04-08T07:13:15.733914Z",
  "userId": "-729723311078993498"
}

トピックのメッセージをSlackに通知してみる

slackのwebhookを利用してメッセージを通知してみます。

Slack での Incoming Webhook の利用

Push型にしたサブスクリプションのエンドポイントでこの処理を行います。

サンプル)

@slack/webhookを使ってメッセージを送信する例です。

message.dataをデコードして、その中の要素をテキストに組み込んでみました。

const { IncomingWebhook } = require('@slack/webhook');

const webhook = new IncomingWebhook(process.env.SLACK_WEBHOOK_URL);

const message = req.body.message
const data = JSON.parse((Buffer.from(message.data, 'base64').toString())
console.log(data)

const custom_text = `Bigquery データ転送完了通知\nデータソース: ${data.dataSourceId}\n転送先: ${data.destinationDatasetId}->${data.params.destination_table_name_template}\n結果: ${data.state}\n\nPubsubTopic: ${data.notificationPubsubTopic}\n実行開始時間: ${data.startTime}`

(async () => {
        await webhook.send({
            text: custom_text,
        });
})();

上記のようにサブスクリプションで受信したメッセージを使ってslckへの通知をすることができました。