はじめに
Fivetran では、Destination に Amazon S3 を設定することが可能です。この際、宛先テーブルは AWS Glue データカタログの機能を使用した Apache Iceberg 形式のテーブルとして作成されます。
Snowflake では Iceberg テーブルとしてテーブルを作成しデータレイクに対してクエリする機能を提供しています。※執筆時点でパブリックプレビュー
またこの際 Snowflake では以下のカタログオプションをサポートしています。
- Snowflake を Iceberg カタログとして使用
- カタログ統合 を使用し外部の Iceberg カタログに接続
カタログ統合オブジェクトを使用することで、AWS Glue データカタログの機能へのアクセスも可能となります。ここではカタログ統合経由で Iceberg テーブルを Snowflake から参照してみました。
前提条件
以下の環境を使用しています。
- データソース
- RDS for SQL Server
- SQL Server Express Edition
- 2019 15.00.4365.2v.1
- インスタンスタイプ:db.t3.micro
- RDS for SQL Server
- Destination
- Amazon S3
- 同期方法
- Fivetran Teleport Sync
データソースへの Fivetran からの接続設定手順は以下の記事をご参照ください。
事前準備
検証用として SQLServer 上で以下のデータベース・テーブルを作成します。ここでは少し多めに1000件のレコードを追加しています。データ生成のコマンドは ChatGPT に作成してもらいました。
- データベースの作成
CREATE DATABASE testdb;
GO
- テーブルの作成、データの追加
-- テーブルの作成
CREATE TABLE sampledata (
id int primary key,
randomnumber int,
randomdate date,
randomstring varchar(100)
);
-- ランダムなデータの挿入
;WITH cte as (
SELECT TOP (1000)
row_number() over (order by (select null)) as rownum
FROM sys.columns a
CROSS JOIN sys.columns b
)
INSERT INTO sampledata (id, randomnumber, randomdate, randomstring)
SELECT
rownum,
abs(checksum(newid())) % 1000, -- 0から999までのランダムな数
dateadd(day, abs(checksum(newid())) % 3650, '2000-01-01'), -- 過去10年間のランダムな日付
left(convert(varchar(255), newid()), 10) -- ランダムな文字列
FROM cte;
確認
1> SELECT COUNT(*) FROM sampledata;
2> GO
-----------
1000
(1 行処理されました)
データの内容
1> SELECT TOP 10 * FROM sampledata;
2> GO
id randomnumber randomdate randomstring
----------- ------------ ---------------- ----------------------------------------------------------------------------------------------------
1 337 2006-02-18 B84E3B7A-A
2 19 2009-09-28 B911ED2D-7
3 216 2005-01-30 3C48E674-E
4 778 2006-10-19 5FE4AC7C-1
5 998 2001-02-28 D6A79E15-F
6 304 2004-03-31 5C8AEA83-9
7 447 2006-10-18 CD08016E-1
8 636 2000-11-13 0226AC41-6
9 365 2003-01-02 00B66B98-3
10 718 2004-08-01 6886EFE3-1
(10 行処理されました)
Fivetran 側の作業
上記の記事通り、データソース・Destination を設定し初期同期を行います。Destination の設定は下図の通りで Prefix は特に設定していません。
同期が完了すると下図の通り、ここでは Fivetran のコネクタ側で Destination schema prefix として指定した値とデータベーススキーマからなるプレフィックス配下に、テーブル名のパスが追加されていました。
さらにこの対象のテーブル名のパス配下はdata
,metadata
パスに分かれています。
名称通りデータファイルはs3://<バケット名>/<Destination schema prefix_スキーマ名>/<テーブル名>/data
に Parquet として生成されます。
Glue 側も確認すると Apache Iceberg 形式として Glue Data Catalog が追加されています。
データファイル自体は S3 にあるため、ストレージ統合を作成し Snowflake からクエリすることも可能です。スキーマ検出をしてみると下図のようになっていました。
--ストレージ統合
CREATE STORAGE INTEGRATION s3_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = '<IAM Role>'
STORAGE_ALLOWED_LOCATIONS = ('s3://<バケット名>/');
--ストレージ統合の情報を取得
DESC INTEGRATION s3_int;
--検証用DBの作成
CREATE DATABASE test;
USE SCHEMA test.public;
--ファイルフォーマットの作成
CREATE OR REPLACE FILE FORMAT my_parquet_format
TYPE = PARQUET;
--ステージの作成
CREATE OR REPLACE STAGE my_s3_stage
STORAGE_INTEGRATION = s3_int
URL = 's3://<バケット名>/sql_server_rds_dbo/sampledata/data'
FILE_FORMAT = my_parquet_format;
list @my_s3_stage;
--スキーマ検出
SELECT *
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@my_s3_stage'
, FILE_FORMAT=>'my_parquet_format'
)
);
Fivetran 経由での同期のため「_fivetran_deleted」と「_fivetran_synced」カラムが追加されています。
レコードの追加
どのようにデータファイルが追加・変更されるかを確認したかったので、まずはこのまま以下のコマンドでデータソース(SQLServer)側でレコードを 1000件追加してみます。これにより id の値が1001~ 2000のレコードが追加されます。
-- ランダムなデータをさらに1000レコード挿入
;WITH cte AS (
SELECT TOP (1000)
ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) AS RowNum
FROM sys.columns a
CROSS JOIN sys.columns b
),
max_existing_id AS (
SELECT COALESCE(MAX(id), 0) AS max_id FROM sampledata
)
INSERT INTO sampledata (id, randomnumber, randomdate, randomstring)
SELECT
m.max_id + c.rownum,
ABS(CHECKSUM(NEWID())) % 1000, -- 0から999までのランダムな数
DATEADD(day, ABS(CHECKSUM(NEWID())) % 3650, '2000-01-01'), -- 過去10年間のランダムな日付
LEFT(CONVERT(VARCHAR(255), NEWID()), 10) -- ランダムな文字列
FROM cte c, max_existing_id m;
確認
1> SELECT COUNT(*) FROM sampledata;
2> GO
-----------
2000
(1 行処理されました)
Fivetran による同期が完了すると S3 側でデータファイルの追加が確認できます。
Snowflake からステージ経由で各ファイルの id の最小・最大値をクエリして確認してみます。
SELECT
MIN($1:id::NUMBER) AS min_id,
MAX($1:id::NUMBER) AS max_id,
METADATA$FILENAME AS filename
FROM @my_s3_stage
GROUP BY filename;
新規追加された Parquet ファイルの id 値は1001~2000 と意図する結果となっていました。
Snowflake 上で Iceberg テーブルを作成する
ここまでは、ステージのデータファイルに対して直接クエリしていたので以降の手順で Snowflake における Iceberg テーブルを作成し、S3 データレイクに対してクエリしてみます。Iceberg テーブルは以下の手順で作成します。
- Iceberg テーブル用に外部ボリュームを構成
- カタログ統合を作成
それぞれ基本的な流れはストレージ統合オブジェクトを作成する手順と同様です。
Iceberg テーブル用に外部ボリュームを構成する
はじめに外部ボリュームを作成します。ドキュメントは以下です。
Snowflake における Iceberg テーブルでは、データとメタデータファイルは外部のクラウドストレージ(Amazon S3、Google Cloud Storage、または Azure Storage)に保存されます。このうちメタデータファイルは、外部アカウントのストレージや Snowflake 管理のストレージを使用することも可能です。
データファイルに Snowflake からアクセスするためには外部ボリュームを定義します。下図は公式ドキュメントからの引用ですが、ストレージに対する記載の権限が必要です。メタデータファイルをどこで管理するかによって必要な権限が異なります。
ここでは公式ドキュメント記載のポリシー雛形をそのまま使用し以下の IAM ポリシーを定義しました。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:GetObjectVersion",
"s3:DeleteObject",
"s3:DeleteObjectVersion"
],
"Resource": "arn:aws:s3:::<バケット名>/sql_server_rds_dbo/*"
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::<バケット名>",
"Condition": {
"StringLike": {
"s3:prefix": [
"sql_server_rds_dbo/*"
]
}
}
}
]
}
あわせて IAM ロールも作成し、このポリシーをアタッチしておきます。
次に Snowflake 側で以下のコマンドを実行し、外部ボリュームを作成します。
CREATE OR REPLACE EXTERNAL VOLUME exvol
STORAGE_LOCATIONS =
(
(
NAME = 'my-s3-us-east-2'
STORAGE_PROVIDER = 'S3'
STORAGE_BASE_URL = 's3://<バケット名>/sql_server_rds_dbo/'
STORAGE_AWS_ROLE_ARN = '<上記のポリシーをアタッチしたIAMロールのArn>'
ENCRYPTION=(TYPE='AWS_SSE_S3')
)
);
外部ボリュームはアカウントレベルのオブジェクトでデフォルトでは ACCOUNTADMIN のみで作成可能です。NAME
で指定する値はアカウント内で一意の名称である必要があります。
CREATE EXTERNAL VOLUME | Snowflake DOCUMENTATION
外部ボリュームを作成後、以下のコマンドで Snowflake アカウント側の AWS IAM ユーザーの情報を取得します。
DESC EXTERNAL VOLUME exvol;
出力のSTORAGE_AWS_IAM_USER_ARN
,STORAGE_AWS_EXTERNAL_ID
プロパティの値を使用し、上記のポリシーをアタッチした IAM ロールの信頼関係を編集します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"AWS": "<STORAGE_AWS_IAM_USER_ARN>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>"
}
}
}
]
}
外部ボリュームの構成手順は以上になります。
カタログ統合を作成する
続けて、ここでは AWS Glue データカタログを使用するため、カタログ統合オブジェクトを作成します。手順は以下に記載があります。
Snowflake からテーブルに関する情報にアクセスするために、Glue データカタログに対する以下の権限を持つ IAM ロールを作成します。
glue:GetTable
glue:GetTables
ドキュメントに記載がある通り以下のポリシーを作成します。accountid
を自身の id に置き換えます。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowGlueCatalogTableAccess",
"Effect": "Allow",
"Action": [
"glue:GetTable",
"glue:GetTables"
],
"Resource": [
"arn:aws:glue:*:<accountid>:table/*/*",
"arn:aws:glue:*:<accountid>:catalog",
"arn:aws:glue:*:<accountid>:database/<database-name>"
]
}
]
}
あわせて IAM ロールも作成し、このポリシーをアタッチしておきます。
次に Snowflake 側で以下のコマンドを実行しカタログ統合を作成します。
CREATE CATALOG INTEGRATION glueCatalogInt
CATALOG_SOURCE=GLUE
CATALOG_NAMESPACE='sql_server_rds_dbo'
TABLE_FORMAT=ICEBERG
GLUE_AWS_ROLE_ARN='<上記のポリシーをアタッチしたIAMロールのArn>'
GLUE_CATALOG_ID='<AWSアカウントのID>'
GLUE_REGION='us-east-2'
ENABLED=TRUE;
カタログ統合もアカウントレベルのオブジェクトで、デフォルトでは ACCOUNTADMIN のみで作成可能です。 CREATE CATALOG INTEGRATION | Snowflake DOCUMENTATION
外部ボリュームを作成後、以下のコマンドで Snowflake アカウント側の AWS IAM ユーザーの情報を取得します。
DESCRIBE CATALOG INTEGRATION glueCatalogInt;
出力のGLUE_IAM_USER_ARN
,GLUE_AWS_EXTERNAL_ID
プロパティの値を使用し、上記のポリシーをアタッチした IAM ロールの信頼関係を編集します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"AWS": "<glue_iam_user_arn>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "<glue_aws_external_id>"
}
}
}
]
}
カタログ統合の構成手順は以上です。
Icebergテーブルを作成する
さいごにこれまで作成したオブジェクトを使用して Iceberg テーブルを作成します。作成には以下のコマンドを使用しました。
CATALOG_TABLE_NAME
に Glue 上のテーブル名を指定します。
CREATE ICEBERG TABLE myGlueTable
EXTERNAL_VOLUME='exvol'
CATALOG='glueCatalogInt'
CATALOG_TABLE_NAME='sampledata';
Icebergテーブルを作成する | Snowflake DOCUMENTATION
また、上記のドキュメントからの引用ですが、作成には下図の権限が必要です。
コマンド実行後、通常のテーブルのようにクエリすることができます。
SELECT * FROM mygluetable;
SELECT COUNT(*) FROM mygluetable;
Icebergテーブルへのクエリ権限
他のロールに Iceberg テーブルに対するクエリを許可するには、以下の権限を付与または継承させます。
- テーブルに関連付けられた外部ボリュームに対する USAGE 権限
- 本記事のように Snowflake で管理されていないカタログの場合は、カタログ統合に対する USAGE 権限
- Iceberg テーブルに対する SELECT 権限
- テーブルを作成したデータベースとスキーマへの USAGE 権限を含む
コマンドの例
--外部ボリュームに対する USAGE 権限
GRANT USAGE ON EXTERNAL VOLUME exvol TO ROLE <ロール>;
--外部カタログに対する USAGE 権限
GRANT USAGE ON INTEGRATION glueCatalogInt TO ROLE <ロール>;
--Icebergテーブルに対するSELECT権限
GRANT USAGE ON DATABASE <データベース> TO ROLE <ロール>;
GRANT USAGE ON SCHEMA <データベース>.<スキーマ> TO ROLE <ロール>;
GRANT SELECT ON ICEBERG TABLE <データベース>.<スキーマ>.<Icebergテーブル>TO ROLE <ロール>;
メタデータファイルの更新
既存レコードの更新
データソース(SQLServer)側で以下のコマンドを実行しレコードを更新してみます。
UPDATE sampledata
SET
randomnumber = 1,
randomdate = '2024-05-21',
randomstring = 'UPDATE'
WHERE id = 100;
GO
Fivetran による同期完了後 S3 のデータファイルを見てみると更新前は2つだった状態から下図の通り追加されていました。
Snowflake からステージ経由で各ファイルの id の最小・最大値をクエリして確認してみます。
SELECT
MIN($1:id::NUMBER) AS min_id,
MAX($1:id::NUMBER) AS max_id,
METADATA$FILENAME AS filename
FROM @my_s3_stage
GROUP BY filename;
出力は下図のようになっており、既存のレコードが格納されているデータファイルは残りつつも、変更があったレコードは別ファイルとして追加され、それ以外のレコードが格納されたデータファイルが追加されています。
Icebergテーブルを通して確認する際は以下のコマンドを実行し、メタデータファイルを更新してから参照します。
ALTER ICEBERG TABLE mygluetable REFRESH;
SELECT * FROM mygluetable WHERE id = 100;
下図の通り更新後のデータを確認できました。
Iceberg テーブルではなく、ストレージに対して通常のようにクエリすると、変更前のデータファイル自体は削除されていないため、同じ id の複数レコードが出力されます。
WITH temp AS (
SELECT
$1:id::NUMBER AS id
,$1:randomstring::TEXT AS randomstring
,$1:randomdate::DATE AS randomdate
,$1:randomnumber::NUMBER AS randomnumber
,$1:_fivetran_deleted::BOOLEAN AS _fivetran_deleted
,$1:_fivetran_synced::TIMESTAMP_NTZ AS _fivetran_synced
,METADATA$FILENAME AS filename
FROM @my_s3_stage
)
SELECT
*
FROM temp
WHERE id = 100;
Iceberg テーブルでは、カタログを通して変更前のデータファイルへのリンクが外れる形で最新のデータとしては出力されないように制御されます。
レコードの追加
さらにレコードを追加しみます。データソース(SQLServer)側で1000レコード追加し、3000件とし Fivetran で同期を行います。
Iceberg テーブル側では、以下でメタデータを更新しクエリすることで、内容が更新されます。
ALTER ICEBERG TABLE mygluetable REFRESH;
レコードの削除
データソース(SQLServer)側で以下のコマンドを実行し、レコードを削除します。
1> DELETE FROM sampledata
2> WHERE id = 100;
3> GO
(1 行処理されました)
Fivetran による同期後、メタデータファイルの更新前は当然変更が反映されていません。
SELECT * FROM mygluetable WHERE id = 100;
上記のコマンドを実行し、メタデータファイルを更新後再度クエリすると、下図の通り更新後のデータにアクセスできます。(Fivetran による同期のため論理削除となっています)
データファイルを確認すると、ファイルがひとつ追加される形で更新されていました。
ファイルに対してクエリしてみると、実データファイルとしてはレコードが残っていることを確認できます。(3レコード)
WITH temp AS (
SELECT
$1:id::NUMBER AS id
,$1:randomstring::TEXT AS randomstring
,$1:randomdate::DATE AS randomdate
,$1:randomnumber::NUMBER AS randomnumber
,$1:_fivetran_deleted::BOOLEAN AS _fivetran_deleted
,$1:_fivetran_synced::TIMESTAMP_NTZ AS _fivetran_synced
,METADATA$FILENAME AS filename
FROM @my_s3_stage
)
SELECT
*
FROM temp
WHERE id = 100
ORDER BY 6;
タイムトラベルの使用
Iceberg テーブルはタイムトラベルに対応しています。
例えば、以下のコマンドで論理削除前のデータを取得できます。
SELECT * FROM mygluetable AT(TIMESTAMP => 'Tue, 21 May 2024 20:00:00 +0900'::timestamp_ltz) WHERE id = 100;
スキーマ移行の検証
列を追加
データベース側で以下のコマンドを実行し、テーブルに列を追加します。
ALTER TABLE sampledata
ADD newcolumn VARCHAR(50);
GO
SQLServer 側で確認
1> SELECT TOP 10 * FROM sampledata;
2> GO
id randomnumber randomdate randomstring newcolumn
----------- ------------ ---------------- ------------------------ --------------------------------------------------
1 337 2006-02-18 B84E3B7A-A NULL
2 19 2009-09-28 B911ED2D-7 NULL
3 216 2005-01-30 3C48E674-E NULL
4 778 2006-10-19 5FE4AC7C-1 NULL
5 998 2001-02-28 D6A79E15-F NULL
6 304 2004-03-31 5C8AEA83-9 NULL
7 447 2006-10-18 CD08016E-1 NULL
8 636 2000-11-13 0226AC41-6 NULL
9 365 2003-01-02 00B66B98-3 NULL
10 718 2004-08-01 6886EFE3-1 NULL
(10 行処理されました)
Fivetran による同期後、メタデータファイルを更新し、Icebergテーブルを参照すると列が追加されてることを確認できます。
ALTER ICEBERG TABLE mygluetable REFRESH;
SELECT * FROM mygluetable LIMIT 10;
列を削除
次にデータソース側で列を削除します。
--randomstring列を削除
ALTER TABLE sampledata
DROP COLUMN randomstring;
GO
Fivetran による同期後、Snowflake 側で同様の手順を行いテーブルを参照すると対象の列の値が NULL に置き換わっていることを確認できます。
ALTER ICEBERG TABLE mygluetable REFRESH;
SELECT * FROM mygluetable LIMIT 10;
この際、すでに論理削除済みのレコードについては、この変更は反映されません。
テーブルを追加
さいごに、データソース側の同スキーマに以下のコマンドでテーブルを追加します。
CREATE TABLE customers (
customer_id INT PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100),
phone_number VARCHAR(15),
registration_date DATE
);
INSERT INTO customers (customer_id, name, email, phone_number, registration_date)
VALUES
(1, 'John Doe', 'john.doe@example.com', '123-456-7890', '2024-01-01'),
(2, 'Jane Smith', 'jane.smith@example.com', '123-456-7891', '2024-01-02'),
(3, 'Alice Johnson', 'alice.johnson@example.com', '123-456-7892', '2024-01-03'),
(4, 'Chris Lee', 'chris.lee@example.com', '123-456-7893', '2024-01-04'),
(5, 'Patricia Brown', 'patricia.brown@example.com', '123-456-7894', '2024-01-05');
GO
SQLServer で確認
1> SELECT * FROM customers;
2> GO
customer_id name email phone_number registration_date
----------- ---------------------------------------------------------------------------------------------------- ---------------------------------------------------------------------------------------------------- --------------- -----------------
1 John Doe john.doe@example.com 123-456-7890 2024-01-01
2 Jane Smith jane.smith@example.com 123-456-7891 2024-01-02
3 Alice Johnson alice.johnson@example.com 123-456-7892 2024-01-03
4 Chris Lee chris.lee@example.com 123-456-7893 2024-01-04
5 Patricia Brown patricia.brown@example.com 123-456-7894 2024-01-05
(5 行処理されました)
Fivetran 側でも変更が検知されます。
同期後、S3 側でもテーブル名からなる新しいパスが追加されます。データファイルやメタデータファイルは、テーブルごとのパス配下に格納されます。
Snowflake 側でも Iceberg テーブルとして作成できます。
CREATE ICEBERG TABLE myIcebergTable_customers
EXTERNAL_VOLUME='exvol'
CATALOG='glueCatalogInt'
CATALOG_TABLE_NAME='customers';
作成後、これまでと同様の手順で参照できるようになります。
SELECT * FROM myIcebergTable_customers;
さいごに
Fivetran から S3 にデータを同期し、一部ではありますが、データ更新時にデータファイル自体はどのように変更されるのかを確認しつつ、実際にデータレイクに対して Snowflake の Iceberg テーブルからクエリしてみました。Glue 側の操作も Fivetran 側で実施してくれる点は便利だなと感じました。こちらの内容が何かの参考になれば幸いです。