BigQuery のスケジューリングクエリが失敗した時だけSlackで通知したい

2022.01.31

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは!エノカワです。

BigQuery にはSQLを定期実行することができるスケジューリングクエリという機能があります。

また、オプションでスケジューリングクエリが失敗した際にメールを送信するように設定することができます。
となると、「Slackにも通知して欲しい」と考えてしまうのが人情ですよね?

ということで、今回はBigQuery のスケジューリングクエリが失敗した時だけSlackで通知するを試してみました。

概要

構成図はこんな感じです。

BigQuery でスケジューリングクエリが実行されると、Cloud Pub/Sub のトピックに通知するように設定します。
トピックをトリガーとした Cloud Functions の関数を作成し、Cloud Functions からSlackの Webhook をコールして通知します。

Cloud Functions の関数では、クエリが失敗した時だけ情報を抽出してSlackに通知するようにします。

Cloud Pub/Sub

トピック作成

まずは、Cloud Pub/Sub でトピックを作成します。

Cloud Pub/Sub のコンソールから「トピックの作成」ボタンをクリックします。

トピックIDにnotify_slackと入力して、「トピックを作成」をクリックします。

トピックnotify_slackが作成されました。

このトピックnotify_slackがこれから作成する

  • BigQuery スケジューリングクエリの通知先
  • Cloud Functions 関数のトリガー

になります。

BigQuery

次にスケジューリングクエリを作成します。

今回は BigQuery 一般公開データセットであるニューヨークのタクシーゾーンのテーブルを使用します。

テーブルのSELECT結果をテーブル出力するスケジューリングクエリを作成します。

bigquery-public-data.new_york_taxi_trips.taxi_zone_geom データサンプル

データセット作成

BigQuery コンソールに移動し、テーブルの出力先となるデータセットを作成します。

データセットの作成画面で下記の内容を入力し、「データセットを作成」ボタンをクリックします。

  • データセットID
    output
  • データのロケーション
    us(米国の複数のリージョン)

スケジューリングクエリ作成

クエリエディタに下記SQLを入力します。

SELECT
  zone_id,
  zone_name,
  borough
FROM
  `bigquery-public-data.new_york_taxi_trips.taxi_zone_geom`
;

「スケジュール」から「スケジュールされたクエリを新規作成」を選択します。

スケジューリングクエリの詳細画面が表示されるので、下記の内容を入力します。

  • スケジューリングされたクエリの名前
    ondemand_query
  • 繰り返しの頻度
    オンデマンド
  • クエリ結果の書き込み先
    データセット名: output
    テーブル名: taxi_zone_geom

繰り返しの頻度では、「毎時」や「毎日」などを設定することができます。
今回は任意のタイミングで実行したいので、「オンデマンド」を設定しています。

通知オプションの Cloud Pub/Sub トピックに作成したトピックnotify_slackのパスを入力し、
「スケジュール」ボタンをクリックします。

スケジューリングクエリondemand_queryが作成されました。

Slack

WebHook URL取得

Cloud Functions 関数を作成する前に通知先Slackチャンネルの WebHook URL を取得しておきます。

取得手順は公式ドキュメントを参考にしました。
ここでは割愛しますので、詳細は下記を参照ください。

Cloud Functions

関数作成

最後の仕上げに Cloud Functions でSlack通知の関数を作成します。

Cloud Functions のコンソールから「関数の作成」ボタンをクリックします。

関数の構成画面が表示されるので、下記の内容を入力します。

  • 関数名
    notify_slack
  • リージョン
    asia-northeast1
  • トリガー
    トリガーのタイプ: Cloud Pub/Sub

Cloud Pub/Sub トピックは作成したトピックnotify_slackを選択します。

ランタイム環境変数も設定しておきます。
WEBHOOK_URLという名前で通知先Slackチャンネルの Webhook URL を設定します。

「次へ」ボタンをクリックします。

関数のコード画面が表示されるので、下記の内容を入力して「デプロイ」ボタンをクリックします。

  • ランタイム
    Python3.7
  • エントリ ポイント
    notify_slack

  • main.pyのコード
import base64
import requests
import json
import os


def get_scheduled_query_error_summary(message):
    run = message['name']
    schedule_time = message['scheduleTime']
    run_time = message['endTime']
    error_status = message['errorStatus']['message']
    query = message['params']['query']

    error_summary = {
        "text": "スケジュールされたクエリが失敗しました。",
        "attachments": [{
            "color": "danger",
            "pretext": "BigQuery Data Transfer Service - Transfer Run Failure",
            "title": "Run summary",
            "fields": [
                {
                    "title": "Run",
                    "value": run,
                },
                {
                    "title": "Schedule time",
                    "value": schedule_time,
                },
                {
                    "title": "Run time",
                    "value": run_time,
                },
                {
                    "title": "Error status",
                    "value": error_status,
                },
                {
                    "title": "Query",
                    "value": query,
                }],
        }]
    }

    return error_summary


def notify_slack(event, context):
    # イベントデータ取得
    event_data = base64.b64decode(event['data']).decode('utf-8')

    # メッセージ取得
    message = json.loads(event_data)

    # クエリ失敗以外は処理せず終了
    if message['state'] != 'FAILED':
        return

    # エラーサマリ取得
    notify_msg = get_scheduled_query_error_summary(message)

    # Slack通知
    url = os.getenv('WEBHOOK_URL')
    response = requests.post(url, data=json.dumps(notify_msg))

    return print(response.text.encode('utf8'))

コードの説明

  • スケジューリングクエリが完了すると、Cloud Pub/Sub 経由で BigQuery Data Transfer Service の実行通知 が送られてきます。
  • notify_slack関数では、通知イベントからメッセージを取り出し、statusの値が FAILED (クエリ失敗)であれば、メッセージの内容をもとにエラー情報を構築して、Slack通知を行っています。
  • BigQuery Data Transfer Service の実行通知 の詳細は下記ドキュメントを参考にしました。

関数notify_slackが作成されました。

スケジューリングクエリ実行

それでは、スケジューリングクエリを実行してSlack通知を確認してみましょう。

クエリ成功

まずは、クエリ成功のパターンです。
今回はクエリが失敗した時だけSlack通知するようにしているため、「Slack通知されない」が期待動作です。

スケジューリングクエリはオンデマンドにしているので、手動で実行します。

ondemand_queryの詳細画面で「今すく転送を実行」をクリックします。

Run one time transferが選択された状態で「OK」ボタンをクリックすると、
スケジューリングクエリが即時実行されます。

データセットoutputにテーブルtaxi_zone_goemが作成されました。
クエリが成功したようです。

想定通り、Slack通知は来ていません!

クエリ失敗

つづいて、クエリ失敗のパターンです。
今回はクエリが失敗した時だけSlack通知するようにしているため、「Slack通知される」が期待動作です。

意図的にクエリを失敗させたいので、SQLを下記のように書き換えます。
存在しないテーブルを参照しているので、クエリは失敗するはずです。

SELECT
  zone_id,
  zone_name,
  borough
FROM
  `bigquery-public-data.new_york_taxi_trips.taxi_zone_geom_phantom`
;

「スケジュール」から「スケジュールされたクエリを更新」を選択します。

クエリが更新されたので、スケジューリングクエリを即時実行します。

すると、クエリが失敗し、、、

Slack通知が来ました!

失敗したクエリの実行時刻やステータス等が確認できます。

まとめ

以上、BigQuery のスケジューリングクエリが失敗した時だけSlackで通知するを試してみました。

今回は利用しませんでしたが、スケジューリングクエリの通知オプションで
「メール通知を送信する」を有効にすると、メールにも通知を送信することができます。
ただし、クエリ失敗時のみであり、送信先はクエリのオーナーに限定されています。

その点、今回のように Cloud Pub/Sub と Cloud Functions を利用する方法であれば、
通知イベントの内容を解析できるので、クエリが成功した時も通知することができますし、
Slackのようなメール以外の通知手段も活用することができます。

ただし、メール通知と通知イベントとでは受け取れる情報に差があるようなので、この点は注意が必要そうです。
※例えば、メール通知には含まれる「スケジューリングクエリの表示名」(今回の場合はondemand_query)が通知イベントには見当たりませんでした。

参考