Digdagからタグごとにdbtのモデルをスケジュール実行してAthenaにデータを作成してみる

Digdagワークフローのスケジュールとdbtのタグを使って、モデル作成をスケジュール実行する例をご紹介します。
2023.09.12

データアナリティクス事業本部 機械学習チームの鈴木です。

機械学習モデル開発用のデータマート作成に、dbt-athenaを使いたく、どんな感じで使えそうか試行錯誤中です。

前回はDigdagからdbt runを実行するサンプルを作ってみましたが、この例は全てのモデルを手動実行したときのものなので、改めてタグを付けたモデルをスケジュール設定した時間に実行させられるか確認してみました。

この記事で紹介するスケジュール実行方法

Digdagワークフローのスケジュールとdbtのタグを使って、モデル作成をスケジュール実行する例をご紹介します。

あらかじめモデルにタグを設定しておき、dbt run実行時に--selectオプションでどのモデルを実行するかタグの値を元に制御しました。

スケジュール実行時間とモデルの紐付けは、以下のタグのガイドを確認し、hourlydailyのようなスケジュール設定用のタグの使用例が紹介されていたためこの方法を採用しました。

なお、タグの細かな挙動については以下のブログ記事が大変参考になりました。

AirflowやDagsterなど別のオーケストレーションツールであれば、ツール固有の機能もありそうでしたが、ひとまずタグが一番オーソドックスな方法だと思ったので試してみました。

環境構築

一つのEC2にdbt CoreとDigdagをまとめてインストールし、Digdagのワークフローからインスタンスのローカルでdbt runを実行する形としました。

EC2のセットアップについては、冒頭の『Digdagからdbtを実行してAthenaにモデルを作成してみた』と同じため割愛します。

検証したバージョンとしては以下のようになります。

  • Python:3.11
  • dbt Core:1.5.6
  • dbt-athena-community:1.5.1
  • Digdag:0.10.5

dbtおよびdigdagの準備

dbtのモデルの設定

今回は1つのソーステーブルから、合わせて2つのモデルを作成するようにしました。データリネージは以下のようになります。

モデルの全体像

上のリネージではiris_avgをハイライトしていて分かるように、iris_avgiris_with_avgは順番はあれども別々の時間にずらして作成しても問題ないので、スケジュールを2つ設定して、2つのモデル実行が期待通りされるか確認することとしました。

データソースとモデルの定義は以下のようにしました。

models/raw_tables.yml

version: 2

sources:
  - name: raw_tables
    database: awsdatacatalog 
    schema: cm-nayuts-sample-db
    tables:
      - name: iris_raw

models/iris_avg.sql

{{
    config(
        tags=['hourly-1'],
    )
}}

select
    class,
    avg(sepal_length) as avg_sepal_length,
    avg(sepal_width) as avg_sepal_width,
    avg(petal_length) as avg_petal_length,
    avg(petal_width) as avg_petal_width
from {{ source('raw_tables','iris_raw') }}
group by class

models/iris_with_avg.sql

{{
    config(
        tags=['hourly-2'],
    )
}}

select
    iris_raw.class,
    iris_raw.sepal_length,
    iris_raw.sepal_width,
    iris_raw.petal_length,
    iris_raw.petal_width,
    iris_avg.avg_sepal_length,
    iris_avg.avg_sepal_width,
    iris_avg.avg_petal_length,
    iris_avg.avg_petal_width
from {{ source('raw_tables','iris_raw') }} as iris_raw
join {{ref('iris_avg')}} as iris_avg
on iris_raw.class = iris_avg.class

モデルの定義のポイントとしては、counfig()でタグの設定をした点です。今回はhourly-1およびhourly-2の2つのタグを準備しました。このタグごとにDigdagワークフローから--selectオプションを付けてモデルを実行しました。

なお、タグ名は検証なので適当ですが、実際の設計ではスケジュールの時間も多くのバリエーションが出てくると思うので、拡張性のある名前の付け方をした方がよさそうです。

Digdagワークフローの設定

以下のようにsample_projectという名前のDigdagプロジェクト配下に、hourly-1およびhourly-2のワークフローを作成しました。

pushしたワークフロー

ワークフローの定義は以下になります。

digdag_work/hourly-1.dig

timezone: Asia/Tokyo

schedule:
  daily>: 14:40:00

+dbt_run:
   sh>: dbt run --select tag:hourly-1 --project-dir test_project --profiles-dir dbt_profiles

digdag_work/hourly-2.dig

timezone: Asia/Tokyo

schedule:
    daily>: 14:45:00

+dbt_run:
   sh>: dbt run --select tag:hourly-2 --project-dir test_project --profiles-dir dbt_profiles

先にも記載したように、--selectオプションでタグを指定してdbt runを実行するようにしました。スケジュールはDigdagの機能を使っています。

digdag server -o ./digdag_test -O ./digdag_test -n 8001のようにサーバーを起動し、digdag push sample_project -e localhost:8001 --copy-outgoing-symlinksでpushしました。

出来上がった各々のワークフローは以下のようになりました。

ワークフロー1

ワークフロー2

実行結果の確認

スケジュール実行時間になるのを待って、ワークフローが実行されたことを確認しました。以下はhourly-2実行直後で、設定通り5分ずれて2つのワークフローが実行され、両方が成功したことを確認できました。

スケジュール実行結果

Athenaのクエリ履歴を確認すると、Digdagワークフローで設定した時間にSQLが実行されていることを確認できました。

Athenaのクエリ履歴

それぞれのワークフローのセッションの記録は以下のようになっていました。dbt実行時の標準出力もDigdagのUIから確認できました。

ワークフロー1実行結果1

ワークフロー1実行結果2

ワークフロー2実行結果1

ワークフロー2実行結果2

最後に

Digdagワークフローのスケジュールとdbtのタグを使って、モデル作成をスケジュール実行する例をご紹介しました。参考になりましたら幸いです。