Delta Direct による Delta Lake テーブルの参照を試してみた #SnowflakeDB
はじめに
Snowflake の Delta Direct 機能を使った Delta Lake テーブルの参照と、その Delta Direct テーブルのダイナミックテーブルから参照を試してみた内容を記事としました。
Delta Direct の概要
Delta Direct とは、オブジェクトストレージ上に格納された Delta Lake テーブルのトランザクションログ(_delta_log/)を Snowflake が直接読み取り、Iceberg テーブルとして公開する機能です。Apache Iceberg への変換処理は不要で、Delta フォーマットのままアクセスできます。
本機能については以下に記載があります。
主なポイントは以下の通りです。
- Catalog Integration で
TABLE_FORMAT = DELTAを指定することで Delta テーブルを参照できる AUTO_REFRESH = TRUEを設定するとストレージイベント通知経由でメタデータが自動更新される- Delta テーブルのルートディレクトリ(
_delta_log/を含むパス)をBASE_LOCATIONに指定する
前提条件
検証環境
以下の環境を使用しています。
- Snowflake:トライアルアカウント
- Databricks:Free Edition
- ストレージ:Amazon S3
事前準備
Databricks 側で、Unity Catalog の外部ロケーションを使用した Delta テーブルを作成しておきます。外部ロケーションの作成手順は以下の記事をご参照ください。
外部ロケーションを確認した上で、検証用のカタログ・スキーマ・テーブルを作成します。
-- 検証用カタログの作成
CREATE CATALOG IF NOT EXISTS sf_delta_direct;
-- 既存の S3 外部ロケーションを使用した検証用スキーマの作成
CREATE SCHEMA IF NOT EXISTS sf_delta_direct.test_schema
MANAGED LOCATION 's3://<バケット>/sf-delta-direct/';
-- Delta テーブルの作成
CREATE TABLE sf_delta_direct.test_schema.customer_info
(
id INT,
name STRING
)
USING DELTA;
-- サンプルデータの追加
INSERT INTO sf_delta_direct.test_schema.customer_info
VALUES
(1, '山田太郎'),
(2, '佐藤花子'),
(3, '鈴木一郎');
Delta テーブルのストレージパスはDESCRIBE DETAILで確認できます。後続の手順で Snowflake 側のBASE_LOCATIONで指定します。
DESCRIBE DETAIL sf_delta_direct.test_schema.customer_info;
locationカラムに、s3://<バケット>/sf-delta-direct/__unitystorage/... のようなパスが表示されます。
Delta Direct テーブルの作成
External Volume の作成
Delta テーブルのデータが格納されている S3 バケットへのアクセスに使用する External Volume を作成します。
CREATE OR REPLACE EXTERNAL VOLUME delta_external_volume
STORAGE_LOCATIONS = (
(
NAME = 'my-s3-delta-location'
STORAGE_PROVIDER = 'S3'
STORAGE_BASE_URL = 's3://<バケット>/sf-delta-direct/'
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::xxxxxxxxxxxx:role/<IAMロール>'
)
);
DESCRIBE EXTERNAL VOLUME delta_external_volume;
DESCRIBEの出力からSTORAGE_AWS_IAM_USER_ARNとSTORAGE_AWS_EXTERNAL_IDを確認し IAM ロールの信頼ポリシーに設定します。
Catalog Integration の作成
Delta Direct 用の Catalog Integration を作成します。CATALOG_SOURCE = OBJECT_STOREとTABLE_FORMAT = DELTAの指定が必要です。
CREATE OR REPLACE CATALOG INTEGRATION delta_catalog_integration
CATALOG_SOURCE = OBJECT_STORE
TABLE_FORMAT = DELTA
ENABLED = TRUE;
Iceberg テーブルの作成
事前準備で確認した Delta テーブルのパスをBASE_LOCATIONに指定し、Iceberg テーブルを作成します。AUTO_REFRESH = TRUEを指定することで、Delta テーブル側の変更が自動的に反映されます。
CREATE DATABASE IF NOT EXISTS test_db;
CREATE SCHEMA IF NOT EXISTS test_db.my_delta_table;
CREATE ICEBERG TABLE test_db.my_delta_table.customer_info
EXTERNAL_VOLUME = 'delta_external_volume'
CATALOG = 'delta_catalog_integration'
BASE_LOCATION = '__unitystorage/schemas/fdf4133b-96ba-42cb-b118-b5a78fb8ee0d/tables/aefd89ce-e389-4758-abee-fbc0e9ab6e89/'
AUTO_REFRESH = TRUE;
データの確認
作成したテーブルをクエリすると、Databricks 側で挿入したデータが参照できます。
SELECT * FROM test_db.my_delta_table.customer_info;
+----+----------+
| ID | NAME |
|----+----------|
| 1 | 山田太郎 |
| 2 | 佐藤花子 |
| 3 | 鈴木一郎 |
+----+----------+
データ変更の反映確認
Databricks 側でデータを追加
Databricks 側で追加のレコードを挿入します。
INSERT INTO sf_delta_direct.test_schema.customer_info
VALUES
(4, '田中美咲'),
(5, '高橋健太'),
(6, '伊藤さくら'),
(7, '渡辺大輔'),
(8, '中村愛');
Snowflake 側での反映確認
AUTO_REFRESH = TRUEを指定しているため、ストレージ側のイベント通知経由でメタデータが自動更新されます。Snowflake 側でクエリすると追加分が反映されていることが確認できます。
SELECT * FROM test_db.my_delta_table.customer_info;
+----+------------+
| ID | NAME |
|----+------------|
| 1 | 山田太郎 |
| 2 | 佐藤花子 |
| 3 | 鈴木一郎 |
| 4 | 田中美咲 |
| 5 | 高橋健太 |
| 6 | 伊藤さくら |
| 7 | 渡辺大輔 |
| 8 | 中村愛 |
+----+------------+
同様に注文テーブルも作成し、Databricks 側でデータを挿入した後に Delta Direct テーブルとして定義します。
-- Databricks 側:注文テーブルの作成とデータ挿入
CREATE TABLE sf_delta_direct.test_schema.orders
(
order_id INT,
customer_id INT,
order_date DATE,
total_amount DECIMAL(10, 2)
)
USING DELTA;
INSERT INTO sf_delta_direct.test_schema.orders
VALUES
(1001, 1, '2024-01-15', 123000.00),
(1002, 2, '2024-01-18', 11000.00),
(1003, 3, '2024-01-20', 35000.00),
(1004, 1, '2024-02-05', 8500.00),
(1005, 4, '2024-02-10', 45200.00),
(1006, 5, '2024-02-12', 1500.00),
(1007, 2, '2024-02-15', 6800.00),
(1008, 6, '2024-03-01', 127500.00),
(1009, 7, '2024-03-05', 4200.00),
(1010, 3, '2024-03-10', 2500.00);
DESCRIBE DETAILでパスを確認し、Snowflake 側でテーブルを定義します。
-- Snowflake 側:orders テーブルの Delta Direct 定義
CREATE ICEBERG TABLE test_db.my_delta_table.orders
EXTERNAL_VOLUME = 'delta_external_volume'
CATALOG = 'delta_catalog_integration'
BASE_LOCATION = '__unitystorage/schemas/fdf4133b-96ba-42cb-b118-b5a78fb8ee0d/tables/5c075caa-e504-486c-a851-809e390ddb4e/'
AUTO_REFRESH = TRUE;
SELECT * FROM test_db.my_delta_table.orders;
+----------+-------------+------------+--------------+
| ORDER_ID | CUSTOMER_ID | ORDER_DATE | TOTAL_AMOUNT |
|----------+-------------+------------+--------------|
| 1001 | 1 | 2024-01-15 | 123000.00 |
| 1002 | 2 | 2024-01-18 | 11000.00 |
| 1003 | 3 | 2024-01-20 | 35000.00 |
| 1004 | 1 | 2024-02-05 | 8500.00 |
| 1005 | 4 | 2024-02-10 | 45200.00 |
| 1006 | 5 | 2024-02-12 | 1500.00 |
| 1007 | 2 | 2024-02-15 | 6800.00 |
| 1008 | 6 | 2024-03-01 | 127500.00 |
| 1009 | 7 | 2024-03-05 | 4200.00 |
| 1010 | 3 | 2024-03-10 | 2500.00 |
+----------+-------------+------------+--------------+
Dynamic Table で Delta Direct テーブルを参照する
2026年6月のアップデートでダイナミックテーブルから Delta Direct テーブルを参照する機能が一般提供となったので、こちらも試してみます。
Dynamic Table の作成
Delta Direct テーブルを参照するダイナミックテーブルを定義します。ここではcustomer_infoとordersを結合し、顧客ごとの注文合計を集計するダイナミックテーブルを作成します。
CREATE OR REPLACE DYNAMIC TABLE test_db.my_delta_table.dt_orders_with_customers
TARGET_LAG = '1 minutes'
WAREHOUSE = compute_wh
REFRESH_MODE = INCREMENTAL
AS
SELECT
c.id,
c.name,
COUNT(o.order_id) AS order_count,
SUM(o.total_amount) AS total_spent
FROM test_db.my_delta_table.customer_info c
LEFT JOIN test_db.my_delta_table.orders o
ON c.id = o.customer_id
GROUP BY c.id, c.name;
データの確認
作成直後にクエリすると、初期データが反映されていることが確認できます。
SELECT * FROM test_db.my_delta_table.dt_orders_with_customers;
+----+------------+-------------+-------------+
| ID | NAME | ORDER_COUNT | TOTAL_SPENT |
|----+------------+-------------+-------------|
| 1 | 山田太郎 | 2 | 131500.00 |
| 2 | 佐藤花子 | 2 | 17800.00 |
| 3 | 鈴木一郎 | 2 | 37500.00 |
| 4 | 田中美咲 | 1 | 45200.00 |
| 5 | 高橋健太 | 1 | 1500.00 |
| 6 | 伊藤さくら | 1 | 127500.00 |
| 7 | 渡辺大輔 | 1 | 4200.00 |
| 8 | 中村愛 | 0 | NULL |
+----+------------+-------------+-------------+
データ追加後の反映確認
Databricks 側でordersテーブルにレコードを追加します。
INSERT INTO sf_delta_direct.test_schema.orders
VALUES
(1011, 8, '2024-03-15', 12000.00),
(1012, 8, '2024-04-01', 6800.00);
TARGET_LAG = '1 minutes'で設定しているため、約1分後に Dynamic Table の内容が更新されます。id=8 の集計値が更新されていることが確認できます。
SELECT * FROM test_db.my_delta_table.dt_orders_with_customers
WHERE id = 8;
+----+--------+-------------+-------------+
| ID | NAME | ORDER_COUNT | TOTAL_SPENT |
|----+--------+-------------+-------------|
| 8 | 中村愛 | 2 | 18800.00 |
+----+--------+-------------+-------------+
リフレッシュ履歴の確認
ダイナミックテーブルの更新履歴を確認すると、REFRESH_MODE = INCREMENTALが指定した Dynamic Table で増分リフレッシュが実行されていることが分かります。
SELECT
name,
state,
data_timestamp,
refresh_start_time,
refresh_end_time,
refresh_action,
refresh_trigger,
statistics
FROM TABLE(
INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
NAME => 'TEST_DB.MY_DELTA_TABLE.DT_ORDERS_WITH_CUSTOMERS'
)
)
WHERE refresh_action = 'INCREMENTAL'
ORDER BY refresh_start_time DESC
LIMIT 5;
+--------------------------+-----------+-------------------------------+-------------------------------+-------------------------------+----------------+-----------------+------------------------------+
| NAME | STATE | DATA_TIMESTAMP | REFRESH_START_TIME | REFRESH_END_TIME | REFRESH_ACTION | REFRESH_TRIGGER | STATISTICS |
|--------------------------+-----------+-------------------------------+-------------------------------+-------------------------------+----------------+-----------------+------------------------------|
| DT_ORDERS_WITH_CUSTOMERS | SUCCEEDED | 2026-06-27 03:22:22.064 -0700 | 2026-06-27 03:22:23.249 -0700 | 2026-06-27 03:22:25.265 -0700 | INCREMENTAL | SCHEDULED | { |
| | | | | | | | "compilationTimeMs": 1370, |
| | | | | | | | "executionTimeMs": 670, |
| | | | | | | | "numAddedPartitions": 2, |
| | | | | | | | "numCopiedRows": 7, |
| | | | | | | | "numDeletedRows": 1, |
| | | | | | | | "numInsertedRows": 1, |
| | | | | | | | "numRemovedPartitions": 1, |
| | | | | | | | "queuedTimeMs": 0 |
| | | | | | | | } |
| DT_ORDERS_WITH_CUSTOMERS | SUCCEEDED | 2026-06-27 02:55:07.859 -0700 | 2026-06-27 02:55:08.033 -0700 | 2026-06-27 02:55:09.140 -0700 | INCREMENTAL | CREATION | { |
| | | | | | | | "compilationTimeMs": 729, |
| | | | | | | | "executionTimeMs": 434, |
| | | | | | | | "numAddedPartitions": 1, |
| | | | | | | | "numCopiedRows": 0, |
| | | | | | | | "numDeletedRows": 0, |
| | | | | | | | "numInsertedRows": 8, |
| | | | | | | | "numRemovedPartitions": 0, |
| | | | | | | | "queuedTimeMs": 0 |
| | | | | | | | } |
+--------------------------+-----------+-------------------------------+-------------------------------+-------------------------------+----------------+-----------------+------------------------------+
refresh_actionがINCREMENTAL、stateがSUCCEEDEDとなっており、statisticsのnumInsertedRowsやnumDeletedRowsからは差分のみが処理されていることも確認できます。
さいごに
Delta Direct を使ったオブジェクトストレージ上の Delta Lake テーブルの参照と、その Delta Direct テーブルのダイナミックテーブルからの参照を試してみました。
こちらの内容がどなたかの参考になれば幸いです。








