SnowflakeでS3に対するストレージ統合を設定して、COPY INTO、Snowpipeでデータをロードしてみた
かわばたです。
SnowflakeでS3に対するストレージ統合を設定して、COPY INTO、Snowpipeでデータをロードすることを試していきます。
今回行うこと
- ストレージ統合
- COPY INTOでデータロード
- Snowpipeでデータロード
検証環境
今回はSnowflakeトライアルアカウントのEnterprise版で試していきます。
検証のため、権限はACCOUNTADMIN
を利用しています。
ストレージ統合
Amazon S3
にアクセスするためのSnowflakeストレージ統合を行っていきます。
ストレージ統合を使用して、Snowflakeが外部(つまり、S3)ステージで参照されるAmazon S3バケットに対してデータを読み書きできるようにする方法について説明します。統合は、名前付きのファーストクラスのSnowflakeオブジェクトであり、秘密キーまたはアクセストークンといった、クラウドプロバイダーの明示的な認証情報を渡す必要がありません。統合オブジェクトには、 AWS IDおよびアクセス管理(IAM)ユーザー IDが保存されます。> 組織の管理者が、 AWS アカウントの統合 IAM ユーザー権限を付与します。
上記公式ドキュメントからの引用となりますが、簡単に説明するとストレージ統合とは、AWSのアクセスキーを使わずに、SnowflakeとS3を安全に連携させる仕組みとなります。
実際に試していきましょう。
S3バケットのアクセス許可を構成する
Snowflakeでは、ファイルにアクセスできるようにするために、S3バケットおよびフォルダーに対する以下の権限が最低でも必要となります。
権限 | 内容 |
---|---|
s3:GetBucketLocation | Amazon S3 バケットが存在するリージョンを返すアクセス許可を付与します |
s3:GetObject | Amazon S3 からオブジェクトを取得するためのアクセス許可を付与します |
s3:GetObjectVersion | 特定のバージョンのオブジェクトを取得するためのアクセス許可を付与します |
s3:ListBucket | Amazon S3 バケット内のオブジェクトの一部またはすべてを一覧表示するアクセス許可を付与します |
公式ドキュメント
上記権限を含めてIAM ポリシーの作成を行っていきます。
1.AWS管理コンソールからIAMを検索して開きます。
2. 上記画面が開くので、アカウント設定をクリックします。
エンドポイントリストのSTSステータスがアクティブか確認します。
この際、リージョンはSnowflakeのリージョンと同じである必要があります。
3. 1のポリシーをクリックします。その後、左上部にあるポリシーの作成
をクリックします。
赤枠のJSON
を選択し、ポリシーエディタに下記コードを記載します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:GetObjectVersion",
"s3:DeleteObject",
"s3:DeleteObjectVersion"
],
"Resource": "arn:aws:s3:::<bucket>/<prefix>/*"
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::<bucket>",
"Condition": {
"StringLike": {
"s3:prefix": [
"<prefix>/*"
]
}
}
}
]
}
その際、<bucket>
と<prefix>
には実際のバケット名とフォルダーパスプレフィックスに置き換えてください。
※今回は検証用のため、上記の権限としていますが、必要に応じて編集することを推奨しています。
4. ポリシー名を記載して、ポリシーを作成してください。
IAM AWS ロールを作成する
S3バケットのアクセス許可を構成する
の1と同様に、AWS管理コンソールからIAMを検索して開きロール
をクリックします。- 左上部にある
ロールを作成
をクリックします。 - 下記画面に遷移するので、
AWSアカウント
を選択します。
別のAWSアカウント
を選択し、自身のAWSアカウントIDを入力します。(後ほど変更します)
オプションの外部IDを要求するを選択し、外部IDに0000
を入力し、次へ
をクリックします。(後ほど変更します)
- 作成したポリシーを選択し、
次へ
をクリックします。
- ロールの名前と説明を入力し、
ロールを作成
を選択します。
Snowflakeでクラウドストレージ統合を作成する
作成した、ロールのARN
を調べます。
- AWS管理コンソールからIAMを検索して開き
ロール
をクリックします。 - 作成したロールを選択し、クリックします。
- 概要部分に
ARN
が記載されているので、コピーしておきます。
Snowflakeの操作に移ります。
ワークシートを開き、下記クエリを実行します。
--ストレージ統合の作成
CREATE STORAGE INTEGRATION <integration_name>
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = ''
STORAGE_ALLOWED_LOCATIONS = ('s3://mybucket20250715/snowflake/')
CREATE STORAGE INTEGRATION
には任意の名称を記載します。STORAGE_AWS_ROLE_ARN
にコピーしたARN
を記載します。STORAGE_ALLOWED_LOCATIONS
には実際のバケット名とフォルダーパスプレフィックスを記載します。
Snowflakeアカウントの AWS IAM ユーザーを取得する
Snowflakeアカウント用に自動的に作成された IAMユーザーの ARN を取得するには、DESCRIBE INTEGRATION
を使用します。
1.下記クエリをSnowflakeワークシートで実行します。
integration_name
はさきほど作成したストレージの名称を記載します。
DESC INTEGRATION <integration_name>;
- 出力された結果の
property
カラムのSTORAGE_AWS_IAM_USER_ARN
とSTORAGE_AWS_EXTERNAL_ID
の値(property_value
)をコピーしておきます。
バケットオブジェクトにアクセスするために IAM ユーザー権限を付与する
- AWS管理コンソールからIAMを検索して開き
ロール
をクリックします。 IAM AWS ロールを作成する
で作成したロールを選択します。- 下記画面の
信頼関係
タブを開き、信頼ポリシーを編集
をクリックします。
4.エディタに下記クエリを記載し、右側下部のポリシーを更新
で更新します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"AWS": "<snowflake_user_arn>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "<snowflake_external_id>"
}
}
}
]
}
snowflake_user_arn
にはSTORAGE_AWS_IAM_USER_ARN
の値を記載snowflake_external_id
にはSTORAGE_AWS_EXTERNAL_ID
の値を記載
外部ステージを作成する
作成したストレージ統合を参照する、外部ステージを作成します。
事前に、保存するデータベース・スキーマ・テーブル・ファイルフォーマットを作成しましょう。
今回はCSVデータを連携予定のため、下記クエリでファイルフォーマットを作成しました。
CREATE OR REPLACE FILE FORMAT my_csv_format
TYPE = 'CSV',
FIELD_DELIMITER = ',',
SKIP_HEADER = 1;
※ファイルフォーマット公式ドキュメント
下記クエリを実行し、外部ステージを作成します。
USE SCHEMA mydb.public;
CREATE STAGE my_s3_stage
STORAGE_INTEGRATION = s3_int
URL = 's3://bucket1/path1/'
FILE_FORMAT = my_csv_format;
my_s3_stage
には任意のステージ名を記載します。STORAGE_INTEGRATION
には作成したストレージ統合名を記載してください。URL
には実際のバケット名とフォルダーパスプレフィックスに置き換えます。my_csv_format
は該当のファイルフォーマットを記載してください
- S3のオブジェクト
- Snowflakeのステージ
上記の通り、S3オブジェクトをステージから参照できました。
COPY INTOでデータロード
ここから、COPY INTO <テーブル>
を実行して、データをターゲットテーブルにロードします。
下記クエリを実行します。
--データロード
COPY INTO KAWABATA_RAW_DB.public.user_id
FROM @kawabata_s3_stage
PATTERN='.*users.*.csv';
FROM
は作成した、ステージ名を記載します。PATTERN
は今回'users'で始まるファイルのみロードするようにしています。
users.csvは生成AIでダミーデータを作成しました。詳細下記のとおりです。
user_id | user_name | created_at |
---|---|---|
101 | 田中 太郎 | 2024-01-15 |
102 | 鈴木 花子 | 2024-02-20 |
103 | 佐藤 次郎 | 2024-03-10 |
104 | 高橋 三郎 | 2024-04-05 |
105 | 伊藤 四郎 | 2024-05-21 |
Snowflake上で該当のテーブルを確認したところ、データがロードされていることが確認できました。
Snowpipeによるデータロード
Snowpipeでもデータをロードしていきます。
さきほど、ストレージ統合項目で外部ステージまで作成しましたが、Snowpipeでも必要となります。
今回は検証のため同じものを使用しますが、状況により作成することを推奨します。
概要
Snowpipeは、データウェアハウスサービスであるSnowflakeに、データを継続的かつ自動的にロードするための機能です。
あらかじめ定義したデータ形式(例:CSV、JSON、Parquetなど)と構造でデータが連携されることを前提としています。
定義と異なる形式のファイルや、データ構造が違うファイル、破損したファイルが連携された場合、そのファイルはエラーとなりロードないので注意が必要です。
自動インジェストを有効にしたパイプを作成する
今回連携するファイルはorders.csv
を使用します。(同様に生成AIで作成しています。)
order_id | user_id | product_id | quantity | order_date | status |
---|---|---|---|---|---|
3001 | 101 | 201 | 5 | 2024-06-01 | completed |
3002 | 102 | 204 | 1 | 2024-06-01 | completed |
3003 | 101 | 205 | 2 | 2024-06-02 | shipped |
3004 | 103 | 202 | 10 | 2024-06-03 | completed |
3005 | 104 | 203 | 3 | 2024-06-05 | shipped |
3006 | 102 | 201 | 2 | 2024-06-05 | pending |
3007 | 105 | 205 | 1 | 2024-06-08 | completed |
3008 | 101 | 204 | 1 | 2024-06-10 | pending |
事前にorders.csv
を格納するテーブルを作成しておきましょう。
-- snowpipe利用
CREATE PIPE KAWABATA_RAW_DB.public.kawabata_order_id
AUTO_INGEST = TRUE
AS
COPY INTO KAWABATA_RAW_DB.public.order_id
FROM @KAWABATA_RAW_DB.public.kawabata_s3_stage
FILE_FORMAT = my_csv_format;
CREATE PIPE
にはターゲットテーブルを記載します。FROM
は作成した、ステージ名を記載します。
イベント通知を構成する
- 下記クエリを実行します。
SHOW PIPES;
出力された結果から、notification_channel
カラムの値をコピーしておきます。
ここから再びAWS側での作業となります。
2. Amazon S3 コンソールに移動し汎用バケット
を選択します。
3. 該当のバケットを選択すると下記画面となりプロパティ
を選択します。
4. イベント通知セクションに移動し、イベント通知作成
を選択します。
5. 一般的な設定部分セクションのイベント名
を記載します。
6.イベントタイプセクションはすべてのオブジェクト作成イベント
にチェックを入れます。
7. 送信先セクションではSQSキュー
を選択,SQSキューの特定はSQSキューARNを入力
を選択します。
SQSキューはSHOW PIPES
で出力したnotification_channel
カラムの値を入力します。
入力が完了したら、変更を保存
します。
これで該当のS3バケットにオブジェクトが追加されたら、Snowpipeに通知して、それらをパイプで定義されたターゲットテーブルにロードします。
実際に確認してみましょう。
S3にorders.csv
をアップロードします。
Snowflake側を確認すると
しっかりとデータロードが出来ていました。
最後に
いかがでしたでしょうか。
今回はデータロードの代表的な二つを紹介させていただきました。
データロードの仕方や連携先については他にも、外部テーブルへ自動更新
やウェブインターフェイスを使用したデータのロード
,Dynamic Table
等あります。
状況に合わせてご活用ください。
この記事が参考になれば幸いです。