Fivetran で S3 を Destination に設定し作成された Iceberg テーブルを Snowflake から参照してみた

2024.05.22

はじめに

Fivetran では、Destination に Amazon S3 を設定することが可能です。この際、宛先テーブルは AWS Glue データカタログの機能を使用した Apache Iceberg 形式のテーブルとして作成されます。

Snowflake では Iceberg テーブルとしてテーブルを作成しデータレイクに対してクエリする機能を提供しています。※執筆時点でパブリックプレビュー
またこの際 Snowflake では以下のカタログオプションをサポートしています。

  • Snowflake を Iceberg カタログとして使用
  • カタログ統合 を使用し外部の Iceberg カタログに接続

カタログ統合オブジェクトを使用することで、AWS Glue データカタログの機能へのアクセスも可能となります。ここではカタログ統合経由で Iceberg テーブルを Snowflake から参照してみました。

前提条件

以下の環境を使用しています。

  • データソース
    • RDS for SQL Server
      • SQL Server Express Edition
      • 2019 15.00.4365.2v.1
    • インスタンスタイプ:db.t3.micro
  • Destination
    • Amazon S3
  • 同期方法
    • Fivetran Teleport Sync

データソースへの Fivetran からの接続設定手順は以下の記事をご参照ください。

事前準備

検証用として SQLServer 上で以下のデータベース・テーブルを作成します。ここでは少し多めに1000件のレコードを追加しています。データ生成のコマンドは ChatGPT に作成してもらいました。

  • データベースの作成
CREATE DATABASE testdb;
GO
  • テーブルの作成、データの追加
-- テーブルの作成
CREATE TABLE sampledata (
    id int primary key,
    randomnumber int,
    randomdate date,
    randomstring varchar(100)
);

-- ランダムなデータの挿入
;WITH cte as (
    SELECT TOP (1000) 
        row_number() over (order by (select null)) as rownum
    FROM sys.columns a
    CROSS JOIN sys.columns b
)
INSERT INTO sampledata (id, randomnumber, randomdate, randomstring)
SELECT 
    rownum,
    abs(checksum(newid())) % 1000, -- 0から999までのランダムな数
    dateadd(day, abs(checksum(newid())) % 3650, '2000-01-01'), -- 過去10年間のランダムな日付
    left(convert(varchar(255), newid()), 10) -- ランダムな文字列
FROM cte;

確認

1> SELECT COUNT(*) FROM sampledata;
2> GO

-----------
       1000

(1 行処理されました)

データの内容

1> SELECT TOP 10 * FROM sampledata;
2> GO
id          randomnumber randomdate       randomstring                                                                  
----------- ------------ ---------------- ----------------------------------------------------------------------------------------------------
          1          337       2006-02-18 B84E3B7A-A                                                                    
          2           19       2009-09-28 B911ED2D-7                                                                    
          3          216       2005-01-30 3C48E674-E                                                                    
          4          778       2006-10-19 5FE4AC7C-1                                                                    
          5          998       2001-02-28 D6A79E15-F                                                                    
          6          304       2004-03-31 5C8AEA83-9                                                                    
          7          447       2006-10-18 CD08016E-1                                                                    
          8          636       2000-11-13 0226AC41-6                                                                    
          9          365       2003-01-02 00B66B98-3                                                                    
         10          718       2004-08-01 6886EFE3-1                                                                    

(10 行処理されました)

Fivetran 側の作業

上記の記事通り、データソース・Destination を設定し初期同期を行います。Destination の設定は下図の通りで Prefix は特に設定していません。

同期が完了すると下図の通り、ここでは Fivetran のコネクタ側で Destination schema prefix として指定した値とデータベーススキーマからなるプレフィックス配下に、テーブル名のパスが追加されていました。

さらにこの対象のテーブル名のパス配下はdata ,metadataパスに分かれています。

名称通りデータファイルはs3://<バケット名>/<Destination schema prefix_スキーマ名>/<テーブル名>/dataに Parquet として生成されます。
Glue 側も確認すると Apache Iceberg 形式として Glue Data Catalog が追加されています。

データファイル自体は S3 にあるため、ストレージ統合を作成し Snowflake からクエリすることも可能です。スキーマ検出をしてみると下図のようになっていました。

--ストレージ統合
CREATE STORAGE INTEGRATION s3_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<IAM Role>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<バケット名>/');

--ストレージ統合の情報を取得
DESC INTEGRATION s3_int;

--検証用DBの作成
CREATE DATABASE test;
USE SCHEMA test.public;

--ファイルフォーマットの作成
CREATE OR REPLACE FILE FORMAT my_parquet_format
  TYPE = PARQUET;

--ステージの作成
CREATE OR REPLACE STAGE my_s3_stage
  STORAGE_INTEGRATION = s3_int
  URL = 's3://<バケット名>/sql_server_rds_dbo/sampledata/data'
  FILE_FORMAT = my_parquet_format;

list @my_s3_stage;

--スキーマ検出
SELECT *
  FROM TABLE(
    INFER_SCHEMA(
      LOCATION=>'@my_s3_stage'
      , FILE_FORMAT=>'my_parquet_format'
      )
 );

Fivetran 経由での同期のため「_fivetran_deleted」と「_fivetran_synced」カラムが追加されています。

レコードの追加

どのようにデータファイルが追加・変更されるかを確認したかったので、まずはこのまま以下のコマンドでデータソース(SQLServer)側でレコードを 1000件追加してみます。これにより id の値が1001~ 2000のレコードが追加されます。

-- ランダムなデータをさらに1000レコード挿入
;WITH cte AS (
    SELECT TOP (1000) 
        ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) AS RowNum
    FROM sys.columns a
    CROSS JOIN sys.columns b
),
max_existing_id AS (
    SELECT COALESCE(MAX(id), 0) AS max_id FROM sampledata
)
INSERT INTO sampledata (id, randomnumber, randomdate, randomstring)
SELECT 
    m.max_id + c.rownum,
    ABS(CHECKSUM(NEWID())) % 1000, -- 0から999までのランダムな数
    DATEADD(day, ABS(CHECKSUM(NEWID())) % 3650, '2000-01-01'), -- 過去10年間のランダムな日付
    LEFT(CONVERT(VARCHAR(255), NEWID()), 10) -- ランダムな文字列
FROM cte c, max_existing_id m;

確認

1> SELECT COUNT(*) FROM sampledata;
2> GO

-----------
       2000

(1 行処理されました)

Fivetran による同期が完了すると S3 側でデータファイルの追加が確認できます。

Snowflake からステージ経由で各ファイルの id の最小・最大値をクエリして確認してみます。

SELECT 
	MIN($1:id::NUMBER) AS min_id,
	MAX($1:id::NUMBER) AS max_id,
	METADATA$FILENAME AS filename        
FROM @my_s3_stage
GROUP BY filename;

新規追加された Parquet ファイルの id 値は1001~2000 と意図する結果となっていました。

Snowflake 上で Iceberg テーブルを作成する

ここまでは、ステージのデータファイルに対して直接クエリしていたので以降の手順で Snowflake における Iceberg テーブルを作成し、S3 データレイクに対してクエリしてみます。Iceberg テーブルは以下の手順で作成します。

  • Iceberg テーブル用に外部ボリュームを構成
  • カタログ統合を作成

それぞれ基本的な流れはストレージ統合オブジェクトを作成する手順と同様です。

Iceberg テーブル用に外部ボリュームを構成する

はじめに外部ボリュームを作成します。ドキュメントは以下です。

Snowflake における Iceberg テーブルでは、データとメタデータファイルは外部のクラウドストレージ(Amazon S3、Google Cloud Storage、または Azure Storage)に保存されます。このうちメタデータファイルは、外部アカウントのストレージや Snowflake 管理のストレージを使用することも可能です。

データファイルに Snowflake からアクセスするためには外部ボリュームを定義します。下図は公式ドキュメントからの引用ですが、ストレージに対する記載の権限が必要です。メタデータファイルをどこで管理するかによって必要な権限が異なります。

ここでは公式ドキュメント記載のポリシー雛形をそのまま使用し以下の IAM ポリシーを定義しました。

{
   "Version": "2012-10-17",
   "Statement": [
         {
            "Effect": "Allow",
            "Action": [
               "s3:PutObject",
               "s3:GetObject",
               "s3:GetObjectVersion",
               "s3:DeleteObject",
               "s3:DeleteObjectVersion"
            ],
            "Resource": "arn:aws:s3:::<バケット名>/sql_server_rds_dbo/*"
         },
         {
            "Effect": "Allow",
            "Action": [
               "s3:ListBucket",
               "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::<バケット名>",
            "Condition": {
               "StringLike": {
                     "s3:prefix": [
                        "sql_server_rds_dbo/*"
                     ]
               }
            }
         }
   ]
}

あわせて IAM ロールも作成し、このポリシーをアタッチしておきます。

次に Snowflake 側で以下のコマンドを実行し、外部ボリュームを作成します。

CREATE OR REPLACE EXTERNAL VOLUME exvol
   STORAGE_LOCATIONS =
      (
         (
            NAME = 'my-s3-us-east-2'
            STORAGE_PROVIDER = 'S3'
            STORAGE_BASE_URL = 's3://<バケット名>/sql_server_rds_dbo/'
            STORAGE_AWS_ROLE_ARN = '<上記のポリシーをアタッチしたIAMロールのArn>'
            ENCRYPTION=(TYPE='AWS_SSE_S3')
         )
      );

外部ボリュームはアカウントレベルのオブジェクトでデフォルトでは ACCOUNTADMIN のみで作成可能です。NAME で指定する値はアカウント内で一意の名称である必要があります。
CREATE EXTERNAL VOLUME | Snowflake DOCUMENTATION

外部ボリュームを作成後、以下のコマンドで Snowflake アカウント側の AWS IAM ユーザーの情報を取得します。

DESC EXTERNAL VOLUME exvol;

出力のSTORAGE_AWS_IAM_USER_ARN ,STORAGE_AWS_EXTERNAL_ID プロパティの値を使用し、上記のポリシーをアタッチした IAM ロールの信頼関係を編集します。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<STORAGE_AWS_IAM_USER_ARN>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>"
        }
      }
    }
  ]
}

外部ボリュームの構成手順は以上になります。

カタログ統合を作成する

続けて、ここでは AWS Glue データカタログを使用するため、カタログ統合オブジェクトを作成します。手順は以下に記載があります。

Snowflake からテーブルに関する情報にアクセスするために、Glue データカタログに対する以下の権限を持つ IAM ロールを作成します。

  • glue:GetTable
  • glue:GetTables

ドキュメントに記載がある通り以下のポリシーを作成します。accountid を自身の id に置き換えます。

{
   "Version": "2012-10-17",
   "Statement": [
      {
         "Sid": "AllowGlueCatalogTableAccess",
         "Effect": "Allow",
         "Action": [
            "glue:GetTable",
            "glue:GetTables"
         ],
         "Resource": [
            "arn:aws:glue:*:<accountid>:table/*/*",
            "arn:aws:glue:*:<accountid>:catalog",
            "arn:aws:glue:*:<accountid>:database/<database-name>"
         ]
      }
   ]
}

あわせて IAM ロールも作成し、このポリシーをアタッチしておきます。

次に Snowflake 側で以下のコマンドを実行しカタログ統合を作成します。

CREATE CATALOG INTEGRATION glueCatalogInt
   CATALOG_SOURCE=GLUE
   CATALOG_NAMESPACE='sql_server_rds_dbo'
   TABLE_FORMAT=ICEBERG
   GLUE_AWS_ROLE_ARN='<上記のポリシーをアタッチしたIAMロールのArn>'
   GLUE_CATALOG_ID='<AWSアカウントのID>'
   GLUE_REGION='us-east-2'
   ENABLED=TRUE;

カタログ統合もアカウントレベルのオブジェクトで、デフォルトでは ACCOUNTADMIN のみで作成可能です。 CREATE CATALOG INTEGRATION | Snowflake DOCUMENTATION

外部ボリュームを作成後、以下のコマンドで Snowflake アカウント側の AWS IAM ユーザーの情報を取得します。

DESCRIBE CATALOG INTEGRATION glueCatalogInt;

出力のGLUE_IAM_USER_ARN ,GLUE_AWS_EXTERNAL_IDプロパティの値を使用し、上記のポリシーをアタッチした IAM ロールの信頼関係を編集します。

{
   "Version": "2012-10-17",
   "Statement": [
      {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
         "AWS": "<glue_iam_user_arn>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
         "StringEquals": {
            "sts:ExternalId": "<glue_aws_external_id>"
         }
      }
      }
   ]
}

カタログ統合の構成手順は以上です。

Icebergテーブルを作成する

さいごにこれまで作成したオブジェクトを使用して Iceberg テーブルを作成します。作成には以下のコマンドを使用しました。
CATALOG_TABLE_NAMEに Glue 上のテーブル名を指定します。

CREATE ICEBERG TABLE myGlueTable
  EXTERNAL_VOLUME='exvol'
  CATALOG='glueCatalogInt'
  CATALOG_TABLE_NAME='sampledata';

Icebergテーブルを作成する | Snowflake DOCUMENTATION

また、上記のドキュメントからの引用ですが、作成には下図の権限が必要です。

コマンド実行後、通常のテーブルのようにクエリすることができます。

SELECT * FROM mygluetable;
SELECT COUNT(*) FROM mygluetable;

Icebergテーブルへのクエリ権限

他のロールに Iceberg テーブルに対するクエリを許可するには、以下の権限を付与または継承させます。

  • テーブルに関連付けられた外部ボリュームに対する USAGE 権限
  • 本記事のように Snowflake で管理されていないカタログの場合は、カタログ統合に対する USAGE 権限
  • Iceberg テーブルに対する SELECT 権限
    • テーブルを作成したデータベースとスキーマへの USAGE 権限を含む

コマンドの例

--外部ボリュームに対する USAGE 権限
GRANT USAGE ON EXTERNAL VOLUME exvol TO ROLE <ロール>;
--外部カタログに対する USAGE 権限
GRANT USAGE ON INTEGRATION glueCatalogInt TO ROLE <ロール>;
--Icebergテーブルに対するSELECT権限
GRANT USAGE ON DATABASE <データベース> TO ROLE <ロール>;
GRANT USAGE ON SCHEMA <データベース>.<スキーマ> TO ROLE <ロール>;
GRANT SELECT ON ICEBERG TABLE <データベース>.<スキーマ>.<Icebergテーブル>TO ROLE <ロール>;

メタデータファイルの更新

既存レコードの更新

データソース(SQLServer)側で以下のコマンドを実行しレコードを更新してみます。

UPDATE sampledata
SET
    randomnumber = 1,
    randomdate = '2024-05-21',
    randomstring = 'UPDATE'
WHERE id = 100;
GO

Fivetran による同期完了後 S3 のデータファイルを見てみると更新前は2つだった状態から下図の通り追加されていました。

Snowflake からステージ経由で各ファイルの id の最小・最大値をクエリして確認してみます。

SELECT 
	MIN($1:id::NUMBER) AS min_id,
	MAX($1:id::NUMBER) AS max_id,
	METADATA$FILENAME AS filename        
FROM @my_s3_stage
GROUP BY filename;

出力は下図のようになっており、既存のレコードが格納されているデータファイルは残りつつも、変更があったレコードは別ファイルとして追加され、それ以外のレコードが格納されたデータファイルが追加されています。

Icebergテーブルを通して確認する際は以下のコマンドを実行し、メタデータファイルを更新してから参照します。

ALTER ICEBERG TABLE mygluetable REFRESH;
SELECT * FROM mygluetable WHERE id = 100;

下図の通り更新後のデータを確認できました。

Iceberg テーブルではなく、ストレージに対して通常のようにクエリすると、変更前のデータファイル自体は削除されていないため、同じ id の複数レコードが出力されます。

WITH temp AS (
    SELECT 
        $1:id::NUMBER AS id
        ,$1:randomstring::TEXT AS randomstring
        ,$1:randomdate::DATE AS randomdate
        ,$1:randomnumber::NUMBER AS randomnumber
        ,$1:_fivetran_deleted::BOOLEAN AS _fivetran_deleted
        ,$1:_fivetran_synced::TIMESTAMP_NTZ AS _fivetran_synced
        ,METADATA$FILENAME AS filename
    FROM @my_s3_stage
)
SELECT 
    *
FROM temp
WHERE id  = 100;

Iceberg テーブルでは、カタログを通して変更前のデータファイルへのリンクが外れる形で最新のデータとしては出力されないように制御されます。

レコードの追加

さらにレコードを追加しみます。データソース(SQLServer)側で1000レコード追加し、3000件とし Fivetran で同期を行います。
Iceberg テーブル側では、以下でメタデータを更新しクエリすることで、内容が更新されます。

ALTER ICEBERG TABLE mygluetable REFRESH;

レコードの削除

データソース(SQLServer)側で以下のコマンドを実行し、レコードを削除します。

1> DELETE FROM sampledata
2> WHERE id = 100;
3> GO

(1 行処理されました)

Fivetran による同期後、メタデータファイルの更新前は当然変更が反映されていません。

SELECT * FROM mygluetable WHERE id = 100;

上記のコマンドを実行し、メタデータファイルを更新後再度クエリすると、下図の通り更新後のデータにアクセスできます。(Fivetran による同期のため論理削除となっています)

データファイルを確認すると、ファイルがひとつ追加される形で更新されていました。

ファイルに対してクエリしてみると、実データファイルとしてはレコードが残っていることを確認できます。(3レコード)

WITH temp AS (
    SELECT 
        $1:id::NUMBER AS id
        ,$1:randomstring::TEXT AS randomstring
        ,$1:randomdate::DATE AS randomdate
        ,$1:randomnumber::NUMBER AS randomnumber
        ,$1:_fivetran_deleted::BOOLEAN AS _fivetran_deleted
        ,$1:_fivetran_synced::TIMESTAMP_NTZ AS _fivetran_synced
        ,METADATA$FILENAME AS filename
    FROM @my_s3_stage
)
SELECT 
    *
FROM temp
WHERE id  = 100
ORDER BY 6;

タイムトラベルの使用

Iceberg テーブルはタイムトラベルに対応しています。

例えば、以下のコマンドで論理削除前のデータを取得できます。

SELECT * FROM mygluetable AT(TIMESTAMP => 'Tue, 21 May 2024 20:00:00 +0900'::timestamp_ltz) WHERE id = 100;

スキーマ移行の検証

列を追加

データベース側で以下のコマンドを実行し、テーブルに列を追加します。

ALTER TABLE sampledata
ADD newcolumn VARCHAR(50);
GO

SQLServer 側で確認

1> SELECT TOP 10 * FROM sampledata;
2> GO
id          randomnumber randomdate       randomstring             newcolumn
----------- ------------ ---------------- ------------------------ --------------------------------------------------
          1          337       2006-02-18 B84E3B7A-A               NULL
          2           19       2009-09-28 B911ED2D-7               NULL
          3          216       2005-01-30 3C48E674-E               NULL
          4          778       2006-10-19 5FE4AC7C-1               NULL
          5          998       2001-02-28 D6A79E15-F               NULL
          6          304       2004-03-31 5C8AEA83-9               NULL
          7          447       2006-10-18 CD08016E-1               NULL
          8          636       2000-11-13 0226AC41-6               NULL
          9          365       2003-01-02 00B66B98-3               NULL
         10          718       2004-08-01 6886EFE3-1               NULL

(10 行処理されました)

Fivetran による同期後、メタデータファイルを更新し、Icebergテーブルを参照すると列が追加されてることを確認できます。

ALTER ICEBERG TABLE mygluetable REFRESH;
SELECT * FROM mygluetable LIMIT 10;

列を削除

次にデータソース側で列を削除します。

--randomstring列を削除
ALTER TABLE sampledata
DROP COLUMN randomstring;
GO

Fivetran による同期後、Snowflake 側で同様の手順を行いテーブルを参照すると対象の列の値が NULL に置き換わっていることを確認できます。

ALTER ICEBERG TABLE mygluetable REFRESH;
SELECT * FROM mygluetable LIMIT 10;

この際、すでに論理削除済みのレコードについては、この変更は反映されません。

テーブルを追加

さいごに、データソース側の同スキーマに以下のコマンドでテーブルを追加します。

CREATE TABLE customers (
    customer_id INT PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100),
    phone_number VARCHAR(15),
    registration_date DATE
);
INSERT INTO customers (customer_id, name, email, phone_number, registration_date)
VALUES
    (1, 'John Doe', 'john.doe@example.com', '123-456-7890', '2024-01-01'),
    (2, 'Jane Smith', 'jane.smith@example.com', '123-456-7891', '2024-01-02'),
    (3, 'Alice Johnson', 'alice.johnson@example.com', '123-456-7892', '2024-01-03'),
    (4, 'Chris Lee', 'chris.lee@example.com', '123-456-7893', '2024-01-04'),
    (5, 'Patricia Brown', 'patricia.brown@example.com', '123-456-7894', '2024-01-05');
GO

SQLServer で確認

1> SELECT * FROM customers;
2> GO
customer_id name                                                                                                 email                                                                                                phone_number    registration_date
----------- ---------------------------------------------------------------------------------------------------- ---------------------------------------------------------------------------------------------------- --------------- -----------------
          1 John Doe                                                                                             john.doe@example.com                                                                                 123-456-7890           2024-01-01
          2 Jane Smith                                                                                           jane.smith@example.com                                                                               123-456-7891           2024-01-02
          3 Alice Johnson                                                                                        alice.johnson@example.com                                                                            123-456-7892           2024-01-03
          4 Chris Lee                                                                                            chris.lee@example.com                                                                                123-456-7893           2024-01-04
          5 Patricia Brown                                                                                       patricia.brown@example.com                                                                           123-456-7894           2024-01-05

(5 行処理されました)

Fivetran 側でも変更が検知されます。

同期後、S3 側でもテーブル名からなる新しいパスが追加されます。データファイルやメタデータファイルは、テーブルごとのパス配下に格納されます。

Snowflake 側でも Iceberg テーブルとして作成できます。

CREATE ICEBERG TABLE myIcebergTable_customers
  EXTERNAL_VOLUME='exvol'
  CATALOG='glueCatalogInt'
  CATALOG_TABLE_NAME='customers';

作成後、これまでと同様の手順で参照できるようになります。

SELECT * FROM myIcebergTable_customers;

さいごに

Fivetran から S3 にデータを同期し、一部ではありますが、データ更新時にデータファイル自体はどのように変更されるのかを確認しつつ、実際にデータレイクに対して Snowflake の Iceberg テーブルからクエリしてみました。Glue 側の操作も Fivetran 側で実施してくれる点は便利だなと感じました。こちらの内容が何かの参考になれば幸いです。