Amazon Athenaに対してローカル環境からdbtを使ってみた

2023.10.06

こんちには。

データアナリティクス事業本部 インテグレーション部 機械学習チームの中村です。

今回はdbtをAthenaに対して、ローカルマシン環境から使ってみたいと思います。

なお本記事の環境構築や操作は以下の記事を参考にしております。併せてご覧ください。

AWS環境上での準備

最初に元となるテーブルを作成しておきます。

以下のように準備していきます。

  • S3にcsvをアップロード
  • S3のデータを元にAthenaでテーブルを作成

S3にcsvファイルをアップロード

Irisデータを使います。以下からダウンロードします。(Kaggleのアカウントが必要です)

ダウンロードしたcsvは以下のようになっています。

Id,SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species
1,5.1,3.5,1.4,0.2,Iris-setosa
2,4.9,3.0,1.4,0.2,Iris-setosa
3,4.7,3.2,1.3,0.2,Iris-setosa
4,4.6,3.1,1.5,0.2,Iris-setosa

こちらをS3バケットを作成して以下のパスに配置します。(あくまで一例なのでご自由に設定してください)

  • s3://{任意の指定したバケット名}/dataset/iris/Iris.csv

Athenaでテーブルを作成

Athenaのクエリエディタで以下2つを実行します。(データベース名は任意に設定可能ですが以降の記述と合わせる必要があります)

CREATE DATABASE `cm_nakamura`
CREATE EXTERNAL TABLE `cm_nakamura`.`iris`(
  `id` string COMMENT 'from deserializer', 
  `sepallengthcm` string COMMENT 'from deserializer', 
  `sepalwidthcm` string COMMENT 'from deserializer', 
  `petallengthcm` string COMMENT 'from deserializer', 
  `petalwidthcm` string COMMENT 'from deserializer', 
  `species` string COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.OpenCSVSerde' 
WITH SERDEPROPERTIES ( 
  'separatorChar'=',') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://{任意の指定したバケット名}/dataset/iris/'
TBLPROPERTIES (
  'classification'='csv', 
  'skip.header.line.count'='1')

ここまででAWS環境上での準備ができました。この時点でAthenaは以下のようにirisというテーブルだけが作られている状態になります。

ローカルでの環境構築

dbtのインストール

基本的には以下に沿って、dbt環境を構築していきますが、Python環境の管理にRyeを使っていますので適宜読み替えていきます。ここはご自分のPython環境に従ってください。

まずは独立したPythonの仮想環境を準備します。

rye init sample-dbt-core

次にPython環境を指定して構築します。

cd sample-dbt-core
rye pin 3.10
rye sync

dbt-coreとdbt-athena-communityを入れます。

rye add dbt-core dbt-athena-community
rye sync

パッケージ名がdbt-athena-communityであることに注意されてください。以下がdbtの公式ドキュメントでのAthena向けのセットアップの記載となります。

インストールが終わったら仮想環境に入り、以下を実行して動作確認します。

rye shell
dbt --version

# Core:
#   - installed: 1.6.5
#   - latest:    1.6.5 - Up to date!
# 
# Plugins:
#   - athena: 1.6.2 - Ahead of latest version!

現時点でインストールされたdbt関連のバージョンは以下となります。

dbt-athena-community==1.6.2
dbt-core==1.6.5
dbt-extractor==0.4.1
dbt-semantic-interfaces==0.2.1

なお、今回はAthenaで行うためにdbt-athena-communityをインストールしましたが、それ以外にも様々なconnectionsが使用できるようになっています。

その他のconnectionについては以下を参照ください。

dbtプロジェクトの作成

ここまでで、dbt-coreのコマンドが実行できるようになっているため、dbtのプロジェクトを作成していきます。

dbt init sample_project

# 08:19:06  Running with dbt=1.6.5
# 08:19:07  
# Your new dbt project "sample_project" was created!
# 
# For more information on how to configure the profiles.yml file,
# please consult the dbt documentation here:
# 
#   https://docs.getdbt.com/docs/configure-your-profile
# 
# One more thing:
# 
# Need help? Don't hesitate to reach out to us via GitHub issues or on Slack:
# 
#   https://community.getdbt.com/
# 
# Happy modeling!
# 
# 08:19:07  Setting up your profile.
# Which database would you like to use?
# [1] athena
# 
# (Don't see the one you want? https://docs.getdbt.com/docs/available-adapters)
# 
# Enter a number: 1
# s3_staging_dir (S3 location to store Athena query results and metadata, e.g. s3://athena_query_result/prefix/): s3://{任意の指定したバケット名}/dbt/sample_project/athena_query_result
# s3_data_dir (S3 location where to store data/tables, e.g. s3://bucket_name/prefix/): s3://{任意の指定したバケット名}/dbt/sample_project/tables
# region_name (AWS region of your Athena instance): ap-northeast-1
# schema (Specify the schema (Athena database) to build models into (lowercase only)): cm_nakamura
# database (Specify the database (Data catalog) to build models into (lowercase only)) [awsdatacatalog]: 
# seed_s3_upload_args (Specify any extra arguments to use in the S3 Upload, e.g. ACL, SSEKMSKeyId): {}
# threads (1 or more) [1]:

Enter a number以降はインタラクティブな操作の結果となります。

この操作を終えると、sample_projectというフォルダが作成され、以下のようなファイルが配置されます。

sample_project
│
│  .gitignore
│  dbt_project.yml
│  README.md
│
├─analyses
│      .gitkeep
│
├─macros
│      .gitkeep
│
├─models
│  └─example
│          my_first_dbt_model.sql
│          my_second_dbt_model.sql
│          schema.yml
│
├─seeds
│      .gitkeep
│
├─snapshots
│      .gitkeep
│
└─tests
        .gitkeep

また、~/.dbt/profiles.ymlにもプロジェクト名で設定が格納されます。

sample_project:
  outputs:
    dev:
      database: awsdatacatalog
      region_name: ap-northeast-1
      s3_data_dir: s3://{任意の指定したバケット名}/dbt/sample_project/tables
      s3_staging_dir: s3://{任意の指定したバケット名}/dbt/sample_project/athena_query_result
      schema: cm_nakamura
      seed_s3_upload_args: '{}'
      threads: 1
      type: athena
  target: dev

このままではseed_s3_upload_argsの部分で動かなかったので、削除するか以下のように修正する必要がありました。

      seed_s3_upload_args: {}

profiles.ymlについて

profiles.ymlについては接続先の情報となっていて、以下に説明があります。

outputsは複数指定可能で、initの時点ではdevが作成されており、target: devでこのdevがデフォルトとして設定されているということになります。

具体的な設定値については、各connectionについて見る必要があります。dbt-athenaの場合、以下のGitHubのREADMEに情報が記載されています。

s3_staging_dirは、Athenaの「クエリの結果の場所」に該当します。

s3_data_dirはdbtでテーブルを作成した際にそのデータが格納される場所となります。

schemaはAthenaでCREATE DATABASEしたデータベースを入れておきます。

dbt_project.ymlについて

dbt_project.ymlに接続先以外のプロジェクトの設定が記載されています。ここはコメントで丁寧に説明が記載されています。

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'sample_project'
version: '1.0.0'
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: 'sample_project'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:         # directories to be removed by `dbt clean`
  - "target"
  - "dbt_packages"


# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/
# directory as views. These settings can be overridden in the individual model
# files using the `{{ config(...) }}` macro.
models:
  sample_project:
    # Config indicated by + and applies to all files under models/example/
    example:
      +materialized: view

今回は特にmodelsに着目します。ここの説明は以下に記載があります。

詳細は上記を参照して頂くとして、+で始まる項目が様々な設定値となっています。

設定値は、models配下に格納されているsqlファイル毎にフォルダレベルで設定を変更していくことが可能で、深い階層側が優先されているようです。

init直後は、models配下は以下のようになっています。

├─models
│  └─example
│          my_first_dbt_model.sql
│          my_second_dbt_model.sql
│          schema.yml

schema.ymlはモデルをテストする場合のルールが記載できるようです。今回は詳細を割愛します。

メインはsqlファイルによるモデル定義となっており、dbt_project.ymlの設定がこれらのsqlファイルに影響を与えます。

具体的にはYAML構造上は以下のように「models」→「プロジェクト名(今回はsample_project)」→「フォルダ上のmodels配下のサブフォルダ」、という階層構造となっており、実際のフォルダ構成と比較すると、YAMLでは間にプロジェクト名が入ることに注意が必要そうです。

models:
  sample_project:
    # Config indicated by + and applies to all files under models/example/
    example:
      +materialized: view

つまり上記の記述では、models/example配下のsqlファイルに対して、+materialized: viewが設定されるということとなります。

モデル定義の編集

まず今回は、initで作成されたフォルダ構造を使用せずにdbt_project.ymlを以下のようにしておきます。

models:
  sample_project:
    +materialized: view

そしてmodels/配下を削除し、models/iris_dbt.sqlというファイルを作成して以下を記述します。

with iris_dbt as (
    select
        id,
        sepallengthcm,
        sepalwidthcm,
        petallengthcm,
        petalwidthcm,
        species
    from iris
)
select * from iris_dbt

実行してみる

dbt runで実行してみます。この際、AWS環境への権限があるプロファイルを有効にしておいてください。

(こちらはそれぞれの環境次第ですが通常AWS CLIを使う際の設定がされていればOKです)

私はaws-vaultで構築しているので、以下のように実行します。

aws-vault exec {AWSプロファイル名} -- dbt run

# 09:55:13  Running with dbt=1.6.5
# 09:55:14  Registered adapter: athena=1.6.2
# 09:55:14  Unable to do partial parsing because a project config has changed
# 09:55:14  Found 1 model, 0 sources, 0 exposures, 0 metrics, 378 macros, 0 groups, 0 semantic models
# 09:55:14  
# 09:55:16  Concurrency: 1 threads (target='dev')
# 09:55:16  
# 09:55:16  1 of 1 START sql view model cm_nakamura.iris_dbt ............................... [RUN]
# 09:55:18  1 of 1 OK created sql view model cm_nakamura.iris_dbt .......................... [OK -1 in 1.64s]
# 09:55:18  
# 09:55:18  Finished running 1 view model in 0 hours 0 minutes and 3.41 seconds (3.41s).
# 09:55:18  
# 09:55:18  Completed successfully
# 09:55:18  
# 09:55:18  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

マネコンの画面でビューが作成されていることが確認できました。

ビューではなくテーブルとしたい場合は、前述のdbt_project.ymlの設定を変えても良いのですが、sqlファイルの先頭に以下を入れることによって、個別に変更することも可能です。

{{ config(materialized='table') }}

-- 以降略

詳しくは、以下の前述のリンクの「Config block」の時の記載方法をご確認ください。

ソースの明示的な指定

またソースのテーブルを明示的に指定することもできます。

models/sources.ymlに以下のような内容で記載します。

version: 2

sources:
  - name: cm_nakamura
    database: awsdatacatalog
    schema: cm_nakamura
    tables:
      - name: iris

そしてsqlファイルは以下のように修正します。

with iris_dbt as (
    select
        id,
        sepallengthcm,
        sepalwidthcm,
        petallengthcm,
        petalwidthcm,
        species
    from {{ source('cm_nakamura','iris') }}
)
select * from iris_dbt

こちらでもdbt runができることを確認しました。

aws-vault exec {AWSプロファイル名} -- dbt run

外部のデータソース等を使用する場合は、models/sources.ymlの記載を工夫することで実現するようです。

まとめ

いかがでしたでしょうか。

今回はdbtを使ってAWS環境上のAthenaテーブルを操作してみました。

本記事がdbt-coreを使い始める方のご参考になれば幸いです。