Google BigQueryで作成した複数のSQLを含むクエリをスケジュール実行&実行完了後にSlack通知を行う

2023.04.26

アライアンス統括部 サービスグループの しんや です。

DevelopersIOのブログ投稿データを収集・加工・可視化して社内に共有することをここまで実践してきていたのですが、その際に用いている環境にはAmazon Redshiftを活用していました。先日個人的に部署異動(2023年03月01日付けでアライアンス統括部に異動)したのもあり、またデータ連携と可視化の仕組みもよりスムーズに/便利に/広範に連携出来るようにしたいと思い、「社内投稿データ分析環境をBigQueryに載せ替える手順をそれぞれブログにまとめておこう」と思い立ちました。

当エントリではその中から「Google BigQueryのクエリをスケジュール実行し、実行完了をSlackに通知する」手順についてまとめておこうと思います。

目次

 

データ連携イメージ(構成図)

当エントリの登場要素は大きく分けて「BigQuery環境(実行クエリとスケジュール設定)」「Google Pub/Sub」「Cloud Functions」「Slackチャンネル」の計4つあります。それぞれ設定間で紐付け設定を行う事で、最終的にはそれらが繋がってやりたいことが実現出来る...という寸法です。以降、設定や連携手順について説明していきます。

 

実行するクエリの準備

まずはじめに、これは当然といえば当然なのですが「スケジュール実行対象のSQL」を用意しておいてください。今回の環境では以下のように「プロジェクトクエリ」配下に、任意のSQLクエリ(データ加工をするための様々なSQL:CREATE TABLE、INSERT、UPDATE等を記載)を、実行可能な状態(全てまとめて実行してエラーなく終えられる状態)で作成しました。

 

Webhook URLの取得

当エントリで行うSlack通知にはWebhook URLを用います。下記エントリを参考にして対象となるSlack AppのWebhook URLを取得しておいてください。

設定が完了すると、対象のSlack App上でWebhook URLを利用出来るようになります。取得したWebhook URLの情報は後述手順で使いますので控えておいてください。

 

Cloud Pub/Subトピックの作成

Slack通知の連携用にCloud Pub/Subでトピックを作成しておきます。Cloud Pub/Subの管理コンソール画面より[トピックを作成]を指定。

任意の名前を指定して[作成]を押下。

作成出来ました。トピックの名前についても後程活用するので文字列情報として控えておきましょう。

 

クエリに対するスケジュール指定(「スケジュールされたクエリ」の作成と実行確認)

作成・準備したBigQueryクエリ群をまとめたSQLファイル(タブ記載)を所定のサイクルで実行するための「クエリスケジュール」を作成します。対象としたいクエリのタブを開き、[スケジュール]から[スケジュールされたクエリを新規作成]を選択。

スケジュールに関する設定を行います。

  • スケジュールオプションを指定。ここでは「毎日AM06:30」にこのクエリを実行させたかったのでその内容に合うように(UTC指定で)時間を指定しています。
  • これとは別に、一旦このクエリスケジュールが稼働するかを確認するために「設定した時間に開始」を選んでいます。ここでは任意の時間を(JST指定で)記載することが出来ます。今回は検証日のAM06:30になる前に試したので検証日のAM06:30以前の日付を記載しています。
  • 指定した情報を踏まえて、「あなたの今の設定内容だとこういうサイクルでスケジュール稼働しますよ」というインフォメーションが表示されます。これは確認する上で便利ですね。

その他リージョン、詳細オプション、通知オプションを指定します。最後の「通知オプション」では前述手順で作成したCloud Pub/Subトピックを指定してください。

「スケジュールされたクエリ」が作成されました。

ちなみにここまでの設定内容で「スケジュールに応じた形でクエリを実行させる」事が出来るようにはなっています。時間になると以下の様にログが表示され、

実行が完了するとその旨コメントが表示されます。

また、作成した「スケジュールされたクエリ」に関しては構成の確認や編集も可能となっています。[編集]リンクをクリックすると、

設定作成前に参照したクエリの内容をそのままに、タイトルが別名のクエリとして作成・保存されていることが分かります。クエリ内容と同様にスケジュール設定自体も更新が可能です。

 

Cloud FunctionsでSlack通知コードをデプロイ

Slackに通知を行う部分の処理は、Cloud Pub/Subから連動させる形でCloud Functionsを活用して行います。

当エントリにおける最後のピース、Slack通知の部分です。Google CloudのCloud Function管理コンソールから「関数の作成」を指定。

必要事項を入力、選択していきます。「第2世代の〜」云々の表示がありますがここではそのままデフォルト第1世代で進めます。関数名に任意の名前を、そしてリージョンを指定。トリガーについては「Cloud Pub/Sub」を指定した上で、予め前述手順で作成していたCloud Pub/Subトピックを指定します。この部分の紐付けを行う事で「スケジュールされたクエリ」「Cloud Pub/Subトピック」「Cloud Functionで作成したコード」がそれぞれ連携される流れとなります。

設定が完了したら[保存]を押下しておいてください。

ランタイム以下の設定はデフォルト指定のままで進めました。[次へ]を押下。

デプロイするコードや設定を用意します。今回は極々シンプルに「"SQLの処理が終わったよ"を通知する」だけで良かったので、実装するコードもそれが最低限叶えられていれば良いという内容にしました。下記エントリを参考にWebhook URLに対して所定のメッセージを投げるだけのNode.jsコードを記載。※エントリポイント名とファンクション名の「exports.」の後の名前は同じものを記載しておく必要があります。

index.js

/**
 * Triggered from a message on a Cloud Pub/Sub topic.
 *
 * @param {!Object} event Event payload.
 * @param {!Object} context Metadata for the event.
 */
exports.slackNotification = (event, context) => {

  const { IncomingWebhook } = require("@slack/webhook");
  const webhook = new IncomingWebhook("https://hooks.slack.com/services/XXXXXXXXX/XXXXXXXXXXX/XXXXXxxxxXXXXxXxXXXXxxXX");
  (async () => {
    await webhook.send({
      text: "[ブログ投稿データ加工・編集クエリ実行] 完了.",
    });
  })();
};

上記コードでは別途モジュールを利用する必要があったので、package.jsonも以下の設定としました。コードやファイルの準備が出来たら画面下部の[デプロイ]を押下。

package.json

{
  "name": "sample-pubsub",
  "version": "0.0.1",
  "dependencies": {
    "@google-cloud/pubsub": "^0.18.0",
    "@slack/webhook" : "6.1.0"
  }
}

暫く時間を要する形でデプロイ処理が行われます。完了すると[テスト中]のメニューも実行出来るようになるので試してみます。

ここではデプロイしたコードのテストが行なえます。[関数をテストする]を押下するとコードが実行され、ログ等も確認出来るようになります。

ちなみにこのコードを実行した時点でSlackの所定のチャンネルに以下のようなメッセージが投稿されることが確認出来ていました。

 

全体実行確認

これで全ての準備が整いました!下記の流れで処理がそれぞれ実行され、

  • 「スケジュールされたクエリ」で指定した時間が来る
  • 「スケジュールされたクエリ」が実行される
  • 「スケジュールされたクエリ」に紐付けたCloud Pub/Subトピックから、トピックに紐付けているCloud Functionsで実装したコードが呼ばれる
  • Cloud Functionsで実装したコードが実行され、指定したSlackのWebhook URLにメッセージが投稿される


(※処理フロー再掲)

最終的に所定の時間を迎えたタイミングで以下の様にSlackチャンネルにメッセージが投稿されていることが確認出来ました。

 

まとめ

という訳で、「Google BigQueryで作成した複数のSQLを含むクエリをスケジュール実行&Slack通知させる」手順に関する紹介でした。下記2本プラス当エントリの計3本に渡って情報をまとめてきましたが、これで必要最低限の移行が出来た形です。

BigQueryに集約させたい、またブログ投稿データと連携して分析可視化させたい情報も色々ありますし、上記エントリでも言及していましたがこれらのデータや情報をモダンデータスタック(MDS)の各種サービスを介してより便利により有効活用出来る形で連携・展開してみたいという構想や野望(?)等もあったりします。引き続きその辺りを連携して進めていきつつ、良い感じに実現出来たものから適宜当ブログでも公開・共有していければと思います。