Delta Direct による Delta Lake テーブルの参照を試してみた #SnowflakeDB

Delta Direct による Delta Lake テーブルの参照を試してみた #SnowflakeDB

2026.06.27

はじめに

Snowflake の Delta Direct 機能を使った Delta Lake テーブルの参照と、その Delta Direct テーブルのダイナミックテーブルから参照を試してみた内容を記事としました。

Delta Direct の概要

Delta Direct とは、オブジェクトストレージ上に格納された Delta Lake テーブルのトランザクションログ(_delta_log/)を Snowflake が直接読み取り、Iceberg テーブルとして公開する機能です。Apache Iceberg への変換処理は不要で、Delta フォーマットのままアクセスできます。

本機能については以下に記載があります。

https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table-delta

https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-catalog-integration-object-storage#label-tables-iceberg-create-cat-int-delta

主なポイントは以下の通りです。

  • Catalog Integration でTABLE_FORMAT = DELTAを指定することで Delta テーブルを参照できる
  • AUTO_REFRESH = TRUEを設定するとストレージイベント通知経由でメタデータが自動更新される
  • Delta テーブルのルートディレクトリ(_delta_log/を含むパス)をBASE_LOCATIONに指定する

前提条件

検証環境

以下の環境を使用しています。

  • Snowflake:トライアルアカウント
  • Databricks:Free Edition
  • ストレージ:Amazon S3

事前準備

Databricks 側で、Unity Catalog の外部ロケーションを使用した Delta テーブルを作成しておきます。外部ロケーションの作成手順は以下の記事をご参照ください。

https://dev.classmethod.jp/articles/iceberg-delta-snowflake-snowflake-iceberg-try/

外部ロケーションを確認した上で、検証用のカタログ・スキーマ・テーブルを作成します。

-- 検証用カタログの作成
CREATE CATALOG IF NOT EXISTS sf_delta_direct;

-- 既存の S3 外部ロケーションを使用した検証用スキーマの作成
CREATE SCHEMA IF NOT EXISTS sf_delta_direct.test_schema
  MANAGED LOCATION 's3://<バケット>/sf-delta-direct/';

-- Delta テーブルの作成
CREATE TABLE sf_delta_direct.test_schema.customer_info
  (
    id INT,
    name STRING
  )
  USING DELTA;

-- サンプルデータの追加
INSERT INTO sf_delta_direct.test_schema.customer_info
VALUES
  (1, '山田太郎'),
  (2, '佐藤花子'),
  (3, '鈴木一郎');

Delta テーブルのストレージパスはDESCRIBE DETAILで確認できます。後続の手順で Snowflake 側のBASE_LOCATIONで指定します。

DESCRIBE DETAIL sf_delta_direct.test_schema.customer_info;

locationカラムに、s3://<バケット>/sf-delta-direct/__unitystorage/... のようなパスが表示されます。

Delta Direct テーブルの作成

External Volume の作成

Delta テーブルのデータが格納されている S3 バケットへのアクセスに使用する External Volume を作成します。

CREATE OR REPLACE EXTERNAL VOLUME delta_external_volume
  STORAGE_LOCATIONS = (
    (
      NAME = 'my-s3-delta-location'
      STORAGE_PROVIDER = 'S3'
      STORAGE_BASE_URL = 's3://<バケット>/sf-delta-direct/'
      STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::xxxxxxxxxxxx:role/<IAMロール>'
    )
  );

DESCRIBE EXTERNAL VOLUME delta_external_volume;

DESCRIBEの出力からSTORAGE_AWS_IAM_USER_ARNSTORAGE_AWS_EXTERNAL_IDを確認し IAM ロールの信頼ポリシーに設定します。

https://docs.snowflake.com/ja/sql-reference/sql/create-external-volume

Catalog Integration の作成

Delta Direct 用の Catalog Integration を作成します。CATALOG_SOURCE = OBJECT_STORETABLE_FORMAT = DELTAの指定が必要です。

CREATE OR REPLACE CATALOG INTEGRATION delta_catalog_integration
  CATALOG_SOURCE = OBJECT_STORE
  TABLE_FORMAT = DELTA
  ENABLED = TRUE;

https://docs.snowflake.com/en/sql-reference/sql/create-catalog-integration

Iceberg テーブルの作成

事前準備で確認した Delta テーブルのパスをBASE_LOCATIONに指定し、Iceberg テーブルを作成します。AUTO_REFRESH = TRUEを指定することで、Delta テーブル側の変更が自動的に反映されます。

CREATE DATABASE IF NOT EXISTS test_db;
CREATE SCHEMA IF NOT EXISTS test_db.my_delta_table;

CREATE ICEBERG TABLE test_db.my_delta_table.customer_info
  EXTERNAL_VOLUME = 'delta_external_volume'
  CATALOG = 'delta_catalog_integration'
  BASE_LOCATION = '__unitystorage/schemas/fdf4133b-96ba-42cb-b118-b5a78fb8ee0d/tables/aefd89ce-e389-4758-abee-fbc0e9ab6e89/'
  AUTO_REFRESH = TRUE;

https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table-delta

データの確認

作成したテーブルをクエリすると、Databricks 側で挿入したデータが参照できます。

SELECT * FROM test_db.my_delta_table.customer_info;
+----+----------+
| ID | NAME     |
|----+----------|
|  1 | 山田太郎 |
|  2 | 佐藤花子 |
|  3 | 鈴木一郎 |
+----+----------+

データ変更の反映確認

Databricks 側でデータを追加

Databricks 側で追加のレコードを挿入します。

INSERT INTO sf_delta_direct.test_schema.customer_info
VALUES
  (4, '田中美咲'),
  (5, '高橋健太'),
  (6, '伊藤さくら'),
  (7, '渡辺大輔'),
  (8, '中村愛');

Snowflake 側での反映確認

AUTO_REFRESH = TRUEを指定しているため、ストレージ側のイベント通知経由でメタデータが自動更新されます。Snowflake 側でクエリすると追加分が反映されていることが確認できます。

SELECT * FROM test_db.my_delta_table.customer_info;
+----+------------+
| ID | NAME       |
|----+------------|
|  1 | 山田太郎   |
|  2 | 佐藤花子   |
|  3 | 鈴木一郎   |
|  4 | 田中美咲   |
|  5 | 高橋健太   |
|  6 | 伊藤さくら |
|  7 | 渡辺大輔   |
|  8 | 中村愛     |
+----+------------+

同様に注文テーブルも作成し、Databricks 側でデータを挿入した後に Delta Direct テーブルとして定義します。

-- Databricks 側:注文テーブルの作成とデータ挿入
CREATE TABLE sf_delta_direct.test_schema.orders
  (
    order_id INT,
    customer_id INT,
    order_date DATE,
    total_amount DECIMAL(10, 2)
  )
  USING DELTA;

INSERT INTO sf_delta_direct.test_schema.orders
VALUES
  (1001, 1, '2024-01-15', 123000.00),
  (1002, 2, '2024-01-18', 11000.00),
  (1003, 3, '2024-01-20', 35000.00),
  (1004, 1, '2024-02-05', 8500.00),
  (1005, 4, '2024-02-10', 45200.00),
  (1006, 5, '2024-02-12', 1500.00),
  (1007, 2, '2024-02-15', 6800.00),
  (1008, 6, '2024-03-01', 127500.00),
  (1009, 7, '2024-03-05', 4200.00),
  (1010, 3, '2024-03-10', 2500.00);

DESCRIBE DETAILでパスを確認し、Snowflake 側でテーブルを定義します。

-- Snowflake 側:orders テーブルの Delta Direct 定義
CREATE ICEBERG TABLE test_db.my_delta_table.orders
  EXTERNAL_VOLUME = 'delta_external_volume'
  CATALOG = 'delta_catalog_integration'
  BASE_LOCATION = '__unitystorage/schemas/fdf4133b-96ba-42cb-b118-b5a78fb8ee0d/tables/5c075caa-e504-486c-a851-809e390ddb4e/'
  AUTO_REFRESH = TRUE;

SELECT * FROM test_db.my_delta_table.orders;
+----------+-------------+------------+--------------+
| ORDER_ID | CUSTOMER_ID | ORDER_DATE | TOTAL_AMOUNT |
|----------+-------------+------------+--------------|
|     1001 |           1 | 2024-01-15 |    123000.00 |
|     1002 |           2 | 2024-01-18 |     11000.00 |
|     1003 |           3 | 2024-01-20 |     35000.00 |
|     1004 |           1 | 2024-02-05 |      8500.00 |
|     1005 |           4 | 2024-02-10 |     45200.00 |
|     1006 |           5 | 2024-02-12 |      1500.00 |
|     1007 |           2 | 2024-02-15 |      6800.00 |
|     1008 |           6 | 2024-03-01 |    127500.00 |
|     1009 |           7 | 2024-03-05 |      4200.00 |
|     1010 |           3 | 2024-03-10 |      2500.00 |
+----------+-------------+------------+--------------+

Dynamic Table で Delta Direct テーブルを参照する

2026年6月のアップデートでダイナミックテーブルから Delta Direct テーブルを参照する機能が一般提供となったので、こちらも試してみます。

https://docs.snowflake.com/en/release-notes/2026/10_22#dynamic-tables-supported-on-delta-direct-tables-general-availability

Dynamic Table の作成

Delta Direct テーブルを参照するダイナミックテーブルを定義します。ここではcustomer_infoordersを結合し、顧客ごとの注文合計を集計するダイナミックテーブルを作成します。

CREATE OR REPLACE DYNAMIC TABLE test_db.my_delta_table.dt_orders_with_customers
  TARGET_LAG = '1 minutes'
  WAREHOUSE = compute_wh
  REFRESH_MODE = INCREMENTAL
AS
  SELECT
    c.id,
    c.name,
    COUNT(o.order_id) AS order_count,
    SUM(o.total_amount) AS total_spent
  FROM test_db.my_delta_table.customer_info c
  LEFT JOIN test_db.my_delta_table.orders o
    ON c.id = o.customer_id
  GROUP BY c.id, c.name;

データの確認

作成直後にクエリすると、初期データが反映されていることが確認できます。

SELECT * FROM test_db.my_delta_table.dt_orders_with_customers;
+----+------------+-------------+-------------+
| ID | NAME       | ORDER_COUNT | TOTAL_SPENT |
|----+------------+-------------+-------------|
|  1 | 山田太郎   |           2 |   131500.00 |
|  2 | 佐藤花子   |           2 |    17800.00 |
|  3 | 鈴木一郎   |           2 |    37500.00 |
|  4 | 田中美咲   |           1 |    45200.00 |
|  5 | 高橋健太   |           1 |     1500.00 |
|  6 | 伊藤さくら |           1 |   127500.00 |
|  7 | 渡辺大輔   |           1 |     4200.00 |
|  8 | 中村愛     |           0 |        NULL |
+----+------------+-------------+-------------+

データ追加後の反映確認

Databricks 側でordersテーブルにレコードを追加します。

INSERT INTO sf_delta_direct.test_schema.orders
VALUES
  (1011, 8, '2024-03-15', 12000.00),
  (1012, 8, '2024-04-01', 6800.00);

TARGET_LAG = '1 minutes'で設定しているため、約1分後に Dynamic Table の内容が更新されます。id=8 の集計値が更新されていることが確認できます。

SELECT * FROM test_db.my_delta_table.dt_orders_with_customers
WHERE id = 8;
+----+--------+-------------+-------------+
| ID | NAME   | ORDER_COUNT | TOTAL_SPENT |
|----+--------+-------------+-------------|
|  8 | 中村愛 |           2 |    18800.00 |
+----+--------+-------------+-------------+

リフレッシュ履歴の確認

ダイナミックテーブルの更新履歴を確認すると、REFRESH_MODE = INCREMENTALが指定した Dynamic Table で増分リフレッシュが実行されていることが分かります。

SELECT
  name,
  state,
  data_timestamp,
  refresh_start_time,
  refresh_end_time,
  refresh_action,
  refresh_trigger,
  statistics
FROM TABLE(
  INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
    NAME => 'TEST_DB.MY_DELTA_TABLE.DT_ORDERS_WITH_CUSTOMERS'
  )
)
WHERE refresh_action = 'INCREMENTAL'
ORDER BY refresh_start_time DESC
LIMIT 5;
+--------------------------+-----------+-------------------------------+-------------------------------+-------------------------------+----------------+-----------------+------------------------------+
| NAME                     | STATE     | DATA_TIMESTAMP                | REFRESH_START_TIME            | REFRESH_END_TIME              | REFRESH_ACTION | REFRESH_TRIGGER | STATISTICS                   |
|--------------------------+-----------+-------------------------------+-------------------------------+-------------------------------+----------------+-----------------+------------------------------|
| DT_ORDERS_WITH_CUSTOMERS | SUCCEEDED | 2026-06-27 03:22:22.064 -0700 | 2026-06-27 03:22:23.249 -0700 | 2026-06-27 03:22:25.265 -0700 | INCREMENTAL    | SCHEDULED       | {                            |
|                          |           |                               |                               |                               |                |                 |   "compilationTimeMs": 1370, |
|                          |           |                               |                               |                               |                |                 |   "executionTimeMs": 670,    |
|                          |           |                               |                               |                               |                |                 |   "numAddedPartitions": 2,   |
|                          |           |                               |                               |                               |                |                 |   "numCopiedRows": 7,        |
|                          |           |                               |                               |                               |                |                 |   "numDeletedRows": 1,       |
|                          |           |                               |                               |                               |                |                 |   "numInsertedRows": 1,      |
|                          |           |                               |                               |                               |                |                 |   "numRemovedPartitions": 1, |
|                          |           |                               |                               |                               |                |                 |   "queuedTimeMs": 0          |
|                          |           |                               |                               |                               |                |                 | }                            |
| DT_ORDERS_WITH_CUSTOMERS | SUCCEEDED | 2026-06-27 02:55:07.859 -0700 | 2026-06-27 02:55:08.033 -0700 | 2026-06-27 02:55:09.140 -0700 | INCREMENTAL    | CREATION        | {                            |
|                          |           |                               |                               |                               |                |                 |   "compilationTimeMs": 729,  |
|                          |           |                               |                               |                               |                |                 |   "executionTimeMs": 434,    |
|                          |           |                               |                               |                               |                |                 |   "numAddedPartitions": 1,   |
|                          |           |                               |                               |                               |                |                 |   "numCopiedRows": 0,        |
|                          |           |                               |                               |                               |                |                 |   "numDeletedRows": 0,       |
|                          |           |                               |                               |                               |                |                 |   "numInsertedRows": 8,      |
|                          |           |                               |                               |                               |                |                 |   "numRemovedPartitions": 0, |
|                          |           |                               |                               |                               |                |                 |   "queuedTimeMs": 0          |
|                          |           |                               |                               |                               |                |                 | }                            |
+--------------------------+-----------+-------------------------------+-------------------------------+-------------------------------+----------------+-----------------+------------------------------+

refresh_actionINCREMENTALstateSUCCEEDEDとなっており、statisticsnumInsertedRowsnumDeletedRowsからは差分のみが処理されていることも確認できます。

さいごに

Delta Direct を使ったオブジェクトストレージ上の Delta Lake テーブルの参照と、その Delta Direct テーブルのダイナミックテーブルからの参照を試してみました。
こちらの内容がどなたかの参考になれば幸いです。


Snowflakeの導入支援はクラスメソッドに!

クラスメソッドでは Snowflake の導入を支援しております。
製品の詳細や支援の内容についてお気軽にお問い合わせください。

Snowflakeの詳細を見る

この記事をシェアする

関連記事