BigQuery tables for Apache Icebergで定義されたテーブルをSnowflakeのIceberg Tableとしてクエリできるようにしてみた

BigQuery tables for Apache Icebergで定義されたテーブルをSnowflakeのIceberg Tableとしてクエリできるようにしてみた

Clock Icon2025.02.13

さがらです。

先日、海外のMediumで下記の記事が投稿されていました。端的に述べると「BigQueryでのみエクスポート可能なFirebaseなどのデータをSnowflakeにもロードしたい時、Icebergを介すことでパイプラインを効率化」という内容の記事です。(画像はリンク先より引用)

https://medium.com/snowflake/eliminating-redundancies-in-etl-with-iceberg-tables-on-snowflake-cff678d49bd3

2025-02-09_08h03_55

この記事の内容を参考に、自分も実際にやってみたので、その内容をまとめてみます。

BigQuery tables for Apache Icebergでテーブル定義

まず、BigQuery tables for Apache Icebergでテーブル定義を作成する必要があります。

今回は下記の記事を参考に、下図のようなテーブルを定義しておきました。

https://dev.classmethod.jp/articles/apache-iceberg-bigquery-cm_google_cloud_adcal_2024/

2025-02-09_08h07_22

2025-02-09_08h07_45

2025-02-09_08h08_21

GCSのバケットの内容は下図のようになっています。

2025-02-09_08h09_02

2025-02-09_08h09_40

2025-02-09_08h10_23

この状態で、以下のEXPORT TABLE METADATAコマンドを実行します。これにより、metadataフォルダが更新され最新の状態が反映されます。(このEXPORT TABLE METADATAコマンドに関する公式Docs

EXPORT TABLE METADATA FROM <作成したテーブル名>;

2025-02-13_06h50_57

SnowflakeからGCSに対するExternal Volumeの定義

次に、先ほどBigQueryで作成したIcebergテーブルに対してアクセスできるように、SnowflakeからGCSに対するExternal Volumeを定義していきます。

https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume-gcs

まず、以下のクエリを実行してGCSへのExternal Volumeを定義します。

CREATE EXTERNAL VOLUME <任意のExternal Volume名>
  STORAGE_LOCATIONS =
    (
      (
        NAME = '<Snowflakeアカウント内で一意となる任意の識別子>'
        STORAGE_PROVIDER = 'GCS'
        STORAGE_BASE_URL = 'gcs://<バケット名>/<フォルダ>'
      )
    );

次に、作成したExternal Volumeに対して以下のクエリを実行し、サービスアカウントの値を取得します。

DESC EXTERNAL VOLUME <作成したExternal Volume名>;

2025-02-10_11h18_13

次に、Google Cloud上で下記の権限を持つカスタムIAMロールを作成します。

  • storage.buckets.get
  • storage.objects.create
  • storage.objects.delete
  • storage.objects.get
  • storage.objects.list

2025-02-10_11h30_28

先ほどIcebergテーブルを作成したバケットに対して、作成したカスタムIAMロールをSnowflakeで確認したサービスアカウントに対して付与します。

2025-02-10_11h40_11

最後に、下記のクエリを実行して"success":trueの結果が返ってくれば、External Volumeの定義は完了です!

SELECT SYSTEM$VERIFY_EXTERNAL_VOLUME('<作成したExternal Volume名>');

2025-02-10_11h49_52

Catalog Integrationの定義

次に、GCS上のIcebergのメタデータファイルを参照させるためのCatalog Integrationを定義します。

CREATE OR REPLACE CATALOG INTEGRATION <任意のCatalog Integration名>
  CATALOG_SOURCE = OBJECT_STORE
  TABLE_FORMAT = ICEBERG
  ENABLED = TRUE;

Iceberg Tableの定義

続いて、これまで定義した各オブジェクトを用いて、BigQuery tables for Apache Icebergで定義されたテーブルを、SnowflakeのIceberg Tableとしてクエリできるように定義します。

CREATE ICEBERG TABLE <任意のテーブル名>
   EXTERNAL_VOLUME='<作成したExternal Volume名>'
   CATALOG='<作成したCatalog Integration名>'
   METADATA_FILE_PATH='flights/metadata/v1739351732.metadata.json'; -- GCS上の最新の〇〇metadata.jsonを指定

この上で、このテーブルに対してSELECT文を実行してみると、無事にクエリが出来ました!

2025-02-13_07h01_45

最新のメタデータファイルを参照するためのストアドプロシージャの定義

これまでの手順でBigQueryで定義したIcebergテーブルをSnowflakeでクエリすることができました。

しかし、このままだとテーブル作成時にメタデータファイルを指定しているため、BigQuery側でデータを更新してもすぐに反映させることが出来ません。

そのため、今回は参考にしているブログに書いてあったストアドプロシージャを用いて、Snowflakeで定義したIcebergテーブルが最新のメタデータファイルを参照するようにしてみます。

事前準備:対象のGCSに対する外部ステージを作成

このストアドプロシージャでは外部ステージに対してファイル名や更新日などを取得するクエリを実行しているため、事前に対象のGCSに対する外部ステージを作成しておく必要があります。

こちらの外部ステージの作成については、下記の記事が参考になると思います。

https://dev.classmethod.jp/articles/snowflake-gcs-as-external-stage/

ストアドプロシージャの定義

次に、最新のメタデータファイルを参照できるようにするためのストアドプロシージャを定義します。

下記は、実際に私がテーブル名とステージ名を書き換えて作成したものとなります。

CREATE OR REPLACE PROCEDURE REFRESH_ICEBERG_METADATA()
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
AS
$$
    // Step 1: Get current metadata location
    var sql_get_metadata = "SELECT SYSTEM$GET_ICEBERG_TABLE_INFORMATION('SAGARA_RAWDATA_DB.ICEBERG_SCHEMA.FLIGHTS_FROM_GCS') AS METADATA_INFO";
    var stmt_metadata = snowflake.createStatement({sqlText: sql_get_metadata});
    var result_metadata = stmt_metadata.execute();

    var current_metadata = '';
    if (result_metadata.next()) {
      var metadata_info = result_metadata.getColumnValue(1);
      current_metadata = JSON.parse(metadata_info).metadataLocation;
    }

    // Step 2: Identify latest metadata file on stage
    var sql_new_file = `
      SELECT METADATA$FILENAME AS FILE_PATH
      FROM @sagara_gcs_stage (PATTERN => 'flights/metadata/.*metadata.json')
      ORDER BY METADATA$FILE_LAST_MODIFIED DESC
      LIMIT 1
    `;
    var stmt_new_file = snowflake.createStatement({sqlText: sql_new_file});
    var rs_new_file = stmt_new_file.execute();
    var new_metadata_file_path = rs_new_file.next() ? rs_new_file.getColumnValue("FILE_PATH") : '';

    // Step 3: Refresh if changed
    if (new_metadata_file_path && !current_metadata.includes(new_metadata_file_path)) {
      var alter_cmd = "ALTER ICEBERG TABLE SAGARA_RAWDATA_DB.ICEBERG_SCHEMA.FLIGHTS_FROM_GCS REFRESH '" + new_metadata_file_path + "'";
      var stmt_alter = snowflake.createStatement({sqlText: alter_cmd});
      stmt_alter.execute();
      return 'Metadata updated with latest file: ' + new_metadata_file_path;
    } else {
      return 'No metadata refresh needed.';
    }
$$;

ストアドプロシージャの動作確認

まず、BigQuery上で以下のクエリを実行し、新しいデータをINSERTして、EXPORT TABLE METADATAコマンドでGCS上のメタデータもアップデートします。

INSERT INTO `data_set_iceberg.flights` (FL_DATE,DEP_DELAY,ARR_DELAY,AIR_TIME,DISTANCE,DEP_TIME,ARR_TIME)
VALUES
('2024-07-01',25,35,150,800,22.167,0.667),
('2024-07-02',30,45,155,800,23.083,1.667),
('2024-07-03',20,30,152,800,21.917,0.417),
('2024-07-04',15,25,148,800,22.500,0.917),
('2024-07-05',40,55,153,800,23.333,1.917);

EXPORT TABLE METADATA FROM `data_set_iceberg.flights`;

※余談:上述のEXPORT TABLE METADATAのクエリを実行したときに下図のエラーが出たのですが、GCSを見るとメタデータファイルは更新されていたのでよくわからずです…BigQuery for Apache Icebergがプレビュー機能なこともあり動作が不安定なところがあるかもしれません。

2025-02-13_09h17_33

この上で、BigQueryとSnowflakeそれぞれから対象のIcebergテーブルを確認すると、BigQueryの方のみ最新のデータとなっています。これはSnowflakeのIcebergテーブルは古いメタデータファイルを参照しているためです。

  • BigQueryからの実行結果

2025-02-13_09h20_21

  • Snowflakeからの実行結果

2025-02-13_09h21_13

この上で、先程定義したストアドプロシージャを実行して、その上でもう一度SnowflakeからIcebergテーブルの内容を確認してみます。

2025-02-13_09h22_14

すると、下図のようにBigQueryで新しくINSERTしたデータがSnowflake上でも確認できるようになりました!Snowflakeから定期的にこのストアドプロシージャを実行すれば、BigQueryで更新された内容を元に、Snowflakeでも最新のデータに対してクエリすることができますね。

2025-02-13_09h22_56

最後に

Mediumの記事を参考に、BigQuery tables for Apache Icebergで定義されたテーブルをSnowflakeのIceberg Tableとしてクエリできるようにしてみました。

メタデータファイルの更新が少し手間ではあるのですが、これまでのBigQuery⇛Snowflakeのデータパイプラインの構築方法が抜本的に変わるアイデアだと思います!!(Mediumの記事の執筆者の方に感謝ですね!)

先日Google CloudはBigQuery metastoreも発表しています。BigQuery metastoreをカタログとして、BigQueryからSQLでIcebergテーブルの作成・更新ができるようになると、BigQuery metastoreを介して最新のメタデータを参照できるようになると思うので、今後のアップデートに期待しています!(現状はBigQuery metastoreでのテーブルの作成・更新がSparkなどの外部エンジンからのAPI経由でしか出来ない認識です。)

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.