pg_lake・pg_incremental を使って Snowflake Postgres の Iceberg テーブルへの増分同期を試してみた
はじめに
Snowflake Postgres の pg_lake・pg_incremental 拡張を使って、PostgreSQL のテーブルデータを Iceberg 形式で増分同期し、Snowflake 側からの参照を試してみた内容を記事としました。
Snowflake Postgres の概要
本構成については以下に記載があります。
Snowflake Postgres は Snowflake が提供するフルマネージドの PostgreSQL サービスです。pg_lake・pg_incremental 拡張を組み合わせることで、PostgreSQL のテーブルデータを Iceberg 形式でオブジェクトストレージに書き出し、Snowflake 側から Iceberg テーブルとして読み取ることができます。
- pg_lake:PostgreSQL のデータを Iceberg 形式で S3 に書き出す Snowflake 提供の拡張。ストレージは Snowflake が管理するため、S3 バケット・IAM ロール・外部ボリュームの設定は不要
- pg_incremental:pg_cron をベースに構築されている。pg_lake と組み合わせて「前回処理済みのところから新しい分だけ」を処理するパイプラインを作ることができる
データフローは以下のようになります。
[Snowflake Postgres]
通常テーブル
↓ pg_incremental(time_interval ごとに増分を書き出し)
Iceberg テーブル(S3 ※Snowflake 管理)
↓ カタログ統合(REFRESH_INTERVAL_SECONDS ごとにメタデータを更新)
[Snowflake]
Iceberg テーブル(AUTO_REFRESH = TRUE)※読み取り専用
ETL ツール不要で PostgreSQL → Snowflake の増分同期が完結するのが最大の特徴です。
公式のクイックスタートも提供されています。
前提条件
検証環境
以下の環境を使用しています。
- Snowflake トライアルアカウント
- Snowflake Postgres インスタンス
- コンピュートファミリー:STANDARD_M
- コア数:1 / メモリ:4 GB / ストレージ:100 GB
- PostgreSQL バージョン:18.4
以下に記載がありますが、pg_lake を使用するには STANDARD または HIGH MEMORY のコンピュートが必要です。BURSTABLE は非対応です。
事前準備
Snowflake Postgres インスタンスを Snowsight から作成します。作成には、アカウントレベルのCREATE POSTGRES INSTANCE権限が必要です。

インスタンス作成後、ローカルから psql で接続できることを確認しておきます。
export PGHOST=<エンドポイント>.postgres.snowflake.app
export PGPORT=5432
export PGUSER=snowflake_admin
export PGPASSWORD=<パスワード>
export PGDATABASE=postgres
psql -c "SELECT version();"
version
-----------------------------------------------------------------------------------------------------------------
PostgreSQL 18.4 on aarch64-unknown-linux-gnu, compiled by gcc (GCC) 11.5.0 20240719 (Red Hat 11.5.0-14), 64-bit
使用する拡張が利用可能か確認しておきます。
SELECT name, default_version, installed_version
FROM pg_available_extensions
WHERE name IN ('pg_lake', 'pg_incremental', 'pg_cron');
name | default_version | installed_version
----------------+-----------------+-------------------
pg_incremental | 1.5 |
pg_lake | 3.3 |
pg_cron | 1.6 |
installed_versionが空欄の場合はまだインストールされていない状態です。以降の手順でインストールします。
同期元のサンプルテーブルを作成しておきます。
CREATE TABLE orders (
id BIGSERIAL PRIMARY KEY,
customer_id INT NOT NULL,
amount NUMERIC(10,2) NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
INSERT INTO orders (customer_id, amount, status)
SELECT
(random() * 1000)::INT,
(random() * 10000)::NUMERIC(10,2),
(ARRAY['pending','processing','completed','cancelled'])[floor(random()*4+1)]
FROM generate_series(1, 10000);
pg_lake 拡張と Iceberg テーブルの作成
pg_lake を有効化し、同期先となる Iceberg テーブルを作成します。
-- pg_lake 有効化(CASCADE で依存拡張も一括インストール)
CREATE EXTENSION pg_lake CASCADE;
CASCADEを指定することで、複数の依存拡張がまとめてインストールされます。
-- Iceberg テーブル作成(orders の同期先)
CREATE TABLE orders_iceberg (
id BIGINT,
customer_id INT,
amount NUMERIC(10,2),
status TEXT,
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ
) USING iceberg;
USING icebergで作成したテーブルは通常のテーブルではなく Foreign Tableとして作成されます。\dtでは表示されないため、\dEで確認します。
\dE
List of relations
Schema | Name | Type | Owner
--------+----------------+---------------+-----------------
public | orders_iceberg | foreign table | snowflake_admin
Iceberg ファイルの保存先 S3 パスは Snowflake が自動的に割り当てるため、ユーザー側での設定は不要です。
pg_incremental パイプラインの作成
pg_incremental を使って、updated_atをキーとした増分同期パイプラインを作成します。
-- pg_cron / pg_incremental 有効化
CREATE EXTENSION pg_cron;
CREATE EXTENSION pg_incremental;
-- パイプライン作成(1分ごとに updated_at ベースで増分コピー)
SELECT incremental.create_time_interval_pipeline(
pipeline_name := 'sync_orders_to_iceberg',
time_interval := '1 minute',
source_table_name := 'orders',
start_time := (SELECT MIN(updated_at) FROM orders),
command := $$
INSERT INTO orders_iceberg
SELECT id, customer_id, amount, status, created_at, updated_at
FROM orders
WHERE updated_at >= $1 AND updated_at < $2
$$
);
各引数の設定は以下のとおりです。
| 引数 | 値 | 説明 |
|---|---|---|
pipeline_name |
'sync_orders_to_iceberg' |
パイプラインの名称 |
time_interval |
'1 minute' |
1回の実行で処理する時間の幅。pg_cron で1分ごとに起動し、この幅で区切って順番に処理する |
source_table_name |
'orders' |
処理対象のテーブル名 |
start_time |
MIN(updated_at) |
パイプラインが最初に処理を始める時刻。最古のタイムスタンプを指定することで既存データのバックフィルが可能 |
command |
$$...$$ |
各時間窓で実行される SQL。$1が窓の開始時刻、$2が終了時刻に自動で置き換えられる |
start_timeから現在時刻まで1分単位の時間窓を区切りながら処理を進め、現在時刻に追いついたら次の窓が来るまで待機します。
ポイントとして、pg_incremental は append-only(追記のみ)を前提としています。
パイプラインが正常に登録されたかは以下で確認できます。
-- パイプライン一覧
SELECT * FROM incremental.pipelines;
-- time_interval パイプラインの詳細
SELECT * FROM incremental.time_interval_pipelines;
pipeline_name | time_interval | batched | min_delay | last_processed_time
------------------------+---------------+---------+-----------+------------------------
sync_orders_to_iceberg | 00:01:00 | t | 00:00:30 | 2026-06-28 06:36:00+00
min_delay(ここでは 30秒)は、過去の時間窓を処理するまでの待機時間です。
last_processed_timeが現在時刻に追いついたら初期バックフィル完了です。
Snowflake 側の設定
Snowflake から Postgres Iceberg カタログを参照するようにカタログ統合を作成します。
CREATE OR REPLACE CATALOG INTEGRATION postgres_iceberg_integration
CATALOG_SOURCE = SNOWFLAKE_POSTGRES
TABLE_FORMAT = ICEBERG
CATALOG_NAMESPACE = 'public'
REST_CONFIG = (
POSTGRES_INSTANCE = '<インスタンス名>'
CATALOG_NAME = 'postgres'
ACCESS_DELEGATION_MODE = VENDED_CREDENTIALS
)
ENABLED = TRUE;
ACCESS_DELEGATION_MODE = VENDED_CREDENTIALSを指定することで、Snowflake が S3 へのアクセス認証情報を自動発行します。外部ボリュームや IAM ロールの設定は不要です。
続けて Snowflake 側の Iceberg テーブルを作成します。
CREATE DATABASE IF NOT EXISTS pglake;
USE DATABASE pglake;
CREATE OR REPLACE ICEBERG TABLE orders_from_postgres
CATALOG = 'postgres_iceberg_integration'
CATALOG_TABLE_NAME = 'orders_iceberg'
AUTO_REFRESH = TRUE;
ここでは個別の Iceberg テーブルとして作成しましたが、Postgres データベースからテーブルを自動的に検出して同期するカタログリンクデータベースとしても作成できます。
作成後、データが参照できることを確認します。
SELECT COUNT(*) FROM orders_from_postgres;
SELECT * FROM orders_from_postgres LIMIT 5;
レイテンシの計測
簡単ではありますが、以降に記載の手順で Postgres 側で追加したレコードを Snowflake 側で参照できるまでの時間を計測しました。
計測方法
計測手順は以下のとおりです。
- サンプルテーブルの
status列にバッチ識別子('batch_10k'等)を入れて INSERT し、完了時刻を記録 - Snowflake 側で
COUNT(*)を手動で繰り返し実行し、件数が揃った時刻を記録
ここでは「手順2の可視化時刻 − 手順1の INSERT 完了時刻」をラグと定義しました。
計測時点のパイプライン設定は以下のとおりです。
| 項目 | 値 |
|---|---|
time_interval |
1分 |
min_delay |
30秒 |
REFRESH_INTERVAL_SECONDS |
30秒 |
| 理論上のラグ | 約120秒(1分 + 30秒 + 30秒) |
今回は 1万・5万・10万行の3パターンで計測しました。以降のクエリ例は 1万行(batch_10k)の場合です。
-- 【Postgres】INSERT(例:10,000 行)
\timing on
> INSERT INTO orders (customer_id, amount, status, created_at, updated_at)
SELECT
(random() * 1000)::INT,
(random() * 10000)::NUMERIC(10,2),
'batch_10k',
NOW(),
NOW()
FROM generate_series(1, 10000);
INSERT 0 10000
Time: 34.695 ms
-- 【Postgres】INSERT 完了時刻を記録(updated_at = NOW() で挿入したため、レコードの updated_at と一致する)
> SELECT NOW() AS insert_time, COUNT(*) FROM orders WHERE status = 'batch_10k';
insert_time | count
-------------------------------+-------
2026-06-28 16:17:17.087108+09 | 10000
-- 【Snowflake】反映確認
> SELECT COUNT(*) AS sf_count FROM orders_from_postgres WHERE status = 'batch_10k';
+----------+
| SF_COUNT |
|----------|
| 10000 |
+----------+
> SELECT CURRENT_TIMESTAMP() AS visible_time;
+-------------------------------+
| VISIBLE_TIME |
|-------------------------------|
| 2026-06-28 16:19:27.165 +0900 |
+-------------------------------+
計測結果
その他(5万・10万行)も同様に計測した結果は以下の通りでした。
| バッチ | 投入行数 | INSERT 所要時間 | Snowflake 反映ラグ |
|---|---|---|---|
| batch_10k | 10,000 | 35 ms | 130秒 |
| batch_50k | 50,000 | 146 ms | 117秒 |
| batch_100k | 100,000 | 431 ms | 114秒 |
いずれも 114〜130秒程度 に収まりました。INSERT 行数が増えてもラグがほぼ変わらないことから、今回のラグの主要因はデータ量ではなくパイプラインのタイミングであることが推測できます。
INSERT 自体は行数に概ね比例しており、10万行でも 431ms でした。
さいごに
Snowflake Postgres の pg_lake・pg_incremental 拡張を使った PostgreSQL → Snowflake Iceberg テーブルへの増分同期を試してみました。
外部ボリューム・IAM・ETL ツールが一切不要で構成できる点が大きな特徴と思います。
こちらの内容がどなたかの参考になれば幸いです。






