SnowflakeのSnowpipeとDynamic Tableで重複削除・差分更新を行うデータパイプラインを構築してみた

SnowflakeのSnowpipeとDynamic Tableで重複削除・差分更新を行うデータパイプラインを構築してみた

Clock Icon2025.03.22

さがらです。

SnowflakeのSnowpipeとDynamic Tableで重複削除・差分更新を行うデータパイプラインを構築してみたので、その内容をまとめてみます。

※先日執筆した以下のブログの内容を、SnowpipeとDynamic Tableに置き換えて実装した場合にどう変わるかを試してみたブログとなります。タスク自体の説明はこちらの記事を参考にして頂ければと思います。

https://dev.classmethod.jp/articles/snowflake-make-data-pipeline-with-task/

SnowflakeのSnowpipeとは

SnowflakeのSnowpipeについて簡単に説明します。

https://docs.snowflake.com/en/user-guide/data-load-snowpipe-intro

  • 概要
    • S3に新しいデータが置かれたというイベント通知をトリガーに、COPY INTOコマンドを実行できる機能
  • 設定方法
    1. create pipeコマンドで作成
    2. S3バケット等でイベント通知の設定を行う(テーブル1つごとに必要
  • 使用するコンピュートリソース
    • Snowflake管理のリソースを利用し、実行時間の分だけ、コストが発生する
    • 加えて、処理するファイル数に応じての課金も発生する ※0.06 Credits per 1000 files
  • エラー通知
    • ERROR_INTEGRATIONパラメータでAmazon SNSと連携した通知が可能

また、Snowpipeに関しては以下の記事も非常に参考になりますので、併せてぜひご覧ください。

https://zenn.dev/dataheroes/articles/snowpipe-data-recovery-guide

SnowflakeのDynamic Tableとは

SnowflakeのDynamic Tableについて簡単に説明します。

https://docs.snowflake.com/en/user-guide/dynamic-tables-intro

  • 概要
    • SELECT文とTARGET_LAGの値を指定するだけで、自動で更新を行ってくれるテーブルを実装できる機能
    • 更新方法は全件更新(Full Refresh)か増分更新(Incremental Refresh)
    • ただし、増分更新が行えるのはこのドキュメントの条件に当てはまるクエリのみ。一般的なJOIN・GROUP BYなどはサポートしている
  • 使用するコンピュートリソース
    • ユーザー側で作成したウェアハウス
  • エラー通知
    • イベントテーブルとアラートを用いた設定が可能

Dynamic Table利用時の注意点

Dynamic Tableは非常に便利な機能なのですが、以下の点に注意が必要です。

  • UDF、外部関数、PIVOT関数・UNPIVOT関数、などがDynamic Tableの定義時にあるとエラーになる
  • current_timestampやCortex LLM Functionなど、全件更新(Full Refresh)でないと利用できないクエリがある
  • Dynamic Tableの定義時にFROM句に指定したテーブルのChange Trackingが有効化される(そのため、FROM句に指定したテーブルのDATA_RETENTION_TIME_IN_DAYSパラメータを0に設定することはできない)
  • Dynamic Tableの定義時にFROM句に指定したテーブルのMAX_DATA_EXTENSION_TIME_IN_DAYSの期間内に更新されない場合、古くなったDynamic Tableの更新を再開するためにDynamic Tableの再作成が必要となる可能性がある ※MAX_DATA_EXTENSION_TIME_IN_DAYSのデフォルト値は14日(参考Docs
  • 1つのSnowflakeアカウントで50000個までDynamic Tableを定義可能

他にも様々な制約がありますので、他の制約について知りたい場合は以下のドキュメントをご覧ください。

https://docs.snowflake.com/en/user-guide/dynamic-tables-limitations

パイプラインの構築

では早速、SnowpipeとDynamic Tableでパイプライン構築をしていきます。

今回はあくまで検証のため、全てSYSADMINロールで実行します。

前提条件

以下の3層構成でパイプラインを作成します。

  • RAW層:ステージからロードしたデータをそのままの形で保持するレイヤー
  • STG層:RAW層でロードしたデータでは主キーごとに重複が発生するため、その重複を削除するレイヤー
  • MART層:STG層で重複削除を行ったデータを元に、エンドユーザーが利用するデータを管理するレイヤー

事前準備:Storage Integrationの作成

事前にS3バケットに対するStorage Integrationを作成しておきます。今回はsagara_s3_intという名称で作成しました。

作成の際は、下記の公式Docsが参考になると思います。

https://docs.snowflake.com/ja/user-guide/data-load-s3-config-storage-integration

必要なデータベース・スキーマ・ウェアハウスの作成

-- データベースの作成
create database sagara_test_db;

-- スキーマの作成
use database sagara_test_db;
create schema raw;
create schema stg;
create schema mart;

-- ウェアハウスの作成
create warehouse if not exists compute_wh
with
  warehouse_size = 'xs'
  auto_suspend = 60
  auto_resume = true;

外部ステージとファイルフォーマットの作成

-- 名前付き外部ステージの作成
create or replace stage sagara_test_db.raw.raw_stage
    url = 's3://your-bucket-name/path/'
    storage_integration = sagara_s3_int;

-- 日本語データ用のCSVファイルフォーマットの作成(メタデータ対応)
create or replace file format sagara_test_db.raw.csv_format_with_metadata
    type = 'csv'
    parse_header = true
    error_on_column_count_mismatch = false
    field_delimiter = ','
    encoding = 'UTF8';

RAW層のテーブル定義

-- RAW層のテーブル作成(メタデータカラム追加)
create or replace table sagara_test_db.raw.raw_customer (
    "顧客ID" int,
    "氏名" varchar(100),
    "年齢" int,
    "住所" varchar(200),
    "登録日" date,
    "メールアドレス" varchar(100),
    "メタデータ_ファイル名" varchar(255),
    "メタデータ_ファイル更新日時" timestamp_ntz,
    "メタデータ_ファイルロード日時" timestamp_ntz
);

create or replace table sagara_test_db.raw.raw_product (
    "商品ID" int,
    "商品名" varchar(100),
    "カテゴリ" varchar(50),
    "単価" decimal(10,2),
    "在庫数" int,
    "メーカー" varchar(100),
    "メタデータ_ファイル名" varchar(255),
    "メタデータ_ファイル更新日時" timestamp_ntz,
    "メタデータ_ファイルロード日時" timestamp_ntz
);

create or replace table sagara_test_db.raw.raw_purchase (
    "購入明細ID" int,
    "顧客ID" int,
    "商品ID" int,
    "購入日" date,
    "数量" int,
    "購入金額" decimal(10,2),
    "支払方法" varchar(50),
    "配送状況" varchar(50),
    "メタデータ_ファイル名" varchar(255),
    "メタデータ_ファイル更新日時" timestamp_ntz,
    "メタデータ_ファイルロード日時" timestamp_ntz
);

RAW層へのデータロード用Snowpipeの定義

-- 顧客データ用のSnowpipe
create or replace pipe sagara_test_db.raw.pipe_raw_customer
auto_ingest = true
as
copy into sagara_test_db.raw.raw_customer
from @sagara_test_db.raw.raw_stage/customer
file_format = sagara_test_db.raw.csv_format_with_metadata
match_by_column_name = case_insensitive
include_metadata = (
    "メタデータ_ファイル名" = metadata$filename,
    "メタデータ_ファイル更新日時" = metadata$file_last_modified,
    "メタデータ_ファイルロード日時" = metadata$start_scan_time
);

-- 商品データ用のSnowpipe
create or replace pipe sagara_test_db.raw.pipe_raw_product
auto_ingest = true
as
copy into sagara_test_db.raw.raw_product
from @sagara_test_db.raw.raw_stage/product
file_format = sagara_test_db.raw.csv_format_with_metadata
match_by_column_name = case_insensitive
include_metadata = (
    "メタデータ_ファイル名" = metadata$filename,
    "メタデータ_ファイル更新日時" = metadata$file_last_modified,
    "メタデータ_ファイルロード日時" = metadata$start_scan_time
);

-- 購入データ用のSnowpipe
create or replace pipe sagara_test_db.raw.pipe_raw_purchase
auto_ingest = true
as
copy into sagara_test_db.raw.raw_purchase
from @sagara_test_db.raw.raw_stage/purchase
file_format = sagara_test_db.raw.csv_format_with_metadata
match_by_column_name = case_insensitive
include_metadata = (
    "メタデータ_ファイル名" = metadata$filename,
    "メタデータ_ファイル更新日時" = metadata$file_last_modified,
    "メタデータ_ファイルロード日時" = metadata$start_scan_time
);

この設定をした後に、各pipeに対して以下のクエリで出力されるnotificationChannelNameからSQSキューのARNを確認し、下記のドキュメントに沿ってS3イベント通知を設定します。

-- 各Snowpipeの通知チャネルARNを取得(外部ストレージ連携用)
select system$pipe_status('sagara_test_db.raw.pipe_raw_customer');
select system$pipe_status('sagara_test_db.raw.pipe_raw_product');
select system$pipe_status('sagara_test_db.raw.pipe_raw_purchase');

https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-auto-s3#determining-the-correct-option

STG層のテーブル定義 (Dynamic Table)

STG層のDynamic Tableを定義します。

target_lag = 'downstream'とすることで、MART層で設定したtarget_lagに応じて自動更新が行われるようになります。

refresh_mode = autoとしていますが、このSTG層のクエリは増分更新が可能なクエリのため、増分更新で更新が行われます。

-- STG層のDynamic Table作成
create or replace dynamic table sagara_test_db.stg.stg_customer
    target_lag = 'downstream'
    warehouse = compute_wh
    refresh_mode = auto
as
    select
        "顧客ID",
        "氏名",
        "年齢",
        "住所",
        "登録日",
        "メールアドレス",
        "メタデータ_ファイル名",
        "メタデータ_ファイル更新日時",
        "メタデータ_ファイルロード日時"
    from (
        select
            "顧客ID",
            "氏名",
            "年齢",
            "住所",
            "登録日",
            "メールアドレス",
            "メタデータ_ファイル名",
            "メタデータ_ファイル更新日時",
            "メタデータ_ファイルロード日時",
            row_number() over (partition by "顧客ID" order by "メタデータ_ファイルロード日時" desc) as rn
        from sagara_test_db.raw.raw_customer
    )
    where rn = 1;

create or replace dynamic table sagara_test_db.stg.stg_product
    target_lag = 'downstream'
    warehouse = compute_wh
    refresh_mode = auto
as
    select
        "商品ID",
        "商品名",
        "カテゴリ",
        "単価",
        "在庫数",
        "メーカー",
        "メタデータ_ファイル名",
        "メタデータ_ファイル更新日時",
        "メタデータ_ファイルロード日時"
    from (
        select
            "商品ID",
            "商品名",
            "カテゴリ",
            "単価",
            "在庫数",
            "メーカー",
            "メタデータ_ファイル名",
            "メタデータ_ファイル更新日時",
            "メタデータ_ファイルロード日時",
            row_number() over (partition by "商品ID" order by "メタデータ_ファイルロード日時" desc) as rn
        from sagara_test_db.raw.raw_product
    )
    where rn = 1;

create or replace dynamic table sagara_test_db.stg.stg_purchase
    target_lag = 'downstream'
    warehouse = compute_wh
    refresh_mode = auto
as
    select
        "購入明細ID",
        "顧客ID",
        "商品ID",
        "購入日",
        "数量",
        "購入金額",
        "支払方法",
        "配送状況",
        "メタデータ_ファイル名",
        "メタデータ_ファイル更新日時",
        "メタデータ_ファイルロード日時"
    from (
        select
            "購入明細ID",
            "顧客ID",
            "商品ID",
            "購入日",
            "数量",
            "購入金額",
            "支払方法",
            "配送状況",
            "メタデータ_ファイル名",
            "メタデータ_ファイル更新日時",
            "メタデータ_ファイルロード日時",
            row_number() over (partition by "購入明細ID" order by "メタデータ_ファイルロード日時" desc) as rn
        from sagara_test_db.raw.raw_purchase
    )
    where rn = 1;

実際に作成したテーブルを見ると、refresh_mode = autoとしていましたがRefresh ModeIncrementalと表示されていました。

2025-03-22_21h44_34

MART層のテーブル定義 (Dynamic Table)

MART層のDynamic Tableを定義します。

MART層ではtarget_lag = '5 minutes'のように、各テーブルごとに何分以内に更新されてほしいかを定義しておきます。

refresh_mode = autoとしていますが、公式Docsにも記載があるcurrent_timestamp関数を使用しているので、全件更新のみ対応となります。

MART層のクエリにおいては、クエリの内容によって増分更新と全件更新、どちらが使えるかはクエリの内容次第になると思います。以下の公式Docsも参考に、各テーブルがどの更新方法となるか確認することをおすすめします。

https://docs.snowflake.com/ja/user-guide/dynamic-tables-refresh

-- MART層のDynamic Table作成
create or replace dynamic table sagara_test_db.mart.mart_customer_purchase
    target_lag = '5 minutes'
    warehouse = compute_wh
    refresh_mode = auto
as
    with customer_purchase_summary as (
        select
            c."顧客ID",
            c."氏名",
            c."年齢",
            c."住所",
            count(p."購入明細ID") as "購入回数",
            sum(p."購入金額") as "合計購入金額",
            max(p."購入日") as "最終購入日"
        from sagara_test_db.stg.stg_customer c
        left join sagara_test_db.stg.stg_purchase p on c."顧客ID" = p."顧客ID"
        group by c."顧客ID", c."氏名", c."年齢", c."住所"
    ),
    category_frequency as (
        select
            p."顧客ID",
            prod."カテゴリ",
            count(*) as category_count,
            row_number() over (partition by p."顧客ID" order by count(*) desc) as rn
        from sagara_test_db.stg.stg_purchase p
        join sagara_test_db.stg.stg_product prod on p."商品ID" = prod."商品ID"
        group by p."顧客ID", prod."カテゴリ"
    ),
    payment_frequency as (
        select
            p."顧客ID",
            p."支払方法",
            count(*) as payment_count,
            row_number() over (partition by p."顧客ID" order by count(*) desc) as rn
        from sagara_test_db.stg.stg_purchase p
        group by p."顧客ID", p."支払方法"
    )
    select
        cps."顧客ID",
        cps."氏名",
        cps."年齢",
        cps."住所",
        cps."購入回数",
        cps."合計購入金額",
        cps."最終購入日",
        cf."カテゴリ" as "最頻購入カテゴリ",
        pf."支払方法" as "最頻支払方法",
        current_timestamp() as "メタデータ_レコード更新日時"
    from customer_purchase_summary cps
    left join category_frequency cf on cps."顧客ID" = cf."顧客ID" and cf.rn = 1
    left join payment_frequency pf on cps."顧客ID" = pf."顧客ID" and pf.rn = 1;

create or replace dynamic table sagara_test_db.mart.mart_product_sales
    target_lag = '5 minutes'
    warehouse = compute_wh
    refresh_mode = auto
as
    select
        p."商品ID",
        p."商品名",
        p."カテゴリ",
        p."メーカー",
        sum(pur."数量") as "販売数量",
        sum(pur."購入金額") as "販売金額",
        count(distinct pur."顧客ID") as "購入顧客数",
        avg(c."年齢") as "平均購入年齢",
        current_timestamp() as "メタデータ_レコード更新日時"
    from sagara_test_db.stg.stg_product p
    left join sagara_test_db.stg.stg_purchase pur on p."商品ID" = pur."商品ID"
    left join sagara_test_db.stg.stg_customer c on pur."顧客ID" = c."顧客ID"
    group by p."商品ID", p."商品名", p."カテゴリ", p."メーカー";

create or replace dynamic table sagara_test_db.mart.mart_sales_summary
    target_lag = '5 minutes'
    warehouse = compute_wh
    refresh_mode = auto
as
    with current_month_sales as (
        select
            date_trunc('month', p."購入日") as month_date,
            prod."カテゴリ",
            sum(p."数量") as total_quantity,
            sum(p."購入金額") as total_amount,
            count(distinct p."顧客ID") as customer_count
        from sagara_test_db.stg.stg_purchase p
        join sagara_test_db.stg.stg_product prod on p."商品ID" = prod."商品ID"
        group by month_date, prod."カテゴリ"
    ),
    previous_month_sales as (
        select
            dateadd('month', 1, month_date) as next_month,
            "カテゴリ",
            total_amount as prev_amount
        from current_month_sales
    )
    select
        cms.month_date as "集計日",
        cms."カテゴリ",
        cms.total_quantity as "販売数量",
        cms.total_amount as "販売金額",
        cms.customer_count as "購入顧客数",
        case 
            when pms.prev_amount is null or pms.prev_amount = 0 then null
            else (cms.total_amount - pms.prev_amount) / pms.prev_amount * 100 
        end as "前月比",
        current_timestamp() as "メタデータ_レコード更新日時"
    from current_month_sales cms
    left join previous_month_sales pms on cms.month_date = pms.next_month and cms."カテゴリ" = pms."カテゴリ";

実際に作成したテーブルを見ると、refresh_mode = autoとしていましたがRefresh ModeFullと表示されていました。

2025-03-22_21h42_19

参考:このクエリにより作成されるGraphタブ

Dynamic Tableを作成すると、Graphタブから各テーブルの依存関係を可視化してみることが出来ます。

以下はSTG_CUSTOMERテーブルからGraphタブを見た際のスクリーンショットです。

2025-03-22_22h26_09

CSVファイルを1つずつ格納してパイプラインの動作確認

投入するデータ

作成した外部ステージと紐づくS3バケットに対して、CSVファイルを各テーブルごとに1つずつ格納します。

  • customer_data_20230601.csv(raw_customerへロード)
顧客ID,氏名,年齢,住所,登録日,メールアドレス
1,田中太郎,35,東京都新宿区,2023-01-01,tanaka@example.com
2,佐藤花子,28,大阪府大阪市,2023-01-15,sato@example.com
3,鈴木一郎,42,北海道札幌市,2023-01-20,suzuki@example.com
4,高橋みどり,31,福岡県福岡市,2023-02-01,takahashi@example.com
5,伊藤健太,39,愛知県名古屋市,2023-02-10,ito@example.com
  • product_data_20230601.csv(raw_productへロード)
商品ID,商品名,カテゴリ,単価,在庫数,メーカー
101,高級腕時計,アクセサリー,85000.00,12,スイス時計
102,スマートフォン,電子機器,65000.00,35,テックコーポ
103,ノートパソコン,電子機器,120000.00,8,コンピュータ社
104,ダイニングテーブル,家具,45000.00,5,木工房
105,ゴルフセット,スポーツ,78000.00,15,スポーツギア
  • purchase_data_20230601.csv(raw_purchaseへロード)
購入明細ID,顧客ID,商品ID,購入日,数量,購入金額,支払方法,配送状況
1001,1,102,2023-01-15,1,65000.00,クレジットカード,配送済み
1002,2,107,2023-02-20,1,25000.00,銀行振込,配送済み
1003,3,101,2023-01-30,1,85000.00,クレジットカード,配送済み
1004,4,106,2023-03-05,1,55000.00,電子マネー,配送中
1005,5,104,2023-02-10,1,45000.00,クレジットカード,配送済み

データの確認

今回はMART層でtarget_lag = '5 minutes'としていたので、5分ほど待った後にデータがどうなったかを確認してみます。

以下、RAWとSTGレイヤーでテーブルを1つ、MARTレイヤーでは3つ全て、テーブルの中身を確認してみます。※生成AIで作ったデータファイルのため、まだ商品IDが登録されていないデータも存在しますが、後述の2回目以降の処理時に更新されます。

  • RAW_CUSTOMERテーブル

2025-03-22_22h03_54

  • STG_CUSTOMERテーブル

2025-03-22_22h04_36

2025-03-22_22h07_16

  • MART_CUSTOMER_PURCHASEテーブル

2025-03-22_22h05_46

2025-03-22_22h06_39

  • MART_PRODUCT_SALESテーブル

2025-03-22_22h07_45

2025-03-22_22h08_21

  • MART_SALES_SUMMARYテーブル

2025-03-22_22h09_51

2025-03-22_22h10_21

customerに対してのみ、CSVファイルを2つ格納してパイプラインの動作確認

ここから追加のデータを入れたときの挙動確認をするのですが、まずはcustomerに関するデータだけを入れたときに、MART層の更新が行われるのかを確認してみます。

投入するデータ

作成した外部ステージと紐づくS3バケットに対して、CSVファイルを2つ格納します。

これらのCSVファイルでは、以下のように各IDごとにデータのアップデートが行われています。

  • customerデータの重複と変更点

    • 顧客ID 3: 年齢が42から43に更新され、メールアドレスも変更
    • 顧客ID 5: 年齢が39から40に更新され、住所が詳細化され、メールアドレスも変更
    • 顧客ID 4, 6, 7: 複数のファイルに同じデータが存在
  • customer_data_20230701.csv(raw_customerへロード)

顧客ID,氏名,年齢,住所,登録日,メールアドレス
3,鈴木一郎,43,北海道札幌市,2023-01-20,suzuki_new@example.com
4,高橋みどり,31,福岡県福岡市,2023-02-01,takahashi@example.com
5,伊藤健太,39,愛知県名古屋市,2023-02-10,ito@example.com
6,渡辺裕子,25,京都府京都市,2023-06-15,watanabe@example.com
7,山本和彦,45,神奈川県横浜市,2023-06-20,yamamoto@example.com
  • customer_data_20230801.csv(raw_customerへロード)
顧客ID,氏名,年齢,住所,登録日,メールアドレス
5,伊藤健太,40,愛知県名古屋市東区,2023-02-10,ito_updated@example.com
6,渡辺裕子,25,京都府京都市,2023-06-15,watanabe@example.com
7,山本和彦,45,神奈川県横浜市,2023-06-20,yamamoto@example.com
8,中村真由美,33,埼玉県さいたま市,2023-07-05,nakamura@example.com
9,小林誠,37,千葉県千葉市,2023-07-15,kobayashi@example.com

データの確認

今回はMART層でtarget_lag = '5 minutes'としていたので、5分ほど待った後にデータがどうなったかを確認してみます。

以下、RAWとSTGレイヤーでcustomer関係のテーブルを1つずつ、MARTレイヤーでは3つ全て、テーブルの中身を確認してみます。

結果として、STG層もMART層も関係する全てのテーブルが更新されていることがわかりました。逆に言うと、MART_SALES_SUMMARYテーブルだけはcustomer関係のテーブルを使用していないため、更新が行われていませんでした。必要な更新だけ行ってくれるのが素晴らしいですね!

  • RAW_CUSTOMERテーブル

2025-03-22_22h17_24

  • STG_CUSTOMERテーブル

2025-03-22_22h17_52

2025-03-22_22h18_19

  • MART_CUSTOMER_PURCHASEテーブル

2025-03-22_22h19_00

2025-03-22_22h19_27

  • MART_PRODUCT_SALESテーブル

2025-03-22_22h19_58

2025-03-22_22h20_31

  • MART_SALES_SUMMARYテーブル ※今回は更新なし

2025-03-22_22h21_10

2025-03-22_22h21_30

productとpurchaseに対して、CSVファイルを2つずつ格納してパイプラインの動作確認

最後に、残りのproductとpurchaseについてもデータをS3バケットに入れたときのパイプラインの動作確認をしてみます。

投入するデータ

作成した外部ステージと紐づくS3バケットに対して、CSVファイルを各テーブルごとに2つずつ格納します。

これらのCSVファイルでは、以下のように各IDごとにデータのアップデートが行われています。

  • productデータの重複と変更点

    • 商品ID 103: 単価が120,000円から125,000円に値上げされ、在庫数が減少
    • 商品ID 105: 商品名が「ゴルフセット」から「ゴルフセット プロモデル」に変更され、単価も上昇
    • 商品ID 104, 106, 107: 在庫数のみ変更
  • purchaseデータの重複と変更点

    • 購入明細ID 1004, 1005: 同じデータが複数ファイルに存在
    • 購入明細ID 1006: 配送状況が「準備中」から「配送済み」に更新
    • 購入明細ID 1008: 数量が1から2に変更され、購入金額も倍増
  • product_data_20230701.csv(raw_productへロード)

商品ID,商品名,カテゴリ,単価,在庫数,メーカー
103,ノートパソコン,電子機器,125000.00,5,コンピュータ社
104,ダイニングテーブル,家具,45000.00,3,木工房
105,ゴルフセット,スポーツ,78000.00,15,スポーツギア
106,高級バッグ,ファッション,55000.00,20,ラグジュアリーブランド
107,電子レンジ,家電,25000.00,40,家電メーカー
  • product_data_20230801.csv(raw_productへロード)
商品ID,商品名,カテゴリ,単価,在庫数,メーカー
105,ゴルフセット プロモデル,スポーツ,85000.00,10,スポーツギア
106,高級バッグ,ファッション,55000.00,15,ラグジュアリーブランド
107,電子レンジ,家電,25000.00,35,家電メーカー
108,コーヒーメーカー,家電,18000.00,25,キッチン工房
109,ヨガマット,スポーツ,5000.00,50,フィットネス社
  • purchase_data_20230701.csv(raw_purchaseへロード)
購入明細ID,顧客ID,商品ID,購入日,数量,購入金額,支払方法,配送状況
1004,4,106,2023-03-05,1,55000.00,電子マネー,配送済み
1005,5,104,2023-02-10,1,45000.00,クレジットカード,配送済み
1006,1,103,2023-04-05,1,125000.00,ローン,準備中
1007,3,105,2023-03-15,1,78000.00,クレジットカード,配送済み
1008,2,102,2023-04-10,1,65000.00,クレジットカード,配送中
  • purchase_data_20230801.csv(raw_purchaseへロード)
購入明細ID,顧客ID,商品ID,購入日,数量,購入金額,支払方法,配送状況
1006,1,103,2023-04-05,1,125000.00,ローン,配送済み
1008,2,102,2023-04-10,2,130000.00,クレジットカード,配送済み
1009,6,107,2023-07-01,1,25000.00,クレジットカード,配送済み
1010,7,105,2023-07-10,1,85000.00,銀行振込,配送中
1011,5,108,2023-07-15,1,18000.00,電子マネー,配送済み

データの確認

今回はMART層でtarget_lag = '5 minutes'としていたので、5分ほど待った後にデータがどうなったかを確認してみます。

以下、RAWとSTGレイヤーで今回更新していないcustomerテーブルを1つずつ、MARTレイヤーでは3つ全て、テーブルの中身を確認してみます。

結果として、今回データを更新していないSTG_CUSTOMERテーブルでは差分チェックは行われたもののウェアハウスを用いた更新は行われておらず、今回S3に追加したproductとpurchaseのデータが更新されMART層も3つのテーブル全てが更新されたことがわかります。

  • RAW_CUSTOMERテーブル ※今回は更新なし

2025-03-22_22h27_18

  • STG_CUSTOMERテーブル ※今回は更新なし

2025-03-22_22h28_15

2025-03-22_22h28_38

  • MART_CUSTOMER_PURCHASEテーブル

2025-03-22_22h33_36

2025-03-22_22h34_00

  • MART_PRODUCT_SALESテーブル

2025-03-22_22h34_46

2025-03-22_22h35_25

  • MART_SALES_SUMMARYテーブル

2025-03-22_22h36_02

2025-03-22_22h36_24

最後に

SnowflakeのSnowpipeとDynamic Tableで重複削除・差分更新を行うデータパイプラインを構築してみたので、その内容をまとめてみました。

私は1年4ヶ月前にもDynamic Tableを一度試してブログにしているのですが、現在はどのタイミングでウェアハウスを使ったのかがSnowsight上でわかったり、自動でChange Trackingを有効化してくれたり、利便性が向上しているなと感じました!

SnowpipeとDynamic Tableの組み合わせは、2025年3月時点ではSnowflakeだけでデータパイプラインを構築する際の最有力の方法だと思いますので、ぜひ活用してみてください!

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.