GCS ファイル を Snowflake on AWS に Snowpipe で取り込んでみた。
こんにちは、みかみです。
カエルの季節になってきました。
夜、犬の散歩で外を歩いていると、カエルが足にぶつかってきてびっくりします。
カニが歩いているのもよく見かけます。
はじめに
Snowflake で外部テーブルを作成してファイルストレージ上のファイルが更新された場合、最新データを参照するには REFRESH が必要です。
同じクラウドプラットフォーム上であれば、AWS SQS や Google Cloud Pub/Sub などのメッセージングサービスを使って AUTO_REFRESH の設定が可能で、ユーザーは特に意識することなく最新データの参照が可能になります。
しかしこの外部テーブルの AUTO_REFRESH は、クロスクラウドでは利用できない機能で、GCS 上のファイルを参照する AWS 上の Snowflake の外部テーブルでは、AUTO_REFRESH が利用できません。
では同じくメッセージングサービスを使ってファイル put イベントを検出して COPY INTO を実行する Snowpipe は、本当にクロスクラウドでも使えるの?
ということで。
やりたいこと
- Snowpipe がクロスクラウドに対応していることを確認したい。
- Snowflake on AWS のテーブルに、GCS に出力されたファイルを Snowpipe で取り込みたい。
- GCS ファイルを取り込む Snowpipe の作成手順を確認したい。
前提
Google Cloud SDK(gcloud コマンド)の実行環境は準備済みであるものとします。 本エントリでは、Cloud Shell を使用しました。
また、BigQuery や GCS など各サービス操作に必要な API の有効化と権限は付与済みです。
Snowflake 環境側でも、必要なユーザーの作成や権限付与は実施済みです。
なお、文中、Google Cloud プロジェクト ID など一部の文字は伏字に変更しています。
本ブログの検証に使用した Snowflake の AWS と BigQuery のデータセットは、ともに東京リージョンです。
準備:Snowflake のストレージ統合を作成
取り込み対象ファイルが出力される GCS バケットに対する、ストレージ統合を作成して、ストレージ統合のサービスアカウントに GCS バケットへのアクセス権を付与します。
なお、本検証では、作成済みの以下のストレージ統合と外部ステージを使用します。


まだファイルを配置していないので外部ステージの LIST 結果は返ってきませんが、エラーが発生していないので正常に作成できています。
ストレージ統合と外部ステージの新規作成が必要な場合は、以下をご参照ください。
準備:Google Cloud Pub/Sub のトピック・サブスクリプションと通知を作成
GCS にファイルが配置された時に通知するための Cloud Pub/Sub トピックとサブスクリプションを作成して、Pub/Sub 通知を作成します。
以下の gcloud コマンドを実行して、Cloud Pub/Sub のトピック・サブスクリプションと通知を作成しました。
gcloud pubsub topics create snowpipe-test-mikami-topic
gcloud pubsub subscriptions create snowpipe-test-mikami-sub \
--topic=snowpipe-test-mikami-topic
gsutil notification create \
-t snowpipe-test-mikami-topic \
-f json \
-p snowpipe/ \
gs://test-mikami
gsutil notification create の -p オプションで、ファイル配置パスを指定しています。
合わせて、GCS バケットのファイルが配置されたときに Pub/Sub にメッセージを送信するための権限付与も必要です。
以下のコマンドで GCS バケットのサービスエージェントを取得します。
gcloud storage service-agent --project=[PROJECT_ID]
取得したサービスエージェントに、Pub/Sub パブリッシャー権限を付与しました。
gcloud pubsub topics add-iam-policy-binding snowpipe-test-mikami-topic \
--member="serviceAccount:service-[PROJECT_NUMBER]@gs-project-accounts.iam.gserviceaccount.com" \
--role="roles/pubsub.publisher"
Snowflake の通知統合を作成
Snowflake から Pub/Sub サブスクリプションにアクセスするための通知統合を作成します。
Snowflake で以下の SQL を実行して、通知統合を作成し、サービスアカウントを確認します。
CREATE NOTIFICATION INTEGRATION MIKAMI_GCS_SNOWPIPE_NOTIFICATION
TYPE = QUEUE
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = TRUE
GCP_PUBSUB_SUBSCRIPTION_NAME = 'projects/[PROJECT_ID]/subscriptions/snowpipe-test-mikami-sub';
DESC INTEGRATION MIKAMI_GCS_SNOWPIPE_NOTIFICATION;
確認した通知統合のサービスアカウントに Pub/Sub メッセージ取得とモニタリングの権限を付与する必要があります。
以下の gcloud コマンドを実行して権限を付与しました。
gcloud pubsub subscriptions add-iam-policy-binding snowpipe-test-mikami-sub \
--member="serviceAccount:[SNOWFLAKE_SERVICE_ACCOUNT]@awsapnortheast1-0987.iam.gserviceaccount.com" \
--role="roles/pubsub.subscriber"
gcloud projects add-iam-policy-binding [PROJECT_ID] \
--member="serviceAccount:[SNOWFLAKE_SERVICE_ACCOUNT]@awsapnortheast1-0987.iam.gserviceaccount.com" \
--role="roles/monitoring.viewer"
準備:取り込み先テーブルを作成
Snowflake で以下の SQL を実行して、データ取り込み先テーブルを作成しておきます。
CREATE TABLE DOGS (
id NUMBER,
name STRING
);
検証では id と name 2項目の CSV ファイルを取り込む予定です。
テーブル定義は実際の取り込みファイルのフォーマットに合わせてご作成ください。
Snowpipe を作成してデータ取り込みを確認
準備できたので、Snowpipe を作成して GCS にファイルを配置し、ファイルデータを取り込みできるか確認してみます。
Snowflake で以下の SQL を実行して、Snowpipe を作成します。
CREATE OR REPLACE PIPE MIKAMI_DOGS_PIPE
AUTO_INGEST = TRUE
INTEGRATION = 'MIKAMI_GCS_SNOWPIPE_NOTIFICATION'
COMMENT = 'DOGS テーブルデータ取り込み'
AS
COPY INTO MIKAMI_DB.PUBLIC.DOGS
FROM (
SELECT $1::NUMBER, $2::STRING
FROM @MIKAMI_DB.PUBLIC.MIKAMI_GCS_TEST_STAGE/snowpipe/
)
FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = ',' SKIP_HEADER = 1);
動作確認のため、BigQuery で以下の SQL を実行して、CSV ファイルを出力します。
EXPORT DATA
OPTIONS (
uri = 'gs://test-mikami/snowpipe/dogs_export_1_*.csv',
format = 'CSV',
overwrite = true,
header = true,
field_delimiter = ','
)
AS
SELECT *
FROM dataset_1.dogs
WHERE id > 5
;
なお、出力元の BigQuery テーブルには以下のデータが格納されている状態です。

ファイルが出力できました。
$ gcloud storage ls gs://test-mikami/snowpipe/*
gs://test-mikami/snowpipe/
gs://test-mikami/snowpipe/dogs_export_1_000000000000.csv
しばらく待ってから、Snowflake のテーブルにデータが取り込まれたか確認してみます。

想定通り、GCS に出力されたファイルのデータが Snowpipe で Snowflake テーブルに取り込まれたことが確認できました。
追加出力ファイルを取り込み
追加ファイルも正常に取り込まれるか確認します。
BigQuery で以下の SQL を実行して、CSV ファイルを出力します。
EXPORT DATA
OPTIONS (
uri = 'gs://test-mikami/snowpipe/dogs_export_2_*.csv',
format = 'CSV',
overwrite = true,
header = true,
field_delimiter = ','
)
AS
SELECT *
FROM dataset_1.dogs
WHERE id <= 5
;
追加ファイルが出力できました。
$ gcloud storage ls gs://test-mikami/snowpipe/*
gs://test-mikami/snowpipe/
gs://test-mikami/snowpipe/dogs_export_1_000000000000.csv
gs://test-mikami/snowpipe/dogs_export_2_000000000000.csv
Snowflake テーブルデータを確認してみます。

追加ファイルのデータも正常に取り込まれたことが確認できました。
まとめ(所感)
クロスクラウド環境でも、Snowpipe でデータ取り込みが正常に実行できることが確認できました。
各パブリッククラウドのネイティブサービスでデータパイプラインを構築する場合、クロスクラウドのデータ取り込みが必要なケースでは、ファイル転送サービスなどを利用して一度自クラウドのファイルストレージにファイル転送してから取り込む構成が多い印象です。
しかし Snowflake のストレージ統合を使えば、パブリッククラウドの壁をあまり意識することなく利用できるので、ユーザーにとってはとても使い勝手が良いと思います。
今回は Snowpipe の検証を行いましたが、Snowflake のタスクなどで COPY INTO を実行すればメッセージングサービスを利用せずともデータ取り込みできるので、クロスクラウドの壁はより低くなるのではないでしょうか。







