dbt Cloudのジョブがエラーになったら(Cloud Runを使って)PagerDutyにインシデントとして挙がるようにしてみた

Cloud Runすごすぎ
2023.03.30

大阪オフィスの玉井です。

dbt Cloudの利用が本格的になってくると、ジョブが何らかのエラーでこけた場合、早急な対応が求められてくると思います(データ変換が止まる→BIツールや機械学習等といった下流のデータ利用に支障が出る→ビジネス的にヤバい)。

そういうユーザーのために、dbt Cloudには、ジョブが失敗したときの通知の仕組みに関する機能が色々用意されています。その中でも、今回はPagerDutyと連携する方法を実際に試してみました。

やることの概要

Webhookを利用する

最近(2023年3月現在)、dbt CloudのジョブがWebhookに対応しました。

これまでは、dbt側のAPIを定期的にポーリングする等して、こちら側が能動的にジョブのステータスを確認する必要がありました。しかし、Webhookに対応してくれたおかげで、こちら側から定期的に確認せずとも、ジョブがこけたタイミングで、dbt側から通知させることができるようになりました。

公式案内あり

このWebhookを使った通知の仕組みを構築するやり方ですが、実は一通りの手順(のサンプル)が公式にあったりします。手順だけでなく、dbt Cloud側のWebhookを受け取ってPagerDutyのAPIに流すサンプルアプリまで既に用意されています。

サンプルアプリはDockerコンテナとして用意されているのですが、この公式手順ではfly.ioというプラットフォーム(Herokuみたいなやつ)にデプロイして動かすようになっています。非常に楽そうなのですが、そのままやっても面白みがないので、今回はfly.ioではなくCloud Runを使ってみたいと思います(コンテナなので)。

概要図

つまり、今回やることを図にすると以下のようになります。

やってみた(設定編)

前もって使えるようにしておくサービス

  • dbt Cloud
    • Teamプラン以上である
    • ユーザートークンもしくはサービストークンが発行済である
  • PagerDuty
    • Events API V2が使用できるようになっている(Integrationsが作成済である)
  • Google Cloud

サンプルアプリのリポジトリをクローンする

公式ドキュメントにもありますが、以下のリポジトリ(dbt社のエンジニアが開発したサンプルアプリ)をクローンします。

クローンしてきたリポジトリをそのままCloud Runにデプロイする

もってきたやつをまるっとそのままデプロイします。特に何か追記したり改変したりする必要はありませんでした。

ちなみに私はCloud Code for VS Codeを利用しました(すげー簡単)。デプロイ時の設定は以下の通り(ほぼデフォルトのまんま)。

dbt CloudのWebhookを設定する

dbt CloudのWeb画面を開き、右上の歯車アイコンからAccount Settingsを選びます。

画面の一番下にWebhookの設定があるので、新規作成します。

エラーを検知したいジョブ、検知するジョブのステータス、Cloud Runにデプロイしたサービスのエンドポイントを設定します。

念の為、エンドポイントとdbt Cloudが疎通できているかどうかテストしておきましょう(メニュー上部のボタンからテストできる)。

ちなみに、Webhookの設定時、シークレットキーが発行されるので必ずメモしておきます(後で使用する)。

PagerDutyを設定する

基本、以下の通りに設定すればOKです(インテグレーション設定時、選択するサービスとしてEvents API V2を選択する)。

設定後、Events API V2のインテグレーションキーが発行されるので必ずメモしておきます(後で使用する)。

Cloud Runサービスに環境変数を設定する

dbt CloudとPagerDutyの設定が完了したので、これらを、先程デプロイしたCloud Runサービスと統合していきます。

今回のサンプルアプリは3つの環境変数が必要です。

  • DBT_CLOUD_SERVICE_TOKEN
    • dbt Cloudのユーザートークンかサービストークン
  • DBT_CLOUD_AUTH_TOKEN
    • dbt CloudのWebhookのシークレットキー
  • PD_ROUTING_KEY
    • PagerDutyのEvents API V2のインテグレーションキー

これらを環境変数としてCloud Runサービスが読み取れるようにしてあげる必要があります。Cloud Runサービスに普通に環境変数として設定することもできますが、今回セットする情報はどれも重要なもの(漏れちゃいけない)なので、Secret Managerを利用します。

Google CloudのSecret Managerの画面に行き、APIを有効化します。その後は普通に上記3つの値を登録するだけです。

次に、Cloud Runのサービス詳細画面に行き、「新しいリビジョンの編集とデプロイ」を選択します。

ページの下部に「シークレット」という設定欄があるので、ここから先程設定したシークレットを指定します。

このCloud Runサービスは実行するサービスアカウントをデフォルト(Compute Engineを実行するサービスアカウントと同じ)としてデプロイしたため、最初の設定時に「実行サービスアカウントはこのシークレットにアクセスできる権限がない」的な注意メッセージが出ます。その際は、そのメッセージの「権限を付与」を選択すればOKです(もしくは、予め、実行サービスアカウントにSecret Managerへのアクセス権限を付与しておく)。

シークレットを設定後、新たなリビジョンとしてデプロイします。

やってみた(検証編)

一通りの準備が完了したので、実際にエラーが通知されるかどうかやってみましょう。

dbt Cloud側で(エラーが起こる)ジョブを実行します。

地味に、ジョブエラーを人為的に起こす方法に苦労したのですが、私は無茶苦茶なcsvファイルを用意して、それをdbt seedすることで、わざとエラーを起こしました。

そして、予定通り、エラーが発生しました。

dbt Cloudの画面でエラーを目視してから数秒後、PagerDutyに設定しているメールアドレスに通知が!

エラーの検知に成功しました!

PagerDutyの画面上でも確認できます。

おわりに

PagerDutyは、サービスから発せられるアラートを受け取り、適切に対応できるようにしてくれるSaaSです。

データ分析基盤というものは、dbt以外にも色々なシステムやサービスが動いているはずで、どれかが止まるとデータの流れ全体に影響が出てしまうと思います。それらが発するアラートを仕分けして適切な担当者にエスカレーションしたいという時、このPagerDutyを使った連携はとても有効かと思いました。