dbt CloudでAmazon Athenaがサポートされたので使ってみた

dbt CloudでAmazon Athenaがサポートされたので使ってみた

ついにSaaS版のdbt CloudでAmazon Athenaのアダプタが利用できるようになりました
Clock Icon2025.02.07

データ事業本部インテグレーション部機械学習チーム・新納(にいの)です。

これまでdbt-AthenaはオープンソースのPythonパッケージであるdbt Coreでしか利用できませんでしたが、2024年11月にSaaS版であるdbt Cloudでもサポートされました。このアップデートにより、dbt Cloudのマネージドな環境での変換処理やドキュメントのホスティングができるようになります。

https://docs.getdbt.com/docs/cloud/connect-data-platform/connect-amazon-athena
https://aws.amazon.com/jp/blogs/big-data/from-data-lakes-to-insights-dbt-adapter-for-amazon-athena-now-supported-in-dbt-cloud/

本記事ではdbt CloudでAthenaを使ったデータ変換を実際に試してみます。

利用するデータとシナリオ

以下のクイックスタートに沿って、データやモデルなどをアレンジしながら進めます。

https://docs.getdbt.com/guides/athena?step=1

データはクイックスタートのStep2からダウンロードできる以下のファイルを利用します。

  • jaffle_shop_orders.csv(注文履歴)

    ID,USER_ID,ORDER_DATE,STATUS
    1,1,2018-01-01,returned
    2,3,2018-01-02,completed
    3,94,2018-01-04,completed
    ...
    
  • jaffle_shop_customers.csv(顧客情報)

    ID,FIRST_NAME,LAST_NAME
    1,Michael,P.
    2,Shawn,M.
    3,Kathleen,P.
    ...
    

クイックスタートではHive形式のテーブルを使用していますが、マージ機能を試すためIcebergテーブルとして作成します。利用したSQLは以下の通りです。まず上述のCSVファイルからHIVE形式のテーブルを作成し、CTASを使ってIcebergテーブルを作成します。

-- 1. CSVファイルを外部テーブルとして作成(orders)
CREATE EXTERNAL TABLE raw_orders (
    id STRING,
    user_id STRING,
    order_date DATE,
    status STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3://<CSVファイルが置かれたS3ロケーションURI>/'
TBLPROPERTIES ('skip.header.line.count'='1');

-- 2. CTASを使用してIcebergテーブルを作成(orders)
CREATE TABLE orders
WITH (
    table_type = 'ICEBERG',
    format = 'parquet',
    write_compression = 'snappy',
    partitioning = ARRAY['order_date'],
    is_external=false,
    location='s3://<任意のS3ロケーションURI>/'
)
AS SELECT * FROM raw_orders;

-- 1. CSVファイルを外部テーブルとして作成(customers)
CREATE EXTERNAL TABLE raw_customers (
    id STRING,
    first_name STRING,
    last_name STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3://<CSVファイルが置かれたS3ロケーションURI>/'
TBLPROPERTIES ('skip.header.line.count'='1');

-- 2. CTASを使用してIcebergテーブルを作成(customers)
CREATE TABLE customers
WITH (
    table_type = 'ICEBERG',
    format = 'parquet',
    write_compression = 'snappy',
    is_external=false,
    location='s3://<任意のS3ロケーションURI>/'
)
AS SELECT * FROM raw_customers;

上記のテーブルを使って、顧客データと注文データを組み合わせた顧客ごとの注文サマリーを作成します。データが更新されたらデータをmergeで更新していくことを想定しています。

dbtcloudathena-overview

dbt CloudからAthenaへ接続する

dbt Cloudへログインし、Account Settingsに移動します。

20250205_dbtcloud_acsettings

Projectsメニューから「+New Project」で任意の名前の新しいプロジェクトを作成します。ConnectionにはAthenaを指定し、必要情報を入力します。テスト接続も可能です。

項目
AWS access key ID AWSリソースにアクセスするためのアクセスキー(今回はAmazonAthenaFullPolicyAmazonS3FullAccessAWSGlueServiceRoleポリシーを指定したIAMユーザーで検証)
AWS secret access key AWSリソースにアクセスするためのシークレットキー
schema dbtで作成したテーブルを配置するAthenaのデータベース名
Threads dbtが実行する際の並列処理のスレッド数(任意)

20250205_dbtcloud_athena-setup

リポジトリをセットアップします。今回は検証のため、dbt Cloudのマネージドのリポジトリを使用しました。

20250205_dbtcloud_athena-setup2

これでAthenaへの接続が確立されました。

20250205_dbtcloud_projectisready

モデルを作成する

上述の画面から「Start developing in the IDE」を選ぶとコードエディタが表示されます。最初はInitialize dbt projectでプロジェクトを初期化し、コミットする必要があります。

まずはmodel配下に新しいファイルを作成しましょう。modelディレクトリの3点リーダからCreate fileを選択し、sqlファイルを作成します。

20250205_dbtcloud_athena_newmodel

ゴールはAthenaで事前に作っておいたcustomersテーブルとordersテーブルを結合し、新たなテーブルを作成することです。sqlファイルを2つ作成し、customersとordersから必要なカラムをSELECTするクエリを記載します。

stg_customers.sql
select
    id as customer_id,
    first_name,
    last_name
from default.customers
stg_orders.sql
select
    id as order_id,
    user_id as customer_id,
    order_date,
    status
from default.orders

上記でSELECTしたデータを結合するためのクエリを別のsqlファイルで作成します。stg_customersstg_ordersは上述で作成したものを参照するように記載しています。

customers.sql
{{
 config(
        materialized='incremental',
        table_type='iceberg',
        unique_key='customer_id',
        incremental_strategy='merge'
    )
}}

with stg_customers as (
    select * from {{ ref('stg_customers') }}
),
stg_orders as (
    select * from {{ ref('stg_orders') }}
),
customer_orders as (
    select
        customer_id,
        min(order_date) as first_order_date,
        max(order_date) as most_recent_order_date,
        count(order_id) as number_of_orders
    from stg_orders
    group by 1
),
final as (
    select
        customer_id,
        first_name,
        last_name,
        first_order_date,
        most_recent_order_date,
        coalesce(number_of_orders, 0) as number_of_orders

    from stg_customers
    left join customer_orders using (customer_id)
)
select * from final

先頭のconfig部分は以下の意味を持ちます。

  • Icebergテーブルとして作成
  • 新規または変更されたデータのみを処理するincremental更新を採用
  • customer_idをユニークキーとして使用
{{
 config(
        materialized='incremental',
        table_type='iceberg',
        unique_key='customer_id',
        incremental_strategy='merge'
    )
}}

モデルの実行をする

実行前にテストする

dbtには、データソースや変換処理後のデータが意図した通りになっているかを検証する機能があります。models配下にschema.ymlを作成し、それぞれのスキーマを定義しておきます。

今回はクイックスタートに記載されているyamlファイルをそのまま利用します。ポイントは以下の通り。

  • customersテーブル

    • customer_id: 一意、NOT NULL

    • first_order_date: 初回注文日(NULL許容)

  • stg_customersテーブル

    • customer_id: 一意、NOT NULL
  • stg_ordersテーブル

    • order_id: 一意、NOT NULL

    • status: 5つの値のみ許容(placed, shipped, completed, return_pending, returned)

    • customer_id: NOT NULL、stg_customersテーブルに存在する値のみ許容

schema.yml
version: 2

models:
  - name: customers
    description: One record per customer
    columns:
      - name: customer_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: first_order_date
        description: NULL when a customer has not yet placed an order.

  - name: stg_customers
    description: This model cleans up customer data
    columns:
      - name: customer_id
        description: Primary key
        tests:
          - unique
          - not_null

  - name: stg_orders
    description: This model cleans up order data
    columns:
      - name: order_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['placed', 'shipped', 'completed', 'return_pending', 'returned']
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id

dbt Cloudでは画面下部にコマンドを入力する箇所があります。ここでdbt testコマンドを実行することでテスト可能です。

20250205_dbtcloud_dbttest

モデルを実行してテーブルを作成する

テスト結果に問題がないことが確認できたら今度は作成したモデルを実行してみましょう。dbt runを実行します。

20250205_dbtcloud_dbtrun

Athenaを確認すると、作成したモデル3つのテーブルが確認できました。

20250205_dbtcloud_athenaresult

2回目の実行では、Query Historyでmerge文が実行されていることが確認できます。

20250205_dbtcloud_athenamerge

ドキュメントを生成する

dbtでは、作成したモデルやデータソースの情報を元に自動でドキュメントを生成する機能があります。データカタログのように利用できて非常に便利ですので、生成しておきましょう。

方法はdbt docs generateを実行するだけです。

20250205_dbtcloud_generatedbtdoc

ブランチ名の隣にある本のマークをクリックするとドキュメントへアクセス可能です。

20250205_dbtcloud_dbtdocs1

アクセスすると各データのカラムやリネージが確認できるようになりました。

dbt Coreと違って自分でどこにホスティングするかを考える必要がなく、dbt Cloudでホスティングしてくれるので運用もとても楽ですね。

20250205_dbtcloud_dbtdocs2

あとは変更内容をコミット・デプロイし、スケジュールされたジョブをdbt Cloud上で作成します。この部分はどのデータソースでも変わらない操作となりますので、以下のブログを参照してください。

https://dev.classmethod.jp/articles/quickstart-for-dbt-cloud-and-bigquery/#toc-14-

最後に

dbt CloudでついにAthenaのアダプタが利用できるようになったのでクイックスタートを元に利用してみました。これまではdbt Coreでしか使えなかったため、ECSとStep FunctionsやEC2などを組み合わせて実行するなどの工夫が必要でした。dbt Cloudだとホスティングやジョブの実行も全てマネージド環境で行えるので、考えることが減ってとても便利です。また、Icebergテーブルと組み合わせることでデータ更新にmergeを利用でき、コストを抑えつつもデータウェアハウスのように扱える利点もあります。

dbt Cloudは14日間のフリートライアルや、条件付きではありますが無料で使えるDeveloperプランがあります。

https://www.getdbt.com/pricing

dbt CoreでAthenaを利用する際のベストプラクティスについては以下をご参照ください。

https://dev.classmethod.jp/articles/dbt-core-dbt-athena-aws-dbt-knowledge-share/

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.