Azure Blob Storage を外部ステージに設定し Snowpipe によるデータロードを試してみる #SnowflakeDB
はじめに
Snowflake で Azure の Blob Storage を外部ステージに設定し、Snowpipe によるデータロードを試してみましたので、その内容をまとめてみます。
前提条件
以下の環境で検証しています。
- Snowflake
- クラウドサービス:Microsoft Azure
- リージョン:japaneast
- ストレージ アカウントのリージョン:japaneast
- リソースグループは作成済み
- 単一のストレージ アカウントに対するストレージ統合オブジェクトによる連携設定は実施済み
- こちらは以下の記事で作成済みの環境を使用しています
Azure Snowpipe の構成
Azure での Snowpipe の構成については以下に記載があります。
各構成要素
Azure の Snowpipe 構成にあたり、以下の機能を使用します。
- Azure Queue Storage(ストレージキュー)
- メッセージを格納するためのサービス
- Snowpipe では、新しいデータファイルが Azure Blob Storage にアップロードされたときに、Event Grid から送られるイベント通知(メッセージ)を一時的に保管する場所として使用される
- Azure Event Grid
- Azure 内で発生するイベント(例:Blob Storage へのファイルアップロード など)を、指定した宛先(エンドポイント)にルーティングするサービス
- イベント サブスクリプション
- サブスクリプションを通して、Event Grid に「どのリソースの、どのようなイベントを、どの宛先に送るか」を設定できる
- Snowpipe の構成では、以下の設定とすることで、新しいファイルが Blob Storage にアップロードされるたびに、Event Grid がそのイベントを検知し、指定されたストレージキューに通知を送信する
- ソース:データファイルが保存される Azure Blob Storage
- イベント:Blobが作成されたイベント(
PutBlob
など) - 宛先:ストレージキュー
上記の各構成要素を使用し、以下のようなデータフローとなります。
- Blob Storage に新しいファイルがアップロードされる
- このイベントを、Blob Storage を監視している Event Grid が検知する
- Event Grid は、設定されたイベントサブスクリプションに基づいて、その通知をストレージキューに送信する
- この通知をサービスプリンシパルを通じて Snowflake のパイプが検知し、Blob Storage からステージにファイルをコピーする
- Snowflake の仮想ウェアハウスが、キューに入れられたファイルからターゲットテーブルにデータをロードする
ストレージアカウントの構成
Snowpipe 構成時は、ストレージキューを作成することになるため、ストレージキューの作成先として、汎用v2
のストレージ アカウントが必要です。
データファイル向けにはこちらに記載のあるいずれからのタイプのストレージアカウントを使用できます。
考慮事項
- ストレージキューの構成
- 通知統合の上限
- Azure の Snowpipe の場合、ストレージキューごとに通知統合を作成する必要があります
- 以下にまとめられているように、この通知統合には上限があるため、上記のようにまずはアカウントで単一の通知統合(ストレージキュー)から始めるとよいとされています
試してみる
前提条件として、ストレージ アカウントは作成済みで、ストレージ統合オブジェクトによる外部ステージの作成まで完了しているため、ここでは Snowpipe を構成するための以下の設定を行います。
- Azure 側
- Event Grid リソースプロバイダーの有効化
- ストレージキュー向けのストレージ アカウントの作成
- ストレージキューの作成
- Event Grid サブスクリプションの構成
- Snowflake 側
- 通知統合の作成
- Azure 側でサービスプリンシパルへのロールを割り当て
- Snowpipe の構成
- 通知統合の作成
また、各種手順は以下を参考にしています。
Azure 側:Event Gridリソースプロバイダーの有効化
こちらは初回のみで、アカウントではじめて使用する場合、以下で Event Gridリソースプロバイダーの有効化を行います。
クラウドシェルで以下を実行し、ステータスが完了になることを確認します。
az provider register --namespace Microsoft.EventGrid
$ az provider show --namespace Microsoft.EventGrid --query "registrationState"
"Registered"
Azure 側:ストレージキュー向けのストレージ アカウントの作成
ストレージキューの作成先となる汎用 v2
タイプのストレージ アカウントを作成します。(ここでは Blob ストレージとは異なるストレージ アカウントを用意しました)。
クラウドシェルで以下により作成しました。
# 環境変数を設定
resource_group_name="rg-test" # 作成済みのリソースグループ名
data_storage_account_name="<Blobストレージの作成先ストレージアカウント名>"
storage_queue_account_name="<ストレージキューの作成先となるストレージアカウント名>"
queue_name="snowpipe-queue" # キューの名前
subscription_name="snowpipe-subscription" # サブスクリプション名
# ストレージキュー用ストレージアカウントを作成
az storage account create \
--resource-group $resource_group_name \
--name $storage_queue_account_name \
--location japaneast \
--sku Standard_LRS \
--kind StorageV2
# ストレージキューを作成
az storage queue create \
--name $queue_name \
--account-name $storage_queue_account_name
上記実行後、Azure ポータル上でもそれぞれのオブジェクトを確認できます。
ストレージキューは「(対象の)ストレージアカウント > データストレージ > キュー」から確認できます。
Azure 側:Event Grid サブスクリプションの構成
事前にクラウドシェルで以下を実行し、Event Grid 拡張機能をインストールします。
$ az extension add --name eventgrid
No stable version of 'eventgrid' to install. Preview versions allowed.
The installed extension 'eventgrid' is in preview.
以下のコマンドで Event Grid サブスクリプションを作成しました。
# データ用ストレージアカウントIDを取得 (Event Gridのソース)
storageid=$(az storage account show \
--name $data_storage_account_name \
--resource-group $resource_group_name \
--query id \
--output tsv)
# キュー用ストレージアカウントIDを取得 (Event Gridの宛先)
queue_storage_id=$(az storage account show \
--name $storage_queue_account_name \
--resource-group $resource_group_name \
--query id \
--output tsv)
# キュー用ストレージアカウントIDを使用し、キューのIDを作成
queueid="$queue_storage_id/queueservices/default/queues/$queue_name"
# Event Gridサブスクリプションを作成
az eventgrid event-subscription create \
--source-resource-id $storageid \
--name $subscription_name \
--endpoint-type storagequeue \
--endpoint $queueid \
--advanced-filter data.api stringin CopyBlob PutBlob PutBlockList FlushWithClose SftpCommit
完了後「Event Grid > Azure サービス イベント > システム トピック」よりトピックをクリックするとサブスクリプションを確認できます。
Snowflake 側:通知統合の作成
ここから Snowflake側で作業します。
はじめに通知統合を作成します。通知統合は Snowflake と Azure Event Grid といった、サードパーティのクラウドメッセージキューサービス間のインターフェースとして機能する Snowflake オブジェクトです。
通知統合の作成には、以下の情報が必要なので、事前に Azure 側で確認しておきます。
- ストレージキュー URL
- 「ストレージアカウント > データストレージ > キュー」より確認できます
- テナント ID
上記を使用し、以下のコマンドで通知統合を作成できます。
--通知統合を作成
USE ROLE ACCOUNTADMIN;
CREATE NOTIFICATION INTEGRATION pipe_queue_int
ENABLED = true
TYPE = QUEUE
NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
AZURE_STORAGE_QUEUE_PRIMARY_URI = '<ストレージキューURL>'
AZURE_TENANT_ID = '<テナントID>';
オブジェクト作成後は、Azure 側で Snowflake にアクセス権を付与する作業が必要です。
以下を実行し、Microsoft のアクセス許可リクエストページへの URL とアプリケーション名を取得します。
DESC NOTIFICATION INTEGRATION pipe_queue_int;
出力のAZURE_CONSENT_URL
が Microsoft のアクセス許可リクエストページへの URL となるので、コピーしてブラウザで開きます。
また、AZURE_MULTI_TENANT_APP_NAME
プロパティの値も後ほど使用するので控えておきます。
Azure 側の管理者アカウントでサインインし、「承諾」をクリックします。
承諾後は、Azure ポータルの「Microsoft Entra ID > エンタープライズ アプリケーション」よりAZURE_MULTI_TENANT_APP_NAME
のアンダースコア以前の名称からなる Snowflake サービスプリンシパルが作成されていることを確認できます。
続けて、Snowflake サービスプリンシパルにストレージキューへのアクセス権(ロール)を付与します。
Azure ポータルで、先の手順で作成したストレージキューのメニューから「アクセス制御(IAM) > 追加 > ロールの割り当ての追加」をクリックします。各種ロールが表示されるので検索フィルタし Snowflake サービスプリンシパルへの付与対象となるロール(「ストレージキュー データ共同作成者」)を選択し、「次へ」をクリックします。
「+ メンバーを選択する」をクリックし、DESC NOTIFICATION INTEGRATION
で取得したAZURE_MULTI_TENANT_APP_NAME
のアンダースコアより前の文字列を検索、該当のサービスプリンシパルを選択し、ロールを割り当てます。
割り当て後は「ロールの割り当て」タブより対象のサービスプリンシパルに対して、指定のロールが割り当てられていることを確認できます。
Snowflake側:Snowpipe の構成
さいごに、Snowflake 側で外部ステージやパイプ、パイプによりデータロード対象となるテーブルを作成します。
前提条件より、外部ステージは作成済みのものを使用します。
CREATE STAGE my_azure_stage
STORAGE_INTEGRATION = my_azure_storage_int
URL = 'azure://<ストレージアカウント名>.blob.core.windows.net/<コンテナー名>/';
テーブルやファイルフォーマットも事前に作成済みのものを使用します。テーブルはCREATE OR REPLACE TABLE
で再作成しています。
--ファイルフォーマットを作成
CREATE OR REPLACE FILE FORMAT my_csv_format
TYPE = CSV
FIELD_DELIMITER = ','
PARSE_HEADER = TRUE
EMPTY_FIELD_AS_NULL = true
COMPRESSION = AUTO;
--スキーマ検出機能によりテーブルを定義
>CREATE OR REPLACE TABLE mytable
USING TEMPLATE (
SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@my_azure_stage/',
FILE_FORMAT=>'my_csv_format'
)
)
);
+-------------------------------------+
| status |
|-------------------------------------|
| Table MYTABLE successfully created. |
+-------------------------------------+
なお、対象のコンテナー(Blob ストレージ)には、以下のファイルを配置済みとしています。
パイプを定義します。
CREATE OR REPLACE PIPE mypipe
AUTO_INGEST = true
INTEGRATION = 'PIPE_QUEUE_INT'
AS
COPY INTO test_db.public.mytable
FROM @test_db.public.my_azure_stage/
FILE_FORMAT=(FORMAT_NAME = 'my_csv_format')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
この時点ではテーブルは空です。
>SELECT * FROM MYTABLE;
+----+-----+----------+------+
| ID | Val | Category | Date |
|----+-----+----------+------|
+----+-----+----------+------+
コンテナーに新しいデータファイルをアップロードします。
少ししてテーブルを再度確認すると、データがロードされていました。
>SELECT * FROM MYTABLE;
+----+--------------+----------+-----------+
| ID | Val | Category | Date |
|----+--------------+----------+-----------|
| 1 | -0.359878994 | A | 2024/4/13 |
| 2 | -2.681319922 | B | 2024/4/13 |
| 3 | -0.929509687 | B | 2024/4/13 |
| 4 | -0.682002319 | A | 2024/4/13 |
| 5 | -1.241710994 | C | 2024/4/13 |
| 6 | 0.769089496 | B | 2024/4/13 |
| 7 | -0.151905356 | B | 2024/4/13 |
| 8 | 0.462443169 | C | 2024/4/13 |
| 9 | -0.453144568 | A | 2024/4/13 |
| 10 | 0.582974854 | B | 2024/4/13 |
+----+--------------+----------+-----------+
コピー履歴を確認すると、パイプから取り込まれていることがわかります。
SELECT * FROM TABLE(information_schema.copy_history(
table_name => 'mytable',
start_time => DATEADD(hour, -1, current_timestamp())
));
+--------------------------+----------------------------------------------------------------------+-------------------------------+-----------+------------+-----------+---------------------+-------------------------+---------------------------+-------------------------+-------------+-------------+--------+--------------------+-------------------+------------+-------------------+------------------+-----------+-------------------------------+--------------+
| FILE_NAME | STAGE_LOCATION | LAST_LOAD_TIME | ROW_COUNT | ROW_PARSED | FILE_SIZE | FIRST_ERROR_MESSAGE | FIRST_ERROR_LINE_NUMBER | FIRST_ERROR_CHARACTER_POS | FIRST_ERROR_COLUMN_NAME | ERROR_COUNT | ERROR_LIMIT | STATUS | TABLE_CATALOG_NAME | TABLE_SCHEMA_NAME | TABLE_NAME | PIPE_CATALOG_NAME | PIPE_SCHEMA_NAME | PIPE_NAME | PIPE_RECEIVED_TIME | BYTES_BILLED |
|--------------------------+----------------------------------------------------------------------+-------------------------------+-----------+------------+-----------+---------------------+-------------------------+---------------------------+-------------------------+-------------+-------------+--------+--------------------+-------------------+------------+-------------------+------------------+-----------+-------------------------------+--------------|
| sample_data_20240413.csv | azure://<ストレージアカウント>.blob.core.windows.net/<コンテナ名>/ | 2025-10-10 00:42:05.182 -0700 | 10 | 10 | 300 | NULL | NULL | NULL | NULL | 0 | 1 | Loaded | TEST_DB | PUBLIC | MYTABLE | TEST_DB | PUBLIC | MYPIPE | 2025-10-10 00:41:44.725 -0700 | 300 |
+--------------------------+----------------------------------------------------------------------+-------------------------------+-----------+------------+-----------+---------------------+-------------------------+---------------------------+-------------------------+-------------+-------------+--------+--------------------+-------------------+------------+-------------------+------------------+-----------+-------------------------------+--------------+
パイプ作成前から既に配置済みのデータファイルは、過去7日以内にストレージにアップロードしていたため、パイプをリフレッシュしてみます。
>ALTER PIPE mypipe REFRESH;
+--------------------------+--------+
| File | Status |
|--------------------------+--------|
| sample_data_20240412.csv | SENT |
| sample_data_20240413.csv | SENT |
+--------------------------+--------+
>SELECT * FROM MYTABLE;
+----+--------------+----------+-----------+
| ID | Val | Category | Date |
|----+--------------+----------+-----------|
| 1 | 1.114388475 | A | 2024/4/12 |
| 2 | -0.617730236 | B | 2024/4/12 |
| 3 | -1.226039849 | A | 2024/4/12 |
| 4 | -1.249994022 | A | 2024/4/12 |
| 5 | 0.775035178 | A | 2024/4/12 |
| 6 | 0.442294860 | C | 2024/4/12 |
| 7 | 0.819239543 | A | 2024/4/12 |
| 8 | -0.921003054 | B | 2024/4/12 |
| 9 | -0.288820276 | B | 2024/4/12 |
| 10 | 0.752006217 | B | 2024/4/12 |
| 1 | -0.359878994 | A | 2024/4/13 |
| 2 | -2.681319922 | B | 2024/4/13 |
| 3 | -0.929509687 | B | 2024/4/13 |
| 4 | -0.682002319 | A | 2024/4/13 |
| 5 | -1.241710994 | C | 2024/4/13 |
| 6 | 0.769089496 | B | 2024/4/13 |
| 7 | -0.151905356 | B | 2024/4/13 |
| 8 | 0.462443169 | C | 2024/4/13 |
| 9 | -0.453144568 | A | 2024/4/13 |
| 10 | 0.582974854 | B | 2024/4/13 |
+----+--------------+----------+-----------+
パイプ作成前にステージに配置されていたファイルもロードできました。
コピー履歴は以下のようになっていました。
>SELECT * FROM TABLE(information_schema.copy_history(
table_name => 'mytable',
start_time => DATEADD(hour, -1, current_timestamp())
));
+--------------------------+----------------------------------------------------------------------+-------------------------------+-----------+------------+-----------+---------------------+-------------------------+---------------------------+-------------------------+-------------+-------------+--------+--------------------+-------------------+------------+-------------------+------------------+-----------+-------------------------------+--------------+
| FILE_NAME | STAGE_LOCATION | LAST_LOAD_TIME | ROW_COUNT | ROW_PARSED | FILE_SIZE | FIRST_ERROR_MESSAGE | FIRST_ERROR_LINE_NUMBER | FIRST_ERROR_CHARACTER_POS | FIRST_ERROR_COLUMN_NAME | ERROR_COUNT | ERROR_LIMIT | STATUS | TABLE_CATALOG_NAME | TABLE_SCHEMA_NAME | TABLE_NAME | PIPE_CATALOG_NAME | PIPE_SCHEMA_NAME | PIPE_NAME | PIPE_RECEIVED_TIME | BYTES_BILLED |
|--------------------------+----------------------------------------------------------------------+-------------------------------+-----------+------------+-----------+---------------------+-------------------------+---------------------------+-------------------------+-------------+-------------+--------+--------------------+-------------------+------------+-------------------+------------------+-----------+-------------------------------+--------------|
| sample_data_20240413.csv | azure://<ストレージアカウント>.blob.core.windows.net/<コンテナ名>/ | 2025-10-10 00:42:05.182 -0700 | 10 | 10 | 300 | NULL | NULL | NULL | NULL | 0 | 1 | Loaded | TEST_DB | PUBLIC | MYTABLE | TEST_DB | PUBLIC | MYPIPE | 2025-10-10 00:41:44.725 -0700 | 300 |
| sample_data_20240412.csv | azure://<ストレージアカウント>.blob.core.windows.net/<コンテナ名>/ | 2025-10-10 00:45:53.183 -0700 | 10 | 10 | 297 | NULL | NULL | NULL | NULL | 0 | 1 | Loaded | TEST_DB | PUBLIC | MYTABLE | TEST_DB | PUBLIC | MYPIPE | 2025-10-10 00:45:27.489 -0700 | 297 |
+--------------------------+----------------------------------------------------------------------+-------------------------------+-----------+------------+-----------+---------------------+-------------------------+---------------------------+-------------------------+-------------+-------------+--------+--------------------+-------------------+------------+-------------------+------------------+-----------+-------------------------------+--------------+
パイプの構成パターンを試してみる
ここまでで、基本的な Azure での Snowpipe の構成手順を確認できたので、複数のテーブルに対して Snowpipe でデータロードすることを想定し、いくつかの構成パターンを試してみます。
同一コンテナでパスによるフィルタリング
はじめに、単一の Blob ストレージ内にパスで分けられたデータファイルがあると仮定します。パスを変更することで異なるテーブルに対応するデータファイルがロードされることを確認します。
Snowflake 側で以下のオブジェクトを作成しました。
--「path1/」配下のデータファイルをロードするためのテーブルとパイプを定義
CREATE OR REPLACE TABLE mytable_path1
USING TEMPLATE (
SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@my_azure_stage/',
FILE_FORMAT=>'my_csv_format'
)
)
);
-- ステージの「path1/」配下のデータファイルを「mytable_path1」にSnowpipeでロード
CREATE OR REPLACE PIPE mypipe_path1
AUTO_INGEST = true
INTEGRATION = 'PIPE_QUEUE_INT'
AS
COPY INTO test_db.public.mytable_path1
FROM @test_db.public.my_azure_stage/path1/
FILE_FORMAT=(FORMAT_NAME = 'my_csv_format')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
SELECT * FROM mytable_path1;
--「path2/」配下のデータファイルをロードするためのテーブルとパイプを定義
CREATE OR REPLACE TABLE mytable_path2
USING TEMPLATE (
SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@my_azure_stage/',
FILE_FORMAT=>'my_csv_format'
)
)
);
-- ステージの「path2/」配下のデータファイルを「mytable_path2」にSnowpipeでロード
CREATE OR REPLACE PIPE mypipe_path2
AUTO_INGEST = true
INTEGRATION = 'PIPE_QUEUE_INT'
AS
COPY INTO test_db.public.mytable_path2
FROM @test_db.public.my_azure_stage/path2/
FILE_FORMAT=(FORMAT_NAME = 'my_csv_format')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
SELECT * FROM mytable_path2;
ポイントとして、INTEGRATION
には同じ通知統合(PIPE_QUEUE_INT
)を指定しています。これにより、1つのストレージキューへのイベント通知で、両方のパイプがトリガーされる構成となります。
対象のコンテナーの指定のディレクトリ配下にファイルを追加します。
path1/
ディレクトリにデータファイルをアップロード
path2/
ディレクトリにデータファイルをアップロード
少ししてから、テーブルを確認します。
path1
のロード先テーブル (mytable_path1
)
>SELECT COUNT(*) FROM mytable_path1;
+----------+
| COUNT(*) |
|----------|
| 10 |
+----------+
>SELECT * FROM mytable_path1 ORDER BY 2 LIMIT 1;
+----+-----------+----------+--------------+
| ID | Date | Category | Val |
|----+-----------+----------+--------------|
| 1 | 2024/4/14 | C | -0.939345586 |
+----+-----------+----------+--------------+
path2
のロード先テーブル (mytable_path2
)
>SELECT COUNT(*) FROM mytable_path2;
+----------+
| COUNT(*) |
|----------|
| 10 |
+----------+
>SELECT * FROM mytable_path2 ORDER BY 2 LIMIT 1;
+-----------+----+-------------+----------+
| Date | ID | Val | Category |
|-----------+----+-------------+----------|
| 2024/4/15 | 1 | 0.331979236 | B |
+-----------+----+-------------+----------+
単一のイベントキューと通知統合を共有しつつ、パイプのCOPY INTO
ステートメントで指定したパスによって、異なるテーブルへデータロードできました。
異なるコンテナを使用
同じストレージ アカウント内に新しいコンテナーを作成し、そこからデータをロードする構成を試してみます。
また、この際のポイントは以下です。
- Azure 側の構成
- 既存の Event Grid サブスクリプションは、ストレージ アカウント全体をソースとしています。そのため、このアカウントに新しいコンテナーを追加した場合でも、特に Event Grid の設定は不要です。新しいコンテナーへのファイルアップロードイベントも自動的に既存のストレージキューに送信されます
- Snowflake 側の構成
- 複数のコンテナーからのロードを許可するため、既存のストレージ統合オブジェクトを変更する必要があります
- 具体的には
STORAGE_ALLOWED_LOCATIONS
プロパティを変更します
はじめに、Azure 側で同じストレージアカウントに新しいコンテナを作成します。
Snowflake 側の構成として、ストレージ統合の現在の設定を変更します。具体的には、先の手順では単一のコンテナーのみを許可しているので、新規作成したコンテナもSTORAGE_ALLOWED_LOCATIONS
に含めます。
ALTER STORAGE INTEGRATION my_azure_storage_int SET
STORAGE_ALLOWED_LOCATIONS = (
'azure://<ストレージアカウント名>.blob.core.windows.net/test-container/',
'azure://<ストレージアカウント名>.blob.core.windows.net/test-container2/');
新しいコンテナ向けの外部ステージを作成します。
CREATE STAGE my_azure_stage2
STORAGE_INTEGRATION = my_azure_storage_int
URL = 'azure://<ストレージアカウント名>.blob.core.windows.net/test-container2/';
データロード先のテーブルとパイプを作成します。
--テーブルとパイプを作成
CREATE OR REPLACE TABLE mytable_container2
USING TEMPLATE (
SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@my_azure_stage/',
FILE_FORMAT=>'my_csv_format'
)
)
);
CREATE OR REPLACE PIPE mypipe_container2
AUTO_INGEST = true
INTEGRATION = 'PIPE_QUEUE_INT'
AS
COPY INTO test_db.public.mytable_container2
FROM @test_db.public.my_azure_stage2/
FILE_FORMAT=(FORMAT_NAME = 'my_csv_format')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
新しいコンテナ(test-container2
)にファイル(sample_data_20240417.csv
)をアップロードします。
少ししてからテーブルを確認します。
>SELECT COUNT(*) FROM mytable_container2;
+----------+
| COUNT(*) |
|----------|
| 10 |
+----------+
>SELECT * FROM mytable_container2 LIMIT 1;
+-----------+-------------+----------+----+
| Date | Val | Category | ID |
|-----------+-------------+----------+----|
| 2024/4/17 | 0.013401644 | C | 1 |
+-----------+-------------+----------+----+
既存の構成に新しいコンテナを追加し、ALTER STORAGE INTEGRATION
で設定を更新することで、同じ Event Grid サブスクリプションと通知統合を利用しながら、異なるコンテナからのデータを対応するテーブルにロードできました。
異なるストレージアカウント内のコンテナを使用
さいごに、コンテナーが異なるストレージ アカウントにあるパターンを試してみます。
ここでは、以下の条件で検証します。
- ストレージ統合オブジェクトは異なるストレージ アカウントでも共通のものを使用する
- 既に作成済みの単一のストレージキューを使用する
同じストレージキューを通じて Snowpipe でデータをロードできるよう、この場合、各ストレージアカウントと Snowflake の間で個別の連携設定が必要です。
はじめに、追加のストレージ アカウントを作成します。
このパターンでは、新しいストレージアカウントごとに Event Grid サブスクリプションを作成する必要があります。ただし、宛先には既存のストレージキューを指定します。これにより、すべてのストレージ アカウントからのイベントが、単一のストレージキューに集約されます。
以下のコマンドで、新しいストレージアカウント(yasuhara2storageaccount
)用の Event Grid サブスクリプションを作成します。
# 環境変数を設定
resource_group_name="rg-test"
new_storage_account_name='<新規作成したストレージアカウント名>'
storage_queue_account_name="<ストレージキュー名>"
queue_name="<既存のキューの名前>"
subscription_name="<新規作成するサブスクリプション名>"
# データ用ストレージアカウントIDを取得 (Event Gridのソースとなる新規作成したストレージアカウント)
storageid=$(az storage account show \
--name $new_storage_account_name \
--resource-group $resource_group_name \
--query id \
--output tsv)
# キュー用ストレージアカウントIDを取得 (Event Gridの宛先)
queue_storage_id=$(az storage account show \
--name $storage_queue_account_name \
--resource-group $resource_group_name \
--query id \
--output tsv)
# キュー用ストレージアカウントIDを使用しキューのIDを作成
queueid="$queue_storage_id/queueservices/default/queues/$queue_name"
# Event Gridサブスクリプションを作成
az eventgrid event-subscription create \
--source-resource-id $storageid \
--name $subscription_name \
--endpoint-type storagequeue \
--endpoint $queueid \
--advanced-filter data.api stringin CopyBlob PutBlob PutBlockList FlushWithClose SftpCommit
これにより、新旧両方のストレージアカウントからのイベントが、単一のストレージキューに集約されます。
ストレージ統合オブジェクトは異なるストレージ アカウントでも共通のものを使用する前提のため、ストレージ統合オブジェクト作成時に Azure 側に作成されているプリンシパルに、新規作成したストレージ アカウントへのアクセス権(ロール)を付与します。
Azure ポータルの対象ストレージ アカウントメニューの「アクセス制御(IAM) > 追加 > ロールの割り当ての追加」をクリックします。
ここでは「ストレージ BLOB データ閲覧者」ロールをサービスプリンシパルに割り当てました。こちらの手順は以下の記事内の「ストレージに Snowflake アクセスを許可する」と同様です。
新しいストレージ アカウントに新規のコンテナを作成します。
Azure 側の作業は以上となります。
続けて Snowflake 側でストレージ統合の設定を以下のように変更します。
ALTER STORAGE INTEGRATION my_azure_storage_int SET
STORAGE_ALLOWED_LOCATIONS = (
'azure://<既存のストレージアカウント名>.blob.core.windows.net/test-container/',
'azure://<既存のストレージアカウント名>.blob.core.windows.net/test-container2/',
'azure://<新規作成したストレージアカウント名>.blob.core.windows.net/new-container/');
異なるストレージ アカウント内の新しいコンテナ向けの外部ステージを作成します。
--新しいコンテナ向けのステージを作成
CREATE STAGE my_azure_stage3
STORAGE_INTEGRATION = my_azure_storage_int
URL = 'azure://<新規作成したストレージアカウント名>.blob.core.windows.net/new-container/';
これまでの同様の手順でデータロード先となるテーブルとパイプを作成します。
--テーブルとパイプを作成
CREATE OR REPLACE TABLE mytable_storageaccount2
USING TEMPLATE (
SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@my_azure_stage/',
FILE_FORMAT=>'my_csv_format'
)
)
);
CREATE OR REPLACE PIPE mypipe_storageaccount2
AUTO_INGEST = true
INTEGRATION = 'PIPE_QUEUE_INT'
AS
COPY INTO test_db.public.mytable_storageaccount2
FROM @test_db.public.my_azure_stage3/
FILE_FORMAT=(FORMAT_NAME = 'my_csv_format')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
新しいコンテナ(new-container
)にファイル(sample_data_20240418.csv
)をアップロードします。
少ししてからテーブルを確認します。
>SELECT COUNT(*) FROM mytable_storageaccount2;
+----------+
| COUNT(*) |
|----------|
| 10 |
+----------+
>SELECT * FROM mytable_storageaccount2 ORDER BY 2 LIMIT 1;
+-----------+--------------+----+----------+
| Date | Val | ID | Category |
|-----------+--------------+----+----------|
| 2024/4/18 | -1.033691009 | 4 | A |
+-----------+--------------+----+----------+
複数の異なるストレージアカウントをソースとしながら、単一の通知統合とストレージキュー共有することで、Snowpipe を構成できました。
さいごに
Snowflake で Azure の Blob Storage を外部ステージに設定し、Snowpipe によるデータロードを試してみました。
こちらの内容が何かの参考になれば幸いです。