Amazon SNS を使用する Snowpipe の構成オプションを試してみた #SnowflakeDB

Amazon SNS を使用する Snowpipe の構成オプションを試してみた #SnowflakeDB

Clock Icon2025.07.10

はじめに

Snowflake では Snowpipe 構成時のオプションとして、 Amazon Simple Notification Service(SNS) をブロードキャスターとして使用できます。
こちらを試してみましたので、本記事で内容をまとめてみます。

https://docs.snowflake.com/user-guide/data-load-snowpipe-auto-s3#option-2-configuring-amazon-sns-to-automate-snowpipe-using-sqs-notifications

概要

Snowflake には継続的なデータ ロードの仕組みとして Snowpipe という機能が提供されています。

最もシンプルな構成方法は Snowflake アカウント側でパイプオブジェクトに割り当てられる SQS キューの Arn を S3 バケットのイベント通知先として設定します。これにより、バケットに新しいファイルがアップロードされるたびにイベントがトリガーされ、データファイルががテーブルにロードされます。

https://docs.snowflake.com/en/user-guide/data-load-snowpipe-auto-s3#option-1-creating-a-new-s3-event-notification-to-automate-snowpipe

その他にも、本記事で扱う SNS を使用する構成オプションがあります。
図は公式ドキュメントからの引用ですが、オプション1(S3 イベント通知 → SQS)と異なりイベント通知先に SNS トピックを指定し、SQS キューは SNS トピックをサブスクライブします。

image

S3イベント通知では、同じS3バケット内で、同じパスと、同じイベントタイプを指定して複数のイベント通知を設定することができません。そのため、対象のパスで同じイベントタイプに基づく別のイベントがある場合、オプション1による構成ができなくなります。

https://repost.aws/ja/knowledge-center/lambda-s3-event-configuration-error

例として、事前にある S3 バケットのsqstest/プレフィックス配下に「すべてのオブジェクト管理作成イベント」を指定する送信先が Lambda 関数であるイベントがあるとします。

image 1

送信先

image 2

この状態で同じパスが含まれる、同じイベントタイプのイベント通知を作成します。

image 3

ここでは送信先として Snowflake アカウント側の SQS を指定してみると、下図のようにエラー(Configuration is ambiguously defined)となり構成できません。

image 4

このようなケースでは構成オプション2を使用し、イベントの送信先を SNS とすることで、SNS から複数のサブスクライバー(既存の Lambda 関数とSnowflake アカウント側の SQSキュー など)へイベントを配信し、ある S3イベントから複数の異なるサービスへの通知を構成できます。

試してみる

環境用意

はじめに以下のコマンドで検証用の各種オブジェクトを作成しました。

--環境用意
-- SYSADMIN で実行
USE ROLE SYSADMIN;
-- データベースを作成
CREATE DATABASE IF NOT EXISTS raw_db;
CREATE or replace DATABASE raw_db;

-- パイプ管理を行う検証用ロールを作成
USE ROLE USERADMIN;
CREATE OR REPLACE ROLE pipe_admin;

USE ROLE SECURITYADMIN;
GRANT USAGE ON DATABASE raw_db TO ROLE pipe_admin;
GRANT USAGE ON SCHEMA raw_db.public TO ROLE pipe_admin;
GRANT CREATE TABLE ON SCHEMA raw_db.public TO ROLE pipe_admin;
GRANT CREATE PIPE ON SCHEMA raw_db.public TO ROLE pipe_admin;
GRANT CREATE FILE FORMAT ON SCHEMA raw_db.public TO ROLE pipe_admin;
GRANT CREATE STAGE ON SCHEMA raw_db.public TO ROLE pipe_admin;

GRANT USAGE ON WAREHOUSE COMPUTE_WH TO ROLE pipe_admin;
GRANT ROLE pipe_admin TO ROLE SYSADMIN;

-- ストレージ統合オブジェクトを作成
USE ROLE ACCOUNTADMIN;

CREATE OR REPLACE STORAGE INTEGRATION s3_storage_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<IAMロールArn>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<バケット名>/');      

-- STORAGE_AWS_IAM_USER_ARNと外部IDを取得
DESCRIBE integration s3_storage_int;

-- 統合オブジェクトの使用権限を付与
USE ROLE SECURITYADMIN;
GRANT USAGE ON INTEGRATION s3_storage_int TO ROLE pipe_admin;

-- データロードに使用する各種オブジェクトを作成
USE ROLE pipe_admin;
USE SCHEMA RAW_DB.PUBLIC;

-- ファイルフォーマット
CREATE OR REPLACE FILE FORMAT my_csv_format
    TYPE = CSV
    COMPRESSION = NONE
    RECORD_DELIMITER = '\n'
    FIELD_DELIMITER = ','
    SKIP_HEADER = 1 ;

-- 外部ステージ
CREATE OR REPLACE STAGE my_stg
    STORAGE_INTEGRATION = s3_storage_int
    URL = 's3://<バケット名>/'
    FILE_FORMAT = my_csv_format;

list @my_stg;

ストレージ統合を使用する手順は以下の記事が参考になると思います。

https://dev.classmethod.jp/articles/snowflake-bulk-loading-from-amazon-s3/

SNS トピックを作成

AWS 側で SNS トピックを作成します。
ここでは下図の設定でその他もデフォルトのままとして作成しました。

image 5

SNS トピック作成後、トピックの Arn を控え Snowflake 側で以下のコマンドを実行します。

select system$get_aws_sns_iam_policy('<SNSトピックのArn>');

実行結果は以下のようになり、Snowflake アカウント側に作成された SQS キューにトピックをサブスクライブさせるためのポリシーステートメントが返ってきます。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "1",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<Snowflake側のアカウントID>:user/<ユーザー名>"
      },
      "Action": [
        "sns:Subscribe"
      ],
      "Resource": [
        "<SNSトピックのArn>"
      ]
    }
  ]
}

https://docs.snowflake.com/ja/sql-reference/functions/system_get_aws_sns_iam_policy

先の出力とあわせて、SNS トピックのアクセスポリシーを更新します。ここでは以下のように変更しました。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "1",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<Snowflake側のアカウントID>:user/<ユーザー名>"
      },
      "Action": [
        "sns:Subscribe"
      ],
      "Resource": [
        "<SNSトピックのArn>"
      ]
    },
    {
      "Sid": "s3-event-notifier",
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Action": "SNS:Publish",
      "Resource": "<SNSトピックのArn>",
      "Condition": {
        "ArnLike": {
          "aws:SourceArn": "<S3バケットのArn>"
        }
      }
    }
  ]
}

パイプオブジェクトを作成

諸々用意ができたので、データロード先となるテーブル、データロードに使用するパイプオブジェクトを作成します。

USE ROLE pipe_admin;
USE SCHEMA raw_db.public;

-- テーブル作成
CREATE TABLE mytable (
    ID NUMBER(2,0),
    Val NUMBER(10,9),
    Category VARCHAR(16777216),
    Date VARCHAR(16777216)
);

-- パイプを作成
CREATE OR REPLACE PIPE mypipe
  AUTO_INGEST = TRUE
  AWS_SNS_TOPIC='<SNSトピックのArn>'
  AS
    COPY INTO raw_db.public.mytable
      FROM @raw_db.public.my_stg/pipetest/;

ポイントはパイプ定義のAWS_SNS_TOPICプロパティです。SNS を使用する本構成の場合、ここでサブスクライブ先の SNS トピックの Arn を指定します。
また、このテーブルではpipetest/プレフィックス配下にデータファイルをロードする設定としました。

パイプを作成すると SNS トピックのサブスクリプションが更新されています。

image 6

S3 イベント通知を構成

S3 のイベント通知を設定します。
ここでは下図の通り、特にプレフィックスは指定せずに、バケット全体でオブジェクト作成イベントを設定しました。

image 7

送信先には SNS トピックを指定します。

image 8

以上で構成は完了です。

データロード

テーブルの中身を確認しておきます。当然、この時点では空です。

>SELECT * FROM mytable;
+----+-----+----------+------+
| ID | VAL | CATEGORY | DATE |
|----+-----+----------+------|
+----+-----+----------+------+

パイプ定義のプレフィックス(pipetest/)配下に、ファイルを配置します。

image 9

Snowflake で確認すると1分もしないうちに レコードが追加されていました。

>SELECT * FROM mytable;
+----+--------------+----------+-----------+
| ID |          VAL | CATEGORY | DATE      |
|----+--------------+----------+-----------|
|  1 |  1.114388475 | A        | 2024/4/12 |
|  2 | -0.617730236 | B        | 2024/4/12 |
|  3 | -1.226039849 | A        | 2024/4/12 |
|  4 | -1.249994022 | A        | 2024/4/12 |
|  5 |  0.775035178 | A        | 2024/4/12 |
|  6 |  0.442294860 | C        | 2024/4/12 |
|  7 |  0.819239543 | A        | 2024/4/12 |
|  8 | -0.921003054 | B        | 2024/4/12 |
|  9 | -0.288820276 | B        | 2024/4/12 |
| 10 |  0.752006217 | C        | 2024/4/12 |
+----+--------------+----------+-----------+

コピー履歴

>SELECT * FROM TABLE(information_schema.copy_history(
                        table_name => 'mytable',
                        start_time => DATEADD(hour, -1, current_timestamp())
                        ));
+--------------------------+-------------------------------+-------------------------------+-----------+------------+-----------+---------------------+-------------------------+---------------------------+-------------------------+-------------+-------------+--------+--------------------+-------------------+------------+-------------------+------------------+-----------+-------------------------------+
| FILE_NAME                | STAGE_LOCATION                | LAST_LOAD_TIME                | ROW_COUNT | ROW_PARSED | FILE_SIZE | FIRST_ERROR_MESSAGE | FIRST_ERROR_LINE_NUMBER | FIRST_ERROR_CHARACTER_POS | FIRST_ERROR_COLUMN_NAME | ERROR_COUNT | ERROR_LIMIT | STATUS | TABLE_CATALOG_NAME | TABLE_SCHEMA_NAME | TABLE_NAME | PIPE_CATALOG_NAME | PIPE_SCHEMA_NAME | PIPE_NAME | PIPE_RECEIVED_TIME            |
|--------------------------+-------------------------------+-------------------------------+-----------+------------+-----------+---------------------+-------------------------+---------------------------+-------------------------+-------------+-------------+--------+--------------------+-------------------+------------+-------------------+------------------+-----------+-------------------------------|
| sample_data_20240412.csv | s3://cm-yasuhara-tk/pipetest/ | 2025-07-09 22:33:32.393 -0700 |        10 |         10 |       297 | NULL                |                    NULL |                      NULL | NULL                    |           0 |           1 | Loaded | RAW_DB             | PUBLIC            | MYTABLE    | RAW_DB            | PUBLIC           | MYPIPE    | 2025-07-09 22:33:11.745 -0700 |
+--------------------------+-------------------------------+-------------------------------+-----------+------------+-----------+---------------------+-------------------------+---------------------------+-------------------------+-------------+-------------+--------+--------------------+-------------------+------------+-------------------+------------------+-----------+-------------------------------+

パイプを追加

バケット全体でイベント通知設定をしたので、異なるプレフィックス配下にファイルを配置した際に別のテーブルにデータロードされることを見ておきます。SNSトピックの Arn は同じものを指定します。

-- 異なるテーブルを作成
USE SCHEMA raw_db.public;
CREATE TABLE mytable_2 (
    ID NUMBER(2,0),
    Val NUMBER(10,9),
    Category VARCHAR(16777216),
    Date VARCHAR(16777216)
);

-- 異なるパスを指定しパイプを作成
CREATE  OR REPLACE PIPE mypipe_2
  AUTO_INGEST = TRUE
  AWS_SNS_TOPIC='<SNSトピックのArn>'
  AS
    COPY INTO raw_db.public.mytable_2
      FROM @raw_db.public.my_stg/pipetest_2/;

>SHOW PIPES;
+-------------------------------+----------+---------------+-------------+----------------------------------------------+------------+-----------------------------------------------------------------+---------+-------------+---------+-------------------+-----------------+----------------+-------+
| created_on                    | name     | database_name | schema_name | definition                                   | owner      | notification_channel                                            | comment | integration | pattern | error_integration | owner_role_type | invalid_reason | kind  |
|-------------------------------+----------+---------------+-------------+----------------------------------------------+------------+-----------------------------------------------------------------+---------+-------------+---------+-------------------+-----------------+----------------+-------|
| 2025-07-09 22:20:54.812 -0700 | MYPIPE   | RAW_DB        | PUBLIC      | COPY INTO raw_db.public.mytable              | PIPE_ADMIN | arn:aws:sns:ap-northeast-1:<アカウントID>:yasuhara-test-sns-topic |         | NULL        | NULL    | NULL              | ROLE            | NULL           | STAGE |
|                               |          |               |             |       FROM @raw_db.public.my_stg/pipetest/   |            |                                                                 |         |             |         |                   |                 |                |       |
| 2025-07-09 22:37:29.221 -0700 | MYPIPE_2 | RAW_DB        | PUBLIC      | COPY INTO raw_db.public.mytable_2            | PIPE_ADMIN | arn:aws:sns:ap-northeast-1:<アカウントID>:yasuhara-test-sns-topic |         | NULL        | NULL    | NULL              | ROLE            | NULL           | STAGE |
|                               |          |               |             |       FROM @raw_db.public.my_stg/pipetest_2/ |            |                                                                 |         |             |         |                   |                 |                |       |
+-------------------------------+----------+---------------+-------------+----------------------------------------------+------------+-----------------------------------------------------------------+---------+-------------+---------+-------------------+-----------------+----------------+-------+

この状態で新規作成したパイプ定義で指定したプレフィックス配下(pipetest_2/)にファイルをアップロードします。

image 10

以下のようなファイルの配置です。

$ aws s3 ls s3://cm-yasuhara-tk --recursive --profile <profile>
2025-07-10 14:28:35          0 pipetest/
2025-07-10 14:33:08        297 pipetest/sample_data_20240412.csv
2025-07-10 14:39:44          0 pipetest_2/
2025-07-10 14:39:54        300 pipetest_2/sample_data_20240413.csv

Snowflake 側を確認すると、対応するテーブルにデータがロードされていることを確認できました。

>SELECT * FROM mytable;
+----+--------------+----------+-----------+
| ID |          VAL | CATEGORY | DATE      |
|----+--------------+----------+-----------|
|  1 |  1.114388475 | A        | 2024/4/12 |
|  2 | -0.617730236 | B        | 2024/4/12 |
|  3 | -1.226039849 | A        | 2024/4/12 |
|  4 | -1.249994022 | A        | 2024/4/12 |
|  5 |  0.775035178 | A        | 2024/4/12 |
|  6 |  0.442294860 | C        | 2024/4/12 |
|  7 |  0.819239543 | A        | 2024/4/12 |
|  8 | -0.921003054 | B        | 2024/4/12 |
|  9 | -0.288820276 | B        | 2024/4/12 |
| 10 |  0.752006217 | C        | 2024/4/12 |
+----+--------------+----------+-----------+

>SELECT * FROM mytable_2;
+----+--------------+----------+-----------+
| ID |          VAL | CATEGORY | DATE      |
|----+--------------+----------+-----------|
|  1 | -0.359878994 | A        | 2024/4/13 |
|  2 | -2.681319922 | B        | 2024/4/13 |
|  3 | -0.929509687 | B        | 2024/4/13 |
|  4 | -0.682002319 | A        | 2024/4/13 |
|  5 | -1.241710994 | C        | 2024/4/13 |
|  6 |  0.769089496 | B        | 2024/4/13 |
|  7 | -0.151905356 | B        | 2024/4/13 |
|  8 |  0.462443169 | C        | 2024/4/13 |
|  9 | -0.453144568 | A        | 2024/4/13 |
| 10 |  0.582974854 | B        | 2024/4/13 |
+----+--------------+----------+-----------+

さいごに

Amazon SNS を使用する Snowpipe の構成オプションを試してみました。イベントが競合する場合はこのオプションが必要となります。こちらの内容が何かの参考になれば幸いです。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.