SnowflakeのデータをDatabricks Lakeflow ConnectのQuery-based ingestionで取り込む手順を試してみた

SnowflakeのデータをDatabricks Lakeflow ConnectのQuery-based ingestionで取り込む手順を試してみた

2026.05.26

かわばたです。

SnowflakeからDatabricksへデータを定期的に取り込みたいというニーズに対して、Databricks Lakeflow ConnectのQuery-based ingestionを使うことで、カスタムETLパイプラインを構築せずに差分取り込みを実現できないか試してみました。

https://docs.databricks.com/aws/en/ingestion/lakeflow-connect/query-based-overview

https://docs.databricks.com/aws/en/ingestion/lakeflow-connect/query-based-pipeline

https://docs.databricks.com/aws/en/ingestion/lakeflow-connect/query-based-reference

https://docs.databricks.com/aws/en/ingestion/lakeflow-connect/query-based-limits

手順と確認結果をまとめます。

背景・課題

SnowflakeにあるデータをDatabricks側に取り込み、Lakehouse上のテーブルとして利用したいケースがあります。たとえば以下のような状況です。

  • SnowflakeのマスタデータをDatabricksの分析基盤に取り込みたい
  • Snowflake側で更新されたデータを定期的にDatabricks側に反映したい
  • できるだけカスタムETLやサードパーティツールに依存せず、Databricksの機能で完結させたい

こうしたケースでは、差分取り込み(incremental ingestion)の仕組みが必要になります。自前でETLパイプラインを構築・運用するのは開発・保守コストがかかるため、Databricksの標準機能で実現できないか検討しました。

技術的アプローチ

今回はDatabricks Lakeflow ConnectのQuery-based ingestionを使って、SnowflakeからDatabricksへの差分データ取り込みを検証します。

Query-based ingestionは、ソースデータベースに対して直接クエリを実行し、updated_atのような**cursor column(カーソル列)**を基準に前回実行時より新しい行を取り込む方式です。

今回の全体構成は以下のとおりです。

制限事項

2026年5月25日時点での制限事項です。

  • Query-based ingestionはPublic Previewの機能です。GAまでに仕様が変わる可能性があります
  • cursor columnは単一列のみ指定可能です。composite cursor(複数列の組み合わせ)はサポートされていません
  • cursor columnの値は単調増加する必要があります。timestamp/date型、数値型、バイナリ型、文字列型に対応しています
  • cursor columnがNULLの行は取り込まれません
  • continuous ingestion(継続的な取り込み)には対応していません。スケジュール実行でソースをクエリする方式です。低レイテンシが必要な場合はmanaged CDC database connectorの利用を検討してください
  • 以下の機能はAPI経由でのみ利用可能です(UIからは設定できません)
    • 行フィルタリング
    • ソフト削除トラッキング
    • ハード削除トラッキング(Beta)
    • APPEND_ONLYモード
    • マルチ宛先カタログ・スキーマ

コスト

  • Databricks側: サーバレスパイプラインのDBUが発生します。2026年5月25日時点では公式ドキュメントにQuery-based ingestion固有のコストに関する詳細な記載はありませんでした
  • Snowflake側: パイプライン実行時にはSnowflake側でクエリが実行されるため、warehouseが起動または稼働し、Snowflake側のクレジットが発生します。スケジュール間隔を短くしすぎるとコストが増加する点にご注意ください

事前準備

Snowflake側のユーザー・ロール・権限の準備

まずSnowflake側で、Databricksから読み取り専用でアクセスするためのユーザーとロールを作成します。

USE ROLE ACCOUNTADMIN;

-- Databricks接続用ロールの作成
CREATE ROLE IF NOT EXISTS DATABRICKS_READ_ROLE;

-- Databricks接続用ユーザーの作成(パスワード認証の場合。検証用途)
CREATE USER IF NOT EXISTS DATABRICKS_USER
  PASSWORD = '<STRONG_PASSWORD>'
  DEFAULT_ROLE = DATABRICKS_READ_ROLE
  DEFAULT_WAREHOUSE = <SNOWFLAKE_WAREHOUSE_NAME>
  MUST_CHANGE_PASSWORD = FALSE;

GRANT ROLE DATABRICKS_READ_ROLE TO USER DATABRICKS_USER;

-- Warehouse使用権限の付与
GRANT USAGE ON WAREHOUSE <SNOWFLAKE_WAREHOUSE_NAME>
  TO ROLE DATABRICKS_READ_ROLE;

-- Database / Schema / Table の読み取り権限の付与
GRANT USAGE ON DATABASE <SNOWFLAKE_DATABASE_NAME>
  TO ROLE DATABRICKS_READ_ROLE;

GRANT USAGE ON SCHEMA <SNOWFLAKE_DATABASE_NAME>.<SNOWFLAKE_SCHEMA_NAME>
  TO ROLE DATABRICKS_READ_ROLE;

GRANT SELECT ON ALL TABLES IN SCHEMA <SNOWFLAKE_DATABASE_NAME>.<SNOWFLAKE_SCHEMA_NAME>
  TO ROLE DATABRICKS_READ_ROLE;

GRANT SELECT ON FUTURE TABLES IN SCHEMA <SNOWFLAKE_DATABASE_NAME>.<SNOWFLAKE_SCHEMA_NAME>
  TO ROLE DATABRICKS_READ_ROLE;

PEM private key認証を使う場合は、PASSWORDの代わりにRSA_PUBLIC_KEYを設定します。

-- Databricks接続用ユーザーの作成(PEM private key認証、本番向け)
CREATE USER IF NOT EXISTS DATABRICKS_USER
  TYPE = SERVICE
  RSA_PUBLIC_KEY = '<PUBLIC_KEY_WITHOUT_HEADER_FOOTER>'
  DEFAULT_ROLE = DATABRICKS_READ_ROLE
  DEFAULT_WAREHOUSE = <SNOWFLAKE_WAREHOUSE_NAME>;

Databricks Secretsの準備

Snowflakeの認証情報はSQLに平文で書かず、Databricks Secretsに保存します。

https://docs.databricks.com/aws/en/security/secrets/

Databricks Secretsの管理にはDatabricks CLI、Secrets API、SDKを利用できます。
本記事ではDatabricks CLIを使用します。

https://docs.databricks.com/aws/en/dev-tools/cli/install

# Secret Scopeの作成
databricks secrets create-scope snowflake-prod
# ユーザー名の保存
databricks secrets put-secret snowflake-prod sf-user

先ほど作成したユーザー名を入力
2026-05-26_11h59_36

# パスワードの保存
databricks secrets put-secret snowflake-prod sf-password

Secret ScopeとSecret keyが作成されていることを確認できました。
2026-05-26_13h46_30

PEM private keyを使う場合は以下も保存します。

databricks secrets put-secret snowflake-prod sf-pem-private-key

Snowflake Connectionの作成

Databricks側でSnowflakeへのConnectionを作成します。

https://docs.databricks.com/aws/en/query-federation/snowflake-basic-auth

https://docs.databricks.com/aws/en/query-federation/snowflake-pem

UIで作成する場合

  1. Databricks Workspaceを開く
  2. 左メニューからCatalogを開く
  3. 画面上部のAddアイコンをクリック
  4. Create a connectionを選択

2026-05-26_13h51_47
5. Connection nameを入力(例: snowflake_prod_conn
6. Connection typeSnowflakeを選択
7. 認証方式を選択(Username and passwordまたはPEM private key
2026-05-26_13h53_36
8. 接続情報を入力

2026-05-26_13h56_40

項目 入力例
Host <account_identifier>.snowflakecomputing.com
Port 443
User DATABRICKS_USER
Password Databricks Secretに保存した値
Snowflake warehouse <SNOWFLAKE_WAREHOUSE_NAME>
Snowflake role DATABRICKS_READ_ROLE
  1. Test connectionをクリック

接続テストが成功すればOKです。

SQLで作成する場合: Username / Password(クリックで展開)

Databricks SQL EditorまたはNotebookで実行します。

CREATE CONNECTION snowflake_prod_conn TYPE snowflake
OPTIONS (
  host '<account_identifier>.snowflakecomputing.com',
  port '443',
  sfWarehouse '<SNOWFLAKE_WAREHOUSE_NAME>',
  user secret ('snowflake-prod', 'sf-user'),
  password secret ('snowflake-prod', 'sf-password'),
  sfRole 'DATABRICKS_READ_ROLE'
);
SQLで作成する場合: PEM private key(クリックで展開)
CREATE CONNECTION snowflake_prod_conn TYPE snowflake
OPTIONS (
  host '<account_identifier>.snowflakecomputing.com',
  port '443',
  sfWarehouse '<SNOWFLAKE_WAREHOUSE_NAME>',
  user secret ('snowflake-prod', 'sf-user'),
  pem_private_key secret ('snowflake-prod', 'sf-pem-private-key'),
  expires_in_secs '3600',
  sfRole 'DATABRICKS_READ_ROLE'
);

PEM private keyを使う場合、Snowflake JDBCドライバーは暗号化された秘密鍵での認証に対応していないため、-nocryptオプション付きで鍵を生成する必要があります。

Foreign Catalogの作成

次に、SnowflakeのdatabaseをDatabricks Unity Catalog上のForeign Catalogとして登録します。

https://docs.databricks.com/aws/en/query-federation/foreign-catalogs

UIで作成する場合

Connection作成ウィザードの後続画面でForeign Catalogを作成できます。
2026-05-26_14h40_38

項目 入力例
Catalog name sf_prod
Connection snowflake_prod_conn
Database <SNOWFLAKE_DATABASE_NAME>

2026-05-26_14h44_07

SQLで作成する場合

CREATE FOREIGN CATALOG sf_prod
USING CONNECTION snowflake_prod_conn
OPTIONS (
  database = '<SNOWFLAKE_DATABASE_NAME>'
);

作成後、Databricks上では以下のように参照できます。

SELECT * FROM sf_prod.<snowflake_schema_name>.<snowflake_table_name>

Snowflake上のデータをDatabricksで参照することができました。
2026-05-26_14h51_30

取り込み先catalog/schemaの作成

Databricks側で取り込み先を用意します。

CREATE CATALOG IF NOT EXISTS bronze;
CREATE SCHEMA IF NOT EXISTS bronze.snowflake_sales;

イベントログ用のschemaも作成しておきます。

CREATE CATALOG IF NOT EXISTS ops;
CREATE SCHEMA IF NOT EXISTS ops.lakeflow_event_logs;

2026-05-26_15h23_01

試してみた

Data Ingestionからパイプラインを作成する

Databricks Workspaceの左メニューからData Ingestionを開き、Add data画面でSnowflakeを選択します。

2026-05-26_15h24_57

パイプライン名を入力します。

ingest_snowflake_sales_prod

イベントログの場所で、事前に作成したcatalog/schemaを選択します。

項目
catalog ops
schema lakeflow_event_logs

2026-05-26_15h26_06

Connection typeとForeign Catalogを選択する

次の画面で以下を設定します。

項目 設定値
Connection type Foreign catalog
Foreign catalog sf_prod
Destination catalog bronze

2026-05-26_15h27_29

取り込み対象のschema/tableを選択する

Sourceページで、Snowflake側のschema/tableを選択します。

2026-05-26_15h33_33

cursor columnとprimary keyを設定する

各テーブルに対してcursor columnを指定します。cursor columnは差分取り込みの基準となる列で、値が単調増加する必要があります。

テーブル cursor column
raw_orders UPDATED_AT

2026-05-26_15h40_40

cursor columnの選び方

良い例:

  • updated_atlast_modified_atmodified_timestamp — 更新があるテーブルに最適
  • created_atidrow_id — append-only(追記のみ)のテーブルなら使えます

避けるべき例:

  • statuscategorycustomer_id — 単調増加しない列は使えません
  • created_at更新があるテーブルでは使わないでください。既存レコードの更新を検知できません

履歴追跡を選択する

用途に応じて履歴追跡モードを選択します。

下記ドキュメントより、SCD_TYPE_2の履歴追跡の設定が可能です。

https://docs.databricks.com/aws/en/ingestion/lakeflow-connect/query-based-pipeline#configure-history-tracking-scd

【SCD_TYPE_2について】

https://docs.databricks.com/aws/en/ingestion/lakeflow-connect/scd

取り込み先を設定する

Destinationページで、Databricks側の取り込み先を指定します。宛先テーブル名を明示的に変更しない場合、ソーステーブル名がそのまま使われます。今回はソーステーブル名と同じraw_ordersとして作成される構成にしています。

項目
Destination catalog bronze
Destination schema snowflake_sales

2026-05-26_15h48_17

スケジュールと通知を設定する

Settingsページでスケジュールを設定します。

最初は手動実行から始め、問題なければ1時間ごとに設定するのが安全です。

通知設定では、本番運用では失敗時の通知を必ず設定することを推奨します。
2026-05-26_15h51_47

パイプラインを保存・実行する

Save and run pipelineをクリックします。

2026-05-26_16h07_01

実行結果を確認する

取り込みが完了したら、Databricks SQL Editorで確認します。

SHOW TABLES IN bronze.snowflake_sales;
SELECT COUNT(*)
FROM bronze.snowflake_sales.raw_orders;

2026-05-26_16h16_25

Snowflake側と件数を比較します。

Snowflake側:

SELECT COUNT(*)
FROM <DATABASE>.<SCHEMA>.raw_orders;

初回はfull loadになるため、Snowflake側と件数が一致して確認できました。2回目以降はcursor columnに基づいて増分取り込みされます。

2026-05-26_16h17_25

最後に

Lakeflow ConnectのQuery-based ingestionを使って、SnowflakeからDatabricksへの差分データ取り込みパイプラインを構築できることを確認しました。

便利だと感じた部分は、cursor columnを指定するだけで差分取り込みが実装できてパイプラインの構築が容易にできる部分が素晴らしいと思いました。

まずは小規模なテーブルで手動実行から試してみて、cursor columnやprimary keyの設計を検証してから本番導入を進めるのがおすすめです。

この記事が何かの参考になれば幸いです!


Snowflakeの導入支援はクラスメソッドに!

クラスメソッドでは Snowflake の導入を支援しております。
製品の詳細や支援の内容についてお気軽にお問い合わせください。

Snowflakeの詳細を見る

この記事をシェアする

関連記事