SnowflakeのSnowpipeとDynamic Tableで重複削除・差分更新を行うデータパイプラインを構築してみた
さがらです。
SnowflakeのSnowpipeとDynamic Tableで重複削除・差分更新を行うデータパイプラインを構築してみたので、その内容をまとめてみます。
※先日執筆した以下のブログの内容を、SnowpipeとDynamic Tableに置き換えて実装した場合にどう変わるかを試してみたブログとなります。タスク自体の説明はこちらの記事を参考にして頂ければと思います。
SnowflakeのSnowpipeとは
SnowflakeのSnowpipeについて簡単に説明します。
- 概要
- S3に新しいデータが置かれたというイベント通知をトリガーに、COPY INTOコマンドを実行できる機能
- 設定方法
- create pipeコマンドで作成
- S3バケット等でイベント通知の設定を行う(テーブル1つごとに必要)
- 使用するコンピュートリソース
- Snowflake管理のリソースを利用し、実行時間の分だけ、コストが発生する
- 加えて、処理するファイル数に応じての課金も発生する ※0.06 Credits per 1000 files
- エラー通知
- ERROR_INTEGRATIONパラメータでAmazon SNSと連携した通知が可能
また、Snowpipeに関しては以下の記事も非常に参考になりますので、併せてぜひご覧ください。
SnowflakeのDynamic Tableとは
SnowflakeのDynamic Tableについて簡単に説明します。
- 概要
- SELECT文とTARGET_LAGの値を指定するだけで、自動で更新を行ってくれるテーブルを実装できる機能
- 更新方法は全件更新(Full Refresh)か増分更新(Incremental Refresh)
- ただし、増分更新が行えるのはこのドキュメントの条件に当てはまるクエリのみ。一般的なJOIN・GROUP BYなどはサポートしている
- 使用するコンピュートリソース
- ユーザー側で作成したウェアハウス
- エラー通知
- イベントテーブルとアラートを用いた設定が可能
Dynamic Table利用時の注意点
Dynamic Tableは非常に便利な機能なのですが、以下の点に注意が必要です。
- UDF、外部関数、PIVOT関数・UNPIVOT関数、などがDynamic Tableの定義時にあるとエラーになる
- current_timestampやCortex LLM Functionなど、全件更新(Full Refresh)でないと利用できないクエリがある
- Dynamic Tableの定義時にFROM句に指定したテーブルのChange Trackingが有効化される(そのため、FROM句に指定したテーブルの
DATA_RETENTION_TIME_IN_DAYS
パラメータを0に設定することはできない) - Dynamic Tableの定義時にFROM句に指定したテーブルの
MAX_DATA_EXTENSION_TIME_IN_DAYS
の期間内に更新されない場合、古くなったDynamic Tableの更新を再開するためにDynamic Tableの再作成が必要となる可能性がある ※MAX_DATA_EXTENSION_TIME_IN_DAYS
のデフォルト値は14日(参考Docs) - 1つのSnowflakeアカウントで50000個までDynamic Tableを定義可能
他にも様々な制約がありますので、他の制約について知りたい場合は以下のドキュメントをご覧ください。
パイプラインの構築
では早速、SnowpipeとDynamic Tableでパイプライン構築をしていきます。
今回はあくまで検証のため、全てSYSADMIN
ロールで実行します。
前提条件
以下の3層構成でパイプラインを作成します。
- RAW層:ステージからロードしたデータをそのままの形で保持するレイヤー
- STG層:RAW層でロードしたデータでは主キーごとに重複が発生するため、その重複を削除するレイヤー
- MART層:STG層で重複削除を行ったデータを元に、エンドユーザーが利用するデータを管理するレイヤー
事前準備:Storage Integrationの作成
事前にS3バケットに対するStorage Integrationを作成しておきます。今回はsagara_s3_int
という名称で作成しました。
作成の際は、下記の公式Docsが参考になると思います。
必要なデータベース・スキーマ・ウェアハウスの作成
-- データベースの作成
create database sagara_test_db;
-- スキーマの作成
use database sagara_test_db;
create schema raw;
create schema stg;
create schema mart;
-- ウェアハウスの作成
create warehouse if not exists compute_wh
with
warehouse_size = 'xs'
auto_suspend = 60
auto_resume = true;
外部ステージとファイルフォーマットの作成
-- 名前付き外部ステージの作成
create or replace stage sagara_test_db.raw.raw_stage
url = 's3://your-bucket-name/path/'
storage_integration = sagara_s3_int;
-- 日本語データ用のCSVファイルフォーマットの作成(メタデータ対応)
create or replace file format sagara_test_db.raw.csv_format_with_metadata
type = 'csv'
parse_header = true
error_on_column_count_mismatch = false
field_delimiter = ','
encoding = 'UTF8';
RAW層のテーブル定義
-- RAW層のテーブル作成(メタデータカラム追加)
create or replace table sagara_test_db.raw.raw_customer (
"顧客ID" int,
"氏名" varchar(100),
"年齢" int,
"住所" varchar(200),
"登録日" date,
"メールアドレス" varchar(100),
"メタデータ_ファイル名" varchar(255),
"メタデータ_ファイル更新日時" timestamp_ntz,
"メタデータ_ファイルロード日時" timestamp_ntz
);
create or replace table sagara_test_db.raw.raw_product (
"商品ID" int,
"商品名" varchar(100),
"カテゴリ" varchar(50),
"単価" decimal(10,2),
"在庫数" int,
"メーカー" varchar(100),
"メタデータ_ファイル名" varchar(255),
"メタデータ_ファイル更新日時" timestamp_ntz,
"メタデータ_ファイルロード日時" timestamp_ntz
);
create or replace table sagara_test_db.raw.raw_purchase (
"購入明細ID" int,
"顧客ID" int,
"商品ID" int,
"購入日" date,
"数量" int,
"購入金額" decimal(10,2),
"支払方法" varchar(50),
"配送状況" varchar(50),
"メタデータ_ファイル名" varchar(255),
"メタデータ_ファイル更新日時" timestamp_ntz,
"メタデータ_ファイルロード日時" timestamp_ntz
);
RAW層へのデータロード用Snowpipeの定義
-- 顧客データ用のSnowpipe
create or replace pipe sagara_test_db.raw.pipe_raw_customer
auto_ingest = true
as
copy into sagara_test_db.raw.raw_customer
from @sagara_test_db.raw.raw_stage/customer
file_format = sagara_test_db.raw.csv_format_with_metadata
match_by_column_name = case_insensitive
include_metadata = (
"メタデータ_ファイル名" = metadata$filename,
"メタデータ_ファイル更新日時" = metadata$file_last_modified,
"メタデータ_ファイルロード日時" = metadata$start_scan_time
);
-- 商品データ用のSnowpipe
create or replace pipe sagara_test_db.raw.pipe_raw_product
auto_ingest = true
as
copy into sagara_test_db.raw.raw_product
from @sagara_test_db.raw.raw_stage/product
file_format = sagara_test_db.raw.csv_format_with_metadata
match_by_column_name = case_insensitive
include_metadata = (
"メタデータ_ファイル名" = metadata$filename,
"メタデータ_ファイル更新日時" = metadata$file_last_modified,
"メタデータ_ファイルロード日時" = metadata$start_scan_time
);
-- 購入データ用のSnowpipe
create or replace pipe sagara_test_db.raw.pipe_raw_purchase
auto_ingest = true
as
copy into sagara_test_db.raw.raw_purchase
from @sagara_test_db.raw.raw_stage/purchase
file_format = sagara_test_db.raw.csv_format_with_metadata
match_by_column_name = case_insensitive
include_metadata = (
"メタデータ_ファイル名" = metadata$filename,
"メタデータ_ファイル更新日時" = metadata$file_last_modified,
"メタデータ_ファイルロード日時" = metadata$start_scan_time
);
この設定をした後に、各pipeに対して以下のクエリで出力されるnotificationChannelName
からSQSキューのARNを確認し、下記のドキュメントに沿ってS3イベント通知を設定します。
-- 各Snowpipeの通知チャネルARNを取得(外部ストレージ連携用)
select system$pipe_status('sagara_test_db.raw.pipe_raw_customer');
select system$pipe_status('sagara_test_db.raw.pipe_raw_product');
select system$pipe_status('sagara_test_db.raw.pipe_raw_purchase');
STG層のテーブル定義 (Dynamic Table)
STG層のDynamic Tableを定義します。
target_lag = 'downstream'
とすることで、MART層で設定したtarget_lag
に応じて自動更新が行われるようになります。
refresh_mode = auto
としていますが、このSTG層のクエリは増分更新が可能なクエリのため、増分更新で更新が行われます。
-- STG層のDynamic Table作成
create or replace dynamic table sagara_test_db.stg.stg_customer
target_lag = 'downstream'
warehouse = compute_wh
refresh_mode = auto
as
select
"顧客ID",
"氏名",
"年齢",
"住所",
"登録日",
"メールアドレス",
"メタデータ_ファイル名",
"メタデータ_ファイル更新日時",
"メタデータ_ファイルロード日時"
from (
select
"顧客ID",
"氏名",
"年齢",
"住所",
"登録日",
"メールアドレス",
"メタデータ_ファイル名",
"メタデータ_ファイル更新日時",
"メタデータ_ファイルロード日時",
row_number() over (partition by "顧客ID" order by "メタデータ_ファイルロード日時" desc) as rn
from sagara_test_db.raw.raw_customer
)
where rn = 1;
create or replace dynamic table sagara_test_db.stg.stg_product
target_lag = 'downstream'
warehouse = compute_wh
refresh_mode = auto
as
select
"商品ID",
"商品名",
"カテゴリ",
"単価",
"在庫数",
"メーカー",
"メタデータ_ファイル名",
"メタデータ_ファイル更新日時",
"メタデータ_ファイルロード日時"
from (
select
"商品ID",
"商品名",
"カテゴリ",
"単価",
"在庫数",
"メーカー",
"メタデータ_ファイル名",
"メタデータ_ファイル更新日時",
"メタデータ_ファイルロード日時",
row_number() over (partition by "商品ID" order by "メタデータ_ファイルロード日時" desc) as rn
from sagara_test_db.raw.raw_product
)
where rn = 1;
create or replace dynamic table sagara_test_db.stg.stg_purchase
target_lag = 'downstream'
warehouse = compute_wh
refresh_mode = auto
as
select
"購入明細ID",
"顧客ID",
"商品ID",
"購入日",
"数量",
"購入金額",
"支払方法",
"配送状況",
"メタデータ_ファイル名",
"メタデータ_ファイル更新日時",
"メタデータ_ファイルロード日時"
from (
select
"購入明細ID",
"顧客ID",
"商品ID",
"購入日",
"数量",
"購入金額",
"支払方法",
"配送状況",
"メタデータ_ファイル名",
"メタデータ_ファイル更新日時",
"メタデータ_ファイルロード日時",
row_number() over (partition by "購入明細ID" order by "メタデータ_ファイルロード日時" desc) as rn
from sagara_test_db.raw.raw_purchase
)
where rn = 1;
実際に作成したテーブルを見ると、refresh_mode = auto
としていましたがRefresh Mode
がIncremental
と表示されていました。
MART層のテーブル定義 (Dynamic Table)
MART層のDynamic Tableを定義します。
MART層ではtarget_lag = '5 minutes'
のように、各テーブルごとに何分以内に更新されてほしいかを定義しておきます。
refresh_mode = auto
としていますが、公式Docsにも記載があるcurrent_timestamp
関数を使用しているので、全件更新のみ対応となります。
MART層のクエリにおいては、クエリの内容によって増分更新と全件更新、どちらが使えるかはクエリの内容次第になると思います。以下の公式Docsも参考に、各テーブルがどの更新方法となるか確認することをおすすめします。
-- MART層のDynamic Table作成
create or replace dynamic table sagara_test_db.mart.mart_customer_purchase
target_lag = '5 minutes'
warehouse = compute_wh
refresh_mode = auto
as
with customer_purchase_summary as (
select
c."顧客ID",
c."氏名",
c."年齢",
c."住所",
count(p."購入明細ID") as "購入回数",
sum(p."購入金額") as "合計購入金額",
max(p."購入日") as "最終購入日"
from sagara_test_db.stg.stg_customer c
left join sagara_test_db.stg.stg_purchase p on c."顧客ID" = p."顧客ID"
group by c."顧客ID", c."氏名", c."年齢", c."住所"
),
category_frequency as (
select
p."顧客ID",
prod."カテゴリ",
count(*) as category_count,
row_number() over (partition by p."顧客ID" order by count(*) desc) as rn
from sagara_test_db.stg.stg_purchase p
join sagara_test_db.stg.stg_product prod on p."商品ID" = prod."商品ID"
group by p."顧客ID", prod."カテゴリ"
),
payment_frequency as (
select
p."顧客ID",
p."支払方法",
count(*) as payment_count,
row_number() over (partition by p."顧客ID" order by count(*) desc) as rn
from sagara_test_db.stg.stg_purchase p
group by p."顧客ID", p."支払方法"
)
select
cps."顧客ID",
cps."氏名",
cps."年齢",
cps."住所",
cps."購入回数",
cps."合計購入金額",
cps."最終購入日",
cf."カテゴリ" as "最頻購入カテゴリ",
pf."支払方法" as "最頻支払方法",
current_timestamp() as "メタデータ_レコード更新日時"
from customer_purchase_summary cps
left join category_frequency cf on cps."顧客ID" = cf."顧客ID" and cf.rn = 1
left join payment_frequency pf on cps."顧客ID" = pf."顧客ID" and pf.rn = 1;
create or replace dynamic table sagara_test_db.mart.mart_product_sales
target_lag = '5 minutes'
warehouse = compute_wh
refresh_mode = auto
as
select
p."商品ID",
p."商品名",
p."カテゴリ",
p."メーカー",
sum(pur."数量") as "販売数量",
sum(pur."購入金額") as "販売金額",
count(distinct pur."顧客ID") as "購入顧客数",
avg(c."年齢") as "平均購入年齢",
current_timestamp() as "メタデータ_レコード更新日時"
from sagara_test_db.stg.stg_product p
left join sagara_test_db.stg.stg_purchase pur on p."商品ID" = pur."商品ID"
left join sagara_test_db.stg.stg_customer c on pur."顧客ID" = c."顧客ID"
group by p."商品ID", p."商品名", p."カテゴリ", p."メーカー";
create or replace dynamic table sagara_test_db.mart.mart_sales_summary
target_lag = '5 minutes'
warehouse = compute_wh
refresh_mode = auto
as
with current_month_sales as (
select
date_trunc('month', p."購入日") as month_date,
prod."カテゴリ",
sum(p."数量") as total_quantity,
sum(p."購入金額") as total_amount,
count(distinct p."顧客ID") as customer_count
from sagara_test_db.stg.stg_purchase p
join sagara_test_db.stg.stg_product prod on p."商品ID" = prod."商品ID"
group by month_date, prod."カテゴリ"
),
previous_month_sales as (
select
dateadd('month', 1, month_date) as next_month,
"カテゴリ",
total_amount as prev_amount
from current_month_sales
)
select
cms.month_date as "集計日",
cms."カテゴリ",
cms.total_quantity as "販売数量",
cms.total_amount as "販売金額",
cms.customer_count as "購入顧客数",
case
when pms.prev_amount is null or pms.prev_amount = 0 then null
else (cms.total_amount - pms.prev_amount) / pms.prev_amount * 100
end as "前月比",
current_timestamp() as "メタデータ_レコード更新日時"
from current_month_sales cms
left join previous_month_sales pms on cms.month_date = pms.next_month and cms."カテゴリ" = pms."カテゴリ";
実際に作成したテーブルを見ると、refresh_mode = auto
としていましたがRefresh Mode
がFull
と表示されていました。
参考:このクエリにより作成されるGraphタブ
Dynamic Tableを作成すると、Graph
タブから各テーブルの依存関係を可視化してみることが出来ます。
以下はSTG_CUSTOMER
テーブルからGraph
タブを見た際のスクリーンショットです。
CSVファイルを1つずつ格納してパイプラインの動作確認
投入するデータ
作成した外部ステージと紐づくS3バケットに対して、CSVファイルを各テーブルごとに1つずつ格納します。
- customer_data_20230601.csv(raw_customerへロード)
顧客ID,氏名,年齢,住所,登録日,メールアドレス
1,田中太郎,35,東京都新宿区,2023-01-01,tanaka@example.com
2,佐藤花子,28,大阪府大阪市,2023-01-15,sato@example.com
3,鈴木一郎,42,北海道札幌市,2023-01-20,suzuki@example.com
4,高橋みどり,31,福岡県福岡市,2023-02-01,takahashi@example.com
5,伊藤健太,39,愛知県名古屋市,2023-02-10,ito@example.com
- product_data_20230601.csv(raw_productへロード)
商品ID,商品名,カテゴリ,単価,在庫数,メーカー
101,高級腕時計,アクセサリー,85000.00,12,スイス時計
102,スマートフォン,電子機器,65000.00,35,テックコーポ
103,ノートパソコン,電子機器,120000.00,8,コンピュータ社
104,ダイニングテーブル,家具,45000.00,5,木工房
105,ゴルフセット,スポーツ,78000.00,15,スポーツギア
- purchase_data_20230601.csv(raw_purchaseへロード)
購入明細ID,顧客ID,商品ID,購入日,数量,購入金額,支払方法,配送状況
1001,1,102,2023-01-15,1,65000.00,クレジットカード,配送済み
1002,2,107,2023-02-20,1,25000.00,銀行振込,配送済み
1003,3,101,2023-01-30,1,85000.00,クレジットカード,配送済み
1004,4,106,2023-03-05,1,55000.00,電子マネー,配送中
1005,5,104,2023-02-10,1,45000.00,クレジットカード,配送済み
データの確認
今回はMART層でtarget_lag = '5 minutes'
としていたので、5分ほど待った後にデータがどうなったかを確認してみます。
以下、RAWとSTGレイヤーでテーブルを1つ、MARTレイヤーでは3つ全て、テーブルの中身を確認してみます。※生成AIで作ったデータファイルのため、まだ商品IDが登録されていないデータも存在しますが、後述の2回目以降の処理時に更新されます。
RAW_CUSTOMER
テーブル
STG_CUSTOMER
テーブル
MART_CUSTOMER_PURCHASE
テーブル
MART_PRODUCT_SALES
テーブル
MART_SALES_SUMMARY
テーブル
customerに対してのみ、CSVファイルを2つ格納してパイプラインの動作確認
ここから追加のデータを入れたときの挙動確認をするのですが、まずはcustomerに関するデータだけを入れたときに、MART層の更新が行われるのかを確認してみます。
投入するデータ
作成した外部ステージと紐づくS3バケットに対して、CSVファイルを2つ格納します。
これらのCSVファイルでは、以下のように各IDごとにデータのアップデートが行われています。
-
customerデータの重複と変更点
- 顧客ID 3: 年齢が42から43に更新され、メールアドレスも変更
- 顧客ID 5: 年齢が39から40に更新され、住所が詳細化され、メールアドレスも変更
- 顧客ID 4, 6, 7: 複数のファイルに同じデータが存在
-
customer_data_20230701.csv(raw_customerへロード)
顧客ID,氏名,年齢,住所,登録日,メールアドレス
3,鈴木一郎,43,北海道札幌市,2023-01-20,suzuki_new@example.com
4,高橋みどり,31,福岡県福岡市,2023-02-01,takahashi@example.com
5,伊藤健太,39,愛知県名古屋市,2023-02-10,ito@example.com
6,渡辺裕子,25,京都府京都市,2023-06-15,watanabe@example.com
7,山本和彦,45,神奈川県横浜市,2023-06-20,yamamoto@example.com
- customer_data_20230801.csv(raw_customerへロード)
顧客ID,氏名,年齢,住所,登録日,メールアドレス
5,伊藤健太,40,愛知県名古屋市東区,2023-02-10,ito_updated@example.com
6,渡辺裕子,25,京都府京都市,2023-06-15,watanabe@example.com
7,山本和彦,45,神奈川県横浜市,2023-06-20,yamamoto@example.com
8,中村真由美,33,埼玉県さいたま市,2023-07-05,nakamura@example.com
9,小林誠,37,千葉県千葉市,2023-07-15,kobayashi@example.com
データの確認
今回はMART層でtarget_lag = '5 minutes'
としていたので、5分ほど待った後にデータがどうなったかを確認してみます。
以下、RAWとSTGレイヤーでcustomer関係のテーブルを1つずつ、MARTレイヤーでは3つ全て、テーブルの中身を確認してみます。
結果として、STG層もMART層も関係する全てのテーブルが更新されていることがわかりました。逆に言うと、MART_SALES_SUMMARY
テーブルだけはcustomer関係のテーブルを使用していないため、更新が行われていませんでした。必要な更新だけ行ってくれるのが素晴らしいですね!
RAW_CUSTOMER
テーブル
STG_CUSTOMER
テーブル
MART_CUSTOMER_PURCHASE
テーブル
MART_PRODUCT_SALES
テーブル
MART_SALES_SUMMARY
テーブル ※今回は更新なし
productとpurchaseに対して、CSVファイルを2つずつ格納してパイプラインの動作確認
最後に、残りのproductとpurchaseについてもデータをS3バケットに入れたときのパイプラインの動作確認をしてみます。
投入するデータ
作成した外部ステージと紐づくS3バケットに対して、CSVファイルを各テーブルごとに2つずつ格納します。
これらのCSVファイルでは、以下のように各IDごとにデータのアップデートが行われています。
-
productデータの重複と変更点
- 商品ID 103: 単価が120,000円から125,000円に値上げされ、在庫数が減少
- 商品ID 105: 商品名が「ゴルフセット」から「ゴルフセット プロモデル」に変更され、単価も上昇
- 商品ID 104, 106, 107: 在庫数のみ変更
-
purchaseデータの重複と変更点
- 購入明細ID 1004, 1005: 同じデータが複数ファイルに存在
- 購入明細ID 1006: 配送状況が「準備中」から「配送済み」に更新
- 購入明細ID 1008: 数量が1から2に変更され、購入金額も倍増
-
product_data_20230701.csv(raw_productへロード)
商品ID,商品名,カテゴリ,単価,在庫数,メーカー
103,ノートパソコン,電子機器,125000.00,5,コンピュータ社
104,ダイニングテーブル,家具,45000.00,3,木工房
105,ゴルフセット,スポーツ,78000.00,15,スポーツギア
106,高級バッグ,ファッション,55000.00,20,ラグジュアリーブランド
107,電子レンジ,家電,25000.00,40,家電メーカー
- product_data_20230801.csv(raw_productへロード)
商品ID,商品名,カテゴリ,単価,在庫数,メーカー
105,ゴルフセット プロモデル,スポーツ,85000.00,10,スポーツギア
106,高級バッグ,ファッション,55000.00,15,ラグジュアリーブランド
107,電子レンジ,家電,25000.00,35,家電メーカー
108,コーヒーメーカー,家電,18000.00,25,キッチン工房
109,ヨガマット,スポーツ,5000.00,50,フィットネス社
- purchase_data_20230701.csv(raw_purchaseへロード)
購入明細ID,顧客ID,商品ID,購入日,数量,購入金額,支払方法,配送状況
1004,4,106,2023-03-05,1,55000.00,電子マネー,配送済み
1005,5,104,2023-02-10,1,45000.00,クレジットカード,配送済み
1006,1,103,2023-04-05,1,125000.00,ローン,準備中
1007,3,105,2023-03-15,1,78000.00,クレジットカード,配送済み
1008,2,102,2023-04-10,1,65000.00,クレジットカード,配送中
- purchase_data_20230801.csv(raw_purchaseへロード)
購入明細ID,顧客ID,商品ID,購入日,数量,購入金額,支払方法,配送状況
1006,1,103,2023-04-05,1,125000.00,ローン,配送済み
1008,2,102,2023-04-10,2,130000.00,クレジットカード,配送済み
1009,6,107,2023-07-01,1,25000.00,クレジットカード,配送済み
1010,7,105,2023-07-10,1,85000.00,銀行振込,配送中
1011,5,108,2023-07-15,1,18000.00,電子マネー,配送済み
データの確認
今回はMART層でtarget_lag = '5 minutes'
としていたので、5分ほど待った後にデータがどうなったかを確認してみます。
以下、RAWとSTGレイヤーで今回更新していないcustomerテーブルを1つずつ、MARTレイヤーでは3つ全て、テーブルの中身を確認してみます。
結果として、今回データを更新していないSTG_CUSTOMER
テーブルでは差分チェックは行われたもののウェアハウスを用いた更新は行われておらず、今回S3に追加したproductとpurchaseのデータが更新されMART層も3つのテーブル全てが更新されたことがわかります。
RAW_CUSTOMER
テーブル ※今回は更新なし
STG_CUSTOMER
テーブル ※今回は更新なし
MART_CUSTOMER_PURCHASE
テーブル
MART_PRODUCT_SALES
テーブル
MART_SALES_SUMMARY
テーブル
最後に
SnowflakeのSnowpipeとDynamic Tableで重複削除・差分更新を行うデータパイプラインを構築してみたので、その内容をまとめてみました。
私は1年4ヶ月前にもDynamic Tableを一度試してブログにしているのですが、現在はどのタイミングでウェアハウスを使ったのかがSnowsight上でわかったり、自動でChange Trackingを有効化してくれたり、利便性が向上しているなと感じました!
SnowpipeとDynamic Tableの組み合わせは、2025年3月時点ではSnowflakeだけでデータパイプラインを構築する際の最有力の方法だと思いますので、ぜひ活用してみてください!