Digdagからタグごとにdbtのモデルをスケジュール実行してAthenaにデータを作成してみる
データアナリティクス事業本部 機械学習チームの鈴木です。
機械学習モデル開発用のデータマート作成に、dbt-athenaを使いたく、どんな感じで使えそうか試行錯誤中です。
前回はDigdagからdbt run
を実行するサンプルを作ってみましたが、この例は全てのモデルを手動実行したときのものなので、改めてタグを付けたモデルをスケジュール設定した時間に実行させられるか確認してみました。
この記事で紹介するスケジュール実行方法
Digdagワークフローのスケジュールとdbtのタグを使って、モデル作成をスケジュール実行する例をご紹介します。
あらかじめモデルにタグを設定しておき、dbt run
実行時に--select
オプションでどのモデルを実行するかタグの値を元に制御しました。
スケジュール実行時間とモデルの紐付けは、以下のタグのガイドを確認し、hourly
やdaily
のようなスケジュール設定用のタグの使用例が紹介されていたためこの方法を採用しました。
なお、タグの細かな挙動については以下のブログ記事が大変参考になりました。
AirflowやDagsterなど別のオーケストレーションツールであれば、ツール固有の機能もありそうでしたが、ひとまずタグが一番オーソドックスな方法だと思ったので試してみました。
環境構築
一つのEC2にdbt CoreとDigdagをまとめてインストールし、Digdagのワークフローからインスタンスのローカルでdbt run
を実行する形としました。
EC2のセットアップについては、冒頭の『Digdagからdbtを実行してAthenaにモデルを作成してみた』と同じため割愛します。
検証したバージョンとしては以下のようになります。
- Python:3.11
- dbt Core:1.5.6
- dbt-athena-community:1.5.1
- Digdag:0.10.5
dbtおよびdigdagの準備
dbtのモデルの設定
今回は1つのソーステーブルから、合わせて2つのモデルを作成するようにしました。データリネージは以下のようになります。
上のリネージではiris_avg
をハイライトしていて分かるように、iris_avg
とiris_with_avg
は順番はあれども別々の時間にずらして作成しても問題ないので、スケジュールを2つ設定して、2つのモデル実行が期待通りされるか確認することとしました。
データソースとモデルの定義は以下のようにしました。
version: 2 sources: - name: raw_tables database: awsdatacatalog schema: cm-nayuts-sample-db tables: - name: iris_raw
{{ config( tags=['hourly-1'], ) }} select class, avg(sepal_length) as avg_sepal_length, avg(sepal_width) as avg_sepal_width, avg(petal_length) as avg_petal_length, avg(petal_width) as avg_petal_width from {{ source('raw_tables','iris_raw') }} group by class
{{ config( tags=['hourly-2'], ) }} select iris_raw.class, iris_raw.sepal_length, iris_raw.sepal_width, iris_raw.petal_length, iris_raw.petal_width, iris_avg.avg_sepal_length, iris_avg.avg_sepal_width, iris_avg.avg_petal_length, iris_avg.avg_petal_width from {{ source('raw_tables','iris_raw') }} as iris_raw join {{ref('iris_avg')}} as iris_avg on iris_raw.class = iris_avg.class
モデルの定義のポイントとしては、counfig()
でタグの設定をした点です。今回はhourly-1
およびhourly-2
の2つのタグを準備しました。このタグごとにDigdagワークフローから--select
オプションを付けてモデルを実行しました。
なお、タグ名は検証なので適当ですが、実際の設計ではスケジュールの時間も多くのバリエーションが出てくると思うので、拡張性のある名前の付け方をした方がよさそうです。
Digdagワークフローの設定
以下のようにsample_project
という名前のDigdagプロジェクト配下に、hourly-1
およびhourly-2
のワークフローを作成しました。
ワークフローの定義は以下になります。
timezone: Asia/Tokyo schedule: daily>: 14:40:00 +dbt_run: sh>: dbt run --select tag:hourly-1 --project-dir test_project --profiles-dir dbt_profiles
timezone: Asia/Tokyo schedule: daily>: 14:45:00 +dbt_run: sh>: dbt run --select tag:hourly-2 --project-dir test_project --profiles-dir dbt_profiles
先にも記載したように、--select
オプションでタグを指定してdbt run
を実行するようにしました。スケジュールはDigdagの機能を使っています。
digdag server -o ./digdag_test -O ./digdag_test -n 8001
のようにサーバーを起動し、digdag push sample_project -e localhost:8001 --copy-outgoing-symlinks
でpushしました。
出来上がった各々のワークフローは以下のようになりました。
実行結果の確認
スケジュール実行時間になるのを待って、ワークフローが実行されたことを確認しました。以下はhourly-2
実行直後で、設定通り5分ずれて2つのワークフローが実行され、両方が成功したことを確認できました。
Athenaのクエリ履歴を確認すると、Digdagワークフローで設定した時間にSQLが実行されていることを確認できました。
それぞれのワークフローのセッションの記録は以下のようになっていました。dbt実行時の標準出力もDigdagのUIから確認できました。
最後に
Digdagワークフローのスケジュールとdbtのタグを使って、モデル作成をスケジュール実行する例をご紹介しました。参考になりましたら幸いです。