[新機能] Snowflake Dynamic TablesのCustom Incrementalがパブリックプレビューとなったので試してみた

[新機能] Snowflake Dynamic TablesのCustom Incrementalがパブリックプレビューとなったので試してみた

2026.05.29

かわばたです。

2026年5月26日にSnowflake Dynamic Tablesの新機能として、Custom Incremental(ドキュメント上ではCustom incrementalization、REFRESH_MODE = CUSTOM_INCREMENTAL)がPublic Previewでリリースされました。

従来のDynamic TablesではSELECT文でリフレッシュロジックを定義し、増分処理はSnowflakeに一任する仕組みでしたが、Custom IncrementalではREFRESH USING句にMERGE INTO SELFINSERT INTO SELFのDML文を自分で定義できます。

https://docs.snowflake.com/en/user-guide/dynamic-tables/custom-incrementalization

これにより、ストリーム+タスクで実現していた複雑なパイプラインをDynamic Tableに統合できるようになります。スケジューリング・リトライ・トランザクション保証はSnowflakeが管理してくれるため、運用負荷を下げつつ柔軟な増分処理が書けるのが魅力です。

機能概要

Custom Incrementalでは、Dynamic Tableの定義にREFRESH USING句を追加し、リフレッシュ時に実行されるDML文(MERGE INTO SELFまたはINSERT INTO SELF)を記述します。

従来のDynamic Tableとの違いは以下のとおりです。

項目 従来のDynamic Table Custom Incremental
リフレッシュロジック SELECT文で定義。増分処理はSnowflakeに一任 MERGE/INSERT DMLをユーザーが定義
変更データの取得 Snowflakeが自動管理 CHANGES()句でストリーム相当のセマンティクスを利用
対応パターン シンプルな変換・集計 ソフトデリート、ステートフル集計、変更データと静的テーブルの結合など
スケジューリング・リトライ Snowflake管理 Snowflake管理(同じ)

ただし、Custom Incrementalでは出力セマンティクスをユーザー自身が定義するため、SELECTベースのDynamic TableのようにSnowflakeがクエリ結果との同等性(Delayed-view equivalence)を自動保証するわけではありません。MERGE/INSERTロジックの正しさはユーザー側で担保する必要があります。

主要な構文要素

REFRESH USING: リフレッシュ時に実行するDML文を定義します。MERGE INTO SELF(既存行の更新・削除が必要な場合)またはINSERT INTO SELF(追記のみの場合)を使用します。SELFはDynamic Table自身を参照するキーワードです。

CHANGES(): ベーステーブルの変更データを取得します。ストリームと同様のセマンティクスで、前回リフレッシュ以降の変更を返します。時間境界はSnowflakeが自動管理するため、ユーザー側で指定する必要はありません。

  • INFORMATION => APPEND_ONLY: INSERTされた行のみ返す
  • INFORMATION => DEFAULT: INSERT、UPDATE、DELETEすべての変更を返す

メタデータカラム: CHANGES()句でDEFAULTモードを使用した場合、以下のメタデータカラムが利用できます。

カラム 説明
METADATA$ACTION 'INSERT'または'DELETE'。UPDATE操作はDELETE+INSERTのペアとして表現される
METADATA$ISUPDATE 行がUPDATE操作の一部の場合TRUE

BACKFILL FROM: Dynamic Table作成時の初期データを既存テーブルからクローンで投入します。詳細は後述の初期化戦略のセクションで確認します。

START AT: CUSTOM_INCREMENTALかつBACKFILL FROM指定時に使用します。BACKFILL FROMと組み合わせて、増分処理の開始地点を指定します。TIMESTAMPSTATEMENT(クエリID)、STREAM(既存ストリームのオフセット)、OFFSET(現在時刻からの負の秒数)が使用可能です。BACKFILL FROMSTART ATは作成時のみ指定可能で、後からALTER DYNAMIC TABLEで追加・変更できません。

コスト

2026年5月27日時点では、公式DocにCustom Incremental固有のコストに関する記載はありませんでした。通常のDynamic Tableと同様、リフレッシュ時のウェアハウスコストが発生します。

検証環境

  • Snowflake: AWSリージョン、Enterpriseエディション
  • ベーステーブルの要件: CHANGE_TRACKING = TRUEが必要

事前準備

検証用データベース・スキーマ・ウェアハウスの作成

USE ROLE SYSADMIN;

-- データベース・スキーマの作成
CREATE DATABASE IF NOT EXISTS DT_CUSTOM_INCR_DEMO;
USE DATABASE DT_CUSTOM_INCR_DEMO;
CREATE SCHEMA IF NOT EXISTS PUBLIC;
USE SCHEMA PUBLIC;

-- ウェアハウスの作成
CREATE WAREHOUSE IF NOT EXISTS WH_DT_XS
  WAREHOUSE_SIZE = 'XSMALL'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE;

USE WAREHOUSE WH_DT_XS;

ベーステーブルの作成とデータ投入

Custom IncrementalではCHANGES()句を使用するため、ベーステーブルでChange Trackingが有効である必要があります。Dynamic Table作成時にSnowflakeが自動で有効化を試みますが、ここでは明示的にCHANGE_TRACKING = TRUEを設定します。

-- 検証用テーブル(ECサイトの注文データ)
-- CHANGE_TRACKING = TRUEが必須
CREATE OR REPLACE TABLE orders (
    order_id INT,
    customer_name STRING,
    product_name STRING,
    order_date DATE,
    quantity INT,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(10,2),
    status STRING,
    region STRING
)
CHANGE_TRACKING = TRUE;
-- 初期データ(20件)
INSERT INTO orders VALUES
    (1, 'Taro Yamada', 'Wireless Mouse', '2026-05-01', 2, 25.00, 50.00, 'Shipped', 'Tokyo'),
    (2, 'Hanako Sato', 'USB Keyboard', '2026-05-01', 1, 45.00, 45.00, 'Delivered', 'Osaka'),
    (3, 'Jiro Tanaka', 'Monitor Stand', '2026-05-02', 1, 89.00, 89.00, 'Shipped', 'Tokyo'),
    (4, 'Yuki Suzuki', 'Webcam HD', '2026-05-02', 3, 35.00, 105.00, 'Processing', 'Nagoya'),
    (5, 'Ken Watanabe', 'USB Hub', '2026-05-03', 2, 18.00, 36.00, 'Shipped', 'Tokyo'),
    (6, 'Mika Ito', 'Laptop Bag', '2026-05-03', 1, 55.00, 55.00, 'Delivered', 'Osaka'),
    (7, 'Ryo Kimura', 'HDMI Cable', '2026-05-04', 4, 12.00, 48.00, 'Shipped', 'Fukuoka'),
    (8, 'Aoi Hayashi', 'Wireless Mouse', '2026-05-04', 1, 25.00, 25.00, 'Processing', 'Tokyo'),
    (9, 'Sho Nakamura', 'USB Keyboard', '2026-05-05', 2, 45.00, 90.00, 'Shipped', 'Osaka'),
    (10, 'Eri Yamamoto', 'Monitor Stand', '2026-05-05', 1, 89.00, 89.00, 'Delivered', 'Nagoya'),
    (11, 'Daiki Kobayashi', 'Webcam HD', '2026-05-06', 1, 35.00, 35.00, 'Shipped', 'Tokyo'),
    (12, 'Nana Yoshida', 'USB Hub', '2026-05-06', 3, 18.00, 54.00, 'Shipped', 'Osaka'),
    (13, 'Taku Matsumoto', 'Laptop Bag', '2026-05-07', 1, 55.00, 55.00, 'Processing', 'Fukuoka'),
    (14, 'Mai Inoue', 'HDMI Cable', '2026-05-07', 2, 12.00, 24.00, 'Delivered', 'Tokyo'),
    (15, 'Sota Kato', 'Wireless Mouse', '2026-05-08', 1, 25.00, 25.00, 'Shipped', 'Nagoya'),
    (16, 'Hina Ogawa', 'USB Keyboard', '2026-05-08', 1, 45.00, 45.00, 'Shipped', 'Osaka'),
    (17, 'Yuta Murakami', 'Monitor Stand', '2026-05-09', 2, 89.00, 178.00, 'Processing', 'Tokyo'),
    (18, 'Sakura Saito', 'Webcam HD', '2026-05-09', 1, 35.00, 35.00, 'Shipped', 'Fukuoka'),
    (19, 'Kaito Takahashi', 'USB Hub', '2026-05-10', 5, 18.00, 90.00, 'Delivered', 'Tokyo'),
    (20, 'Rena Morita', 'Laptop Bag', '2026-05-10', 1, 55.00, 55.00, 'Shipped', 'Osaka');

20件投入されていれば問題ありません。

SELECT COUNT(*) AS row_count FROM orders;

試してみた

INSERT INTO SELFパターン: 新規注文の追記

まずは最もシンプルなパターンとして、ベーステーブルに追加された新規行をDynamic Tableへそのまま追記するINSERT INTO SELFパターンを試します。

-- INSERT INTO SELFパターンのDynamic Tableを作成
CREATE OR REPLACE DYNAMIC TABLE dt_orders_append (
    order_id INT,
    customer_name STRING,
    product_name STRING,
    order_date DATE,
    quantity INT,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(10,2),
    status STRING,
    region STRING
)
  TARGET_LAG = '1 minute'
  WAREHOUSE = WH_DT_XS
  BACKFILL FROM orders
  REFRESH USING (
    INSERT INTO SELF
    SELECT
        order_id,
        customer_name,
        product_name,
        order_date,
        quantity,
        unit_price,
        total_amount,
        status,
        region
    FROM orders CHANGES(INFORMATION => APPEND_ONLY)
  );

初期データが投入されていることを確認します。

SELECT COUNT(*) AS row_count FROM dt_orders_append;

20件であればOKです。

2026-05-29_13h45_33

次に、ベーステーブルに新しいデータを追加します。

-- 3件追加
INSERT INTO orders VALUES
    (21, 'Takeshi Honda', 'Wireless Mouse', '2026-05-11', 1, 25.00, 25.00, 'Shipped', 'Tokyo'),
    (22, 'Ayaka Nishimura', 'USB Keyboard', '2026-05-11', 2, 45.00, 90.00, 'Processing', 'Osaka'),
    (23, 'Ryota Endo', 'Webcam HD', '2026-05-12', 1, 35.00, 35.00, 'Shipped', 'Nagoya');

TARGET_LAG = '1 minute'は目標遅延であり、厳密に1分ごとにリフレッシュが実行されることを保証するものではありません。本検証では、リフレッシュが完了するまで1分程度待ってから確認しています。

SELECT COUNT(*) AS row_count FROM dt_orders_append;

23件に増えていればOKです。INSERT INTO SELFによって、追加された3件のみが増分で追記されました。

このDynamic Tableは追記専用のため、ベーステーブル側のUPDATEDELETEは反映されません。注文の現在状態を同期したい場合は、CHANGES(INFORMATION => DEFAULT)MERGE INTO SELFを使ったロジックが必要です。

2026-05-29_13h47_32

リフレッシュ履歴も確認しておきます。

SELECT
    name,
    refresh_action,
    refresh_trigger,
    state
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
    NAME => 'DT_CUSTOM_INCR_DEMO.PUBLIC.DT_ORDERS_APPEND'
))
ORDER BY data_timestamp DESC
LIMIT 5;

2026-05-29_13h48_13

MERGE INTO SELFパターン: リージョン別売上の増分集計

次に、MERGE INTO SELFパターンを試します。リージョン別の注文件数と売上合計を、MERGEで増分集計するDynamic Tableを作成します。

既存リージョンの場合はUPDATE(累積加算)、新規リージョンの場合はINSERTという動作になります。

公式ドキュメント上、BACKFILL FROMの構文はBACKFILL FROM <table_name>であり、サブクエリは指定できません。そのため、集計済みのバックフィル用テーブルを事前に作成します。

-- バックフィル用の集計テーブルを作成
CREATE OR REPLACE TABLE region_sales_backfill AS
SELECT
    region,
    COUNT(*) AS order_count,
    SUM(total_amount) AS total_sales
FROM orders
GROUP BY region;
-- MERGE INTO SELFパターンのDynamic Tableを作成
CREATE OR REPLACE DYNAMIC TABLE dt_region_sales (
    region STRING,
    order_count INT,
    total_sales DECIMAL(12,2)
)
  TARGET_LAG = '1 minute'
  WAREHOUSE = WH_DT_XS
  BACKFILL FROM region_sales_backfill
  REFRESH USING (
    MERGE INTO SELF AS tgt
    USING (
        SELECT
            region,
            COUNT(*) AS new_order_count,
            SUM(total_amount) AS new_total_sales
        FROM orders CHANGES(INFORMATION => APPEND_ONLY)
        GROUP BY region
    ) AS src
    ON tgt.region = src.region
    WHEN MATCHED THEN UPDATE SET
        tgt.order_count = tgt.order_count + src.new_order_count,
        tgt.total_sales = tgt.total_sales + src.new_total_sales
    WHEN NOT MATCHED THEN INSERT (region, order_count, total_sales)
        VALUES (src.region, src.new_order_count, src.new_total_sales)
  );

初期データを確認します。

SELECT * FROM dt_region_sales ORDER BY region;

この時点では、ステップ1で追加した order_id = 21〜23 もordersに含まれているため、dt_region_salesの初期集計は23件分のデータを元に作成されます。各リージョン(Tokyo, Osaka, Nagoya, Fukuoka)の集計値が表示されればOKです。

2026-05-29_14h10_14

まず、既存リージョンへのINSERTで集計値が更新されることを確認します。

-- 既存リージョン(TokyoとOsaka)にデータ追加
INSERT INTO orders VALUES
    (24, 'Shota Fujita', 'HDMI Cable', '2026-05-13', 2, 12.00, 24.00, 'Shipped', 'Tokyo'),
    (25, 'Rina Shimizu', 'Laptop Bag', '2026-05-13', 1, 55.00, 55.00, 'Delivered', 'Osaka');

約1分待ってから集計結果を確認します。

SELECT * FROM dt_region_sales ORDER BY region;

2026-05-29_14h12_28

TokyoとOsakaのorder_counttotal_salesがそれぞれ+1件分増加していれば問題ありません。MERGE INTO SELFWHEN MATCHED THEN UPDATEが正しく動作していることを確認できます。

次に、新規リージョンのINSERTで新しい行が追加されることを確認します。

-- 新規リージョン(Sapporo)のデータを追加
INSERT INTO orders VALUES
    (26, 'Yuki Aoyama', 'Monitor Stand', '2026-05-14', 1, 89.00, 89.00, 'Shipped', 'Sapporo');

約1分待ってから確認します。

SELECT * FROM dt_region_sales ORDER BY region;

Sapporoが新しい行として追加されていれば問題ありません。MERGE INTO SELFのWHEN NOT MATCHED THEN INSERTが正しく動作しています。

2026-05-29_14h17_21

METADATA$ACTIONMETADATA$ISUPDATEの確認

CHANGES()句でINFORMATION => DEFAULTを指定した場合、メタデータカラムMETADATA$ACTIONMETADATA$ISUPDATEが利用できます。これを使うことで、INSERT・UPDATE・DELETEを区別した処理が書けます。

実際にベーステーブルにUPDATE・DELETEを行い、メタデータカラムの値を確認してみます。

まず、CHANGES()で取得する変更区間の開始地点として、現在時刻をセッション変数に保存します。その後、UPDATE・DELETEを実行します。

-- 変更取得の開始地点を保存
SET ts_before_changes = (SELECT CURRENT_TIMESTAMP());

-- UPDATE: order_id=4のステータスを変更
UPDATE orders SET status = 'Delivered', total_amount = 110.00 WHERE order_id = 4;

-- DELETE: order_id=20を削除
DELETE FROM orders WHERE order_id = 20;

-- INSERT: 新規データ追加
INSERT INTO orders VALUES
    (27, 'Kenji Arai', 'USB Hub', '2026-05-15', 1, 18.00, 18.00, 'Shipped', 'Tokyo');

CHANGES()句で変更データを直接確認してみます。

-- DEFAULT(INSERT/UPDATE/DELETE全て)で変更を確認
SELECT
    METADATA$ACTION,
    METADATA$ISUPDATE,
    order_id,
    customer_name,
    status,
    total_amount
FROM orders
  CHANGES(INFORMATION => DEFAULT)
  AT(TIMESTAMP => $ts_before_changes)
ORDER BY order_id, METADATA$ACTION;

以下のような結果が確認できるはずです。

  • UPDATE(order_id=4): METADATA$ACTION = 'DELETE' + METADATA$ISUPDATE = TRUE(更新前の行)とMETADATA$ACTION = 'INSERT' + METADATA$ISUPDATE = TRUE(更新後の行)のペア
  • DELETE(order_id=20): METADATA$ACTION = 'DELETE' + METADATA$ISUPDATE = FALSE
  • INSERT(order_id=27): METADATA$ACTION = 'INSERT' + METADATA$ISUPDATE = FALSE

2026-05-29_14h37_14

この仕組みを活用すると、たとえば削除された行のみを記録する監査ログ用のDynamic Tableも作成できます。

-- 削除監査ログ用のDynamic Table(参考例)
CREATE OR REPLACE DYNAMIC TABLE dt_deletions_log (
    order_id INT,
    customer_name STRING,
    product_name STRING,
    status STRING
)
  TARGET_LAG = '1 minute'
  WAREHOUSE = WH_DT_XS
  INITIALIZE = ON_SCHEDULE
  REFRESH USING (
    INSERT INTO SELF
    SELECT
        order_id,
        customer_name,
        product_name,
        status
    FROM orders CHANGES(INFORMATION => DEFAULT)
    WHERE NOT METADATA$ISUPDATE AND METADATA$ACTION = 'DELETE'
  );

BACKFILL FROMとSTART ATで初期化戦略を確認する

BACKFILL FROMを指定しない場合、初回リフレッシュではREFRESH USING句のDMLによって、ソーステーブルの既存行がINSERTとして処理されます。そのため、大きなテーブルでは初回処理のコストや時間に注意が必要です。

一方、BACKFILL FROMを指定すると、既存データは指定したテーブルからクローンで投入され、REFRESH USINGのロジックはバックフィル後の変更から処理されます。

ここでは、BACKFILL FROMSTART ATを組み合わせて、作成時点以降のINSERTのみを取り込む構成を試します。空のバックフィル用テーブルを使い、START ATで増分処理の開始地点を明示します。

-- 空のバックフィル用テーブルを作成
CREATE OR REPLACE TABLE empty_orders LIKE orders;

-- 増分処理の開始地点を保存
SET start_ts = (SELECT CURRENT_TIMESTAMP());
-- 空テーブル + START ATで作成時点以降のみ取り込む構成
CREATE OR REPLACE DYNAMIC TABLE dt_orders_from_now (
    order_id INT,
    customer_name STRING,
    product_name STRING,
    order_date DATE,
    quantity INT,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(10,2),
    status STRING,
    region STRING
)
  TARGET_LAG = '1 minute'
  WAREHOUSE = WH_DT_XS
  REFRESH_MODE = CUSTOM_INCREMENTAL
  BACKFILL FROM empty_orders
  START AT (TIMESTAMP => $start_ts)
  REFRESH USING (
    INSERT INTO SELF
    SELECT
        order_id,
        customer_name,
        product_name,
        order_date,
        quantity,
        unit_price,
        total_amount,
        status,
        region
    FROM orders CHANGES(INFORMATION => APPEND_ONLY)
  );

初回リフレッシュ完了後、件数を確認します。空のテーブルからバックフィルし、START AT以降の変更のみを処理するため、0件であればOKです。

SELECT COUNT(*) AS row_count FROM dt_orders_from_now;

2026-05-29_14h49_38

新しいデータを追加して、以降のINSERTは取り込まれることを確認します。

INSERT INTO orders VALUES
    (28, 'Haruka Tanabe', 'HDMI Cable', '2026-05-16', 3, 12.00, 36.00, 'Shipped', 'Osaka');

約1分待ってから確認します。

SELECT COUNT(*) AS row_count FROM dt_orders_from_now;
SELECT * FROM dt_orders_from_now;

作成後にINSERTした行のみが取り込まれていればOKです。

2026-05-29_14h51_04

最後に

Snowflake Dynamic Tablesの新機能であるCustom Incrementalを試しました。

従来のストリーム+タスクで実現していた「増分データを取得してMERGE/INSERTする」というパイプラインを、Dynamic Tableとして宣言的に定義できるようになったのはありがたい機能だと思います。
特に今まで「宣言的に正しい結果をSnowflakeに任せる」アプローチが、ユーザー側で制御を行うことができるようになった点は選択肢が増えて良いと感じました

この記事が、Dynamic TablesのCustom Incrementalを試す際の参考になれば幸いです。


Snowflakeの導入支援はクラスメソッドに!

クラスメソッドでは Snowflake の導入を支援しております。
製品の詳細や支援の内容についてお気軽にお問い合わせください。

Snowflakeの詳細を見る

この記事をシェアする

関連記事