Snowflakeでステージングしたファイルをdbt経由でロードしてみた #SnowflakeDB
※本エントリは、Snowflakeをもっと使いこなそう! Advent Calendar 2022の22日目の記事となります。
さがらです。
Snowflakeでステージングしたファイルをdbt経由でロードしてみたので、その内容をまとめてみます。
ステージングしたファイルをdbt経由でロードするメリット・デメリット
まず、なぜ「ステージングしたファイルをdbt経由でロード」することが良いのか、そのメリットを記します。
- 「データのロード➟ロードしたデータの加工」を一連のジョブとしてdbt側で実行できる
- 何かしらのワークフローツールが無い限り、「SnowflakeでのCOPYコマンドやSnowpipeを用いたロード」と「ロードしたデータをdbtで加工」はそれぞれ独立しているため、ロードを行った直後にdbtで加工ができない
- データロード~dbtによる加工に関するコードを、全てdbt上で一元管理できる
- dbt経由ならば、自然とGitによるバージョン管理も可能
一方で、下記のようなデメリットもあると思っているので、一概に全てdbt経由でステージングしたファイルをロードするのが正解とは思いません。ユースケースに応じて使い分けていきましょう!
COPY INTO
コマンドを用いたロードの場合は、対象のステージにエラーを起こすファイルがあったときの挙動をON_ERROR
オプションで変えることができるが、dbt経由でロードしてエラーを起こすファイルがあった場合、何もロード出来ずに終わってしまうON_ERROR
オプションについては、こちらの記事も併せてご覧ください。
- dbt経由ではSnowpipeやサーバーレスタスクなど、Snowflakeのサーバーレス機能を使用できないため、若干コストが高くなる可能性あり(2022/12/12時点)
- 2022/12/12時点で、Snowpipeのファイル数に関するコストは「0.06 credits per 1,000 files」のため、対象のファイル数によってはSnowpipeの方が通常のウェアハウスよりも割高になる可能性もあります。一概にコストが高くなると言えない点だけ注意が必要です
- dbt Cloudのデフォルト設定だと、各開発ユーザーのスキーマ、本番のスキーマ、それぞれに対象のデータがロードされてしまう
- 「ステージングされたファイルのロード先のテーブル」と「ロードされたテーブルを加工したテーブル」で、出力先となるデータベースやスキーマを変更する場合、Custom SchemasやCustom Databasesを使用しないといけない
- 2点目と3点目のスキーマ関係のデメリットについては、
generate_schema_name
のマクロ定義を書き換えないと思い通りの形で運用できない可能性が高いと思います。本記事ではgenerate_schema_name
について少しだけ書き換えて試しております。
- 2点目と3点目のスキーマ関係のデメリットについては、
おまけ:あれこの手法どこかで見た気が…
このステージングしたファイルをdbt経由でロードする方法ですが、ぺい(近森 淳平)氏が「みんなの考えた最強のデータアーキテクチャ」で発表されていた方法です!
この発表を見て、「こんな方法があるのか!」と私も感じたので、今回改めて自分でも検証しブログにさせて頂きました。
本イベントでの発表資料はぜひ下記からご覧ください。
また、本イベントについてのレポートブログも書いておりますので、併せてぜひこちらも…
試してみた
データロード用のスキーマの作成
まずはじめに、生データと加工データを区分けするためにも、dbtでデータロードを行うテーブルを格納するスキーマを作成しておきます。
use role sagara_admin_role; use database sagara_rawdata_db; create schema citibike_from_dbt;
ステージとファイルフォーマットの定義
続いて、事前にSnowflake上でステージとファイルフォーマットの定義をしておく必要があります。
ステージについては、ハンズオンでおなじみCitibikeのデータが格納されているAWSのS3のURLを指定します。実際の業務環境のS3ではStorage Integrationを作成した上でステージの定義をすると思いますので、その場合はこちらのブログを参考にしてください。
create stage citibike_trips url = 's3://snowflake-workshop-lab/citibike-trips-csv/';
ファイルフォーマットについては、Snowflake公式のQuickStartからの引用ですが、下記のクエリを実行して定義します。
create or replace file format csv type='csv' compression = 'auto' field_delimiter = ',' record_delimiter = '\n' skip_header = 0 field_optionally_enclosed_by = '\042' trim_space = false error_on_column_count_mismatch = false escape = 'none' escape_unenclosed_field = '\134' date_format = 'auto' timestamp_format = 'auto' null_if = ('') comment = 'file format for ingesting data for zero to snowflake';
ステージングされたファイルへのクエリサンプル
続いて、Snowflakeからステージングされたファイルに対してどのようにクエリを投げることができるのか、簡単に確認してみます。
内容としては、下記のようなクエリを実行すればOKです。@citibike_trips
が先程定義したS3を参照するステージで、どのようにデータをスキャンするかについても先程定義したファイルフォーマットをfile_format=
と言う形で指定しています。対象のバケットに様々な名称・拡張子のファイルがある場合には、pattern
で指定しましょう。
select t.$1 from @citibike_trips (file_format => 'CSV', pattern=>'.*csv.*') t;
generate_schema_nameマクロを変更
generate_schema_name
マクロは、デフォルトで定義されているマクロであり、dbtがスキーマを生成する時のロジックが記述されています。今回はデータロード用のスキーマを作成したため、そのスキーマにロードさせるようにマクロの内容を書き換えてみます。
※今回はあくまで検証のため非常に簡易的に変更しています。このコードをそのまま運用するとデータロードの用途以外でカスタムスキーマを設定した場合に、開発環境と本番環境、どの環境からビルドした際も同じスキーマにテーブルが作成されてしまいますのでご注意ください。
ここでは、通例に習ってget_custom_schema.sql
というファイルを対象のdbt projectのmacros
フォルダ直下に作成し、下記のように記述しgenerate_schema_name
マクロの定義を変更します。
{% macro generate_schema_name(custom_schema_name, node) -%} {%- set default_schema = target.schema -%} {%- if custom_schema_name is none -%} {{ default_schema }} {%- else -%} {{ custom_schema_name | trim }} {%- endif -%} {%- endmacro %}
カスタムスキーマについては非常に奥が深いので、カスタムスキーマを使用する場合にはぜひ事前に様々な検証を行って皆様の運用にマッチする形でご利用ください。(以下、参考になるリンクを載せておきます。)
dbtでロードしてみた
では、前述しましたステージングされたファイルに対するクエリをdbt上で実行して、ロードをしてみます!
まず、rawデータに関するmodelを定義するので、専用の新しいフォルダを作成しておきます。models/raw
として作成し、dbt_project.yml
も変更します。
# dbt_prokect.ymlの内容で変更点があるところだけ models: <project名>: raw: materialized: table
このあと、以下のクエリをmodels/raw
フォルダのtrips.sql
として記述します。ポイントは、ただSQLを記述するだけではなく、config
で使用するロール・ウェアハウス・データベース・スキーマの変更、テーブルを作成するデータベース・スキーマの指定まで行っているのがポイントです。
特に、ステージとファイルフォーマットを格納しているデータベース・スキーマを事前にuse
コマンドで指定しないと、対象のdbt projectのデフォルト値であるデータベースとスキーマに対してステージとファイルフォーマットを探しにいってしまい、エラーとなるため注意が必要です。
{{ config( pre_hook=[ "use role sagara_admin_role", "use warehouse sagara_dataload_wh", "use database sagara_rawdata_db", "use schema citibike_from_dbt" ], database="SAGARA_RAWDATA_DB", schema="CITIBIKE_FROM_DBT" ) }} select t.$1::integer as tripduraton, t.$2::timestamp as starttime, t.$3::timestamp as stoptime, t.$4::integer as start_station_id, t.$5::string as start_station_name, t.$6::float as start_station_latitude, t.$7::float as start_station_longitude, t.$8::integer as end_station_id, t.$9::string as end_station_name, t.$10::float as end_station_latitude, t.$11::float as end_station_longitude, t.$12::integer as bikeid, t.$13::string as membership_type, t.$14::string as usertype, t.$15::integer as birth_year, t.$16::integer as gender from @citibike_trips (file_format => 'CSV', pattern=>'.*csv.*') t
では、このmodelを実行してみます!サイズXSのシングルクラスタウェアハウスを使っていたので時間はかかりましたが、成功しました。
実際にSnowflakeの画面からも確認してみると、無事に指定したスキーマに対して、テーブルが作成されていることを確認できました!
考慮すべき点
上述の例では、都度対象のステージに格納されたファイルをフルスキャンしてしまいます。そのため、以下はほんの一例ですが都度フルスキャンしないように考慮する必要があります。
- データロード先を開発用と本番用でスキーマを分ける場合
- target = devなどで条件分岐を行う記述をJinjaにより実装し、WHERE句でスキャンするデータ量を変更する
- データロード先を開発本番問わず特定のスキーマだけにする場合
METADATA$FILENAME
などを使用し、Incremental modelを適用する
最後に
Snowflakeでステージングしたファイルをdbt経由でロードすることを試してみました。
考慮すべき点は色々ありますが、データロードから加工まで一貫して管理できるのは魅力的ですよね!ぜひユースケースに合致する場合には、ご活用ください。