Snowflakeの継続的なデータロード「Snowpipe」を試してみた

2020.10.01

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。

Snowflakeには継続的にデータをロードするための仕組みとして、「Snowpipe」という機能があります。この機能をこれまで実際に触ったことがなかったので、今回は設定からデータをロードするまでを試してみました。

なお、「Snowpipe」については以下の記事でも紹介されていますので、よろしければご参照ください。

Snowpipeとは

SnowpipeはSnowflakeの継続的なデータロードの仕組みです。COPYコマンドを利用した一括ロードとは異なり、ステージ上でファイルが利用できるようになったことを検知して、ファイルを継続的にロードすることができます。

Snowpipeを設定してみる

実際にどのようなものなのか、設定してみて確認したいと思います。今回はAWSの「S3イベント通知」をトリガーとしてSnowpipeがデータロードをするようにしたいと思います。

なお、今回は一般的なオプションとされている「S3イベント通知」を利用した設定を行いますが、S3バケットに競合するイベント通知が存在する場合には、代わりに「SNSトピック」を利用したパターンでの実装が必要となります。

詳しくは以下のドキュメントの「正しいオプションの決定」の項に記載されています。

ステージの準備

ではドキュメントに沿って作成を進めてみます。まずはステージの準備です。

S3からのロードになるので外部ステージの設定が必要となるわけなのですが、ここは設定手順が長くなるので記述を割愛したいと思います。以前に以下の記事で実施したので同様に設定を行いました。

今回は外部ステージとして OOTAKA_SANDBOX_DB.PUBLIC.CM_OOTAKA_EXTERNAL_STAGE というステージを作成しています。

パイプの準備

次にパイプの準備をします。指定テーブルに対してCOPYを行うパイプを作成するのですが、まず先にテーブルを作成しておきます。

以下のように、VARIANT型のカラムだけを持つテーブルを作成しておきます。

CREATE TABLE OOTAKA_SANDBOX_DB.PUBLIC.LOG(
    record VARIANT
);

作成したら、このテーブルに対してCOPYを行うパイプを作成します。

CREATE PIPE OOTAKA_SANDBOX_DB.PUBLIC.CM_OOTAKA_PIPE AUTO_INGEST=TRUE AS
  COPY INTO OOTAKA_SANDBOX_DB.PUBLIC.LOG
  FROM @OOTAKA_SANDBOX_DB.PUBLIC.CM_OOTAKA_EXTERNAL_STAGE/Snowpipe/
  FILE_FORMAT = (TYPE = 'JSON')
;

指定外部ステージCM_OOTAKA_EXTERNAL_STAGEの配下にあるSnowpipeフォルダ内のオブジェクトを、JSONフォーマットでCOPYするPIPEです。 AUTO_INGEST=TRUEとしていますが、これにより新しいデータをロードする準備ができたときに、S3バケットからSQSキューに送信されたイベント通知を読み取るようになるそうです。

権限設定

次に各オブジェクトの権限設定です。Snowpipeを利用するためには以下の権限を持つロールが必要となります。

オブジェクト 権限
名前付きパイプ OWNERSHIP
名前付きステージ USAGE, READ
名前付きファイル形式 USAGE(*)
ターゲットデータベース USAGE
ターゲットスキーマ USAGE
ターゲットテーブル INSERT, SELECT

(*)作成したステージが名前付きファイル形式を参照する場合にのみ必要

今回利用するロールOOTAKA_SANDBOX_ROLEを改めて確認します。「名前付きファイル形式」は利用していないので、それ以外をクエリで確認していきます。

名前付きパイプ

名前付きパイプについては以下のクエリで確認します。

SELECT
  PIPE_CATALOG,
  PIPE_SCHEMA,
  PIPE_NAME,
  PIPE_OWNER
FROM
  OOTAKA_SANDBOX_DB.INFORMATION_SCHEMA.PIPES
WHERE
  PIPE_NAME = 'CM_OOTAKA_PIPE'
;

結果は以下になります。

PIPE_CATALOG PIPE_SCHEMA PIPE_NAME PIPE_OWNER
OOTAKA_SANDBOX_DB PUBLIC CM_OOTAKA_PIPE OOTAKA_SANDBOX_ROLE

PIPE_OWNERなのでOWNERSHIP権限があります。

名前付きステージ

名前付きステージについては以下のクエリで確認します。

SELECT
  STAGE_CATALOG,
  STAGE_SCHEMA,
  STAGE_NAME,
  STAGE_OWNER
FROM
  OOTAKA_SANDBOX_DB.INFORMATION_SCHEMA.STAGES
WHERE
  STAGE_NAME = 'CM_OOTAKA_EXTERNAL_STAGE'
;

結果は以下になります。

STAGE_CATALOG STAGE_SCHEMA STAGE_NAME STAGE_OWNER
OOTAKA_SANDBOX_DB PUBLIC CM_OOTAKA_EXTERNAL_STAGE OOTAKA_SANDBOX_ROLE

STAGE_OWNERなのでUSAGEREAD権限も持っています。

ターゲットデータベース、ターゲットスキーマ、ターゲットテーブル

データベース、スキーマ、テーブルについては以下のクエリで確認します。

SELECT
  GRANTEE,
  OBJECT_CATALOG,
  OBJECT_NAME,
  OBJECT_TYPE,
  PRIVILEGE_TYPE
FROM
  OOTAKA_SANDBOX_DB.INFORMATION_SCHEMA.OBJECT_PRIVILEGES
WHERE
      GRANTEE = 'OOTAKA_SANDBOX_ROLE'
  AND
  (
    (OBJECT_TYPE = 'DATABASE' AND OBJECT_NAME = 'OOTAKA_SANDBOX_DB' AND PRIVILEGE_TYPE = 'USAGE')
    OR
    (OBJECT_TYPE = 'SCHEMA' AND OBJECT_CATALOG = 'OOTAKA_SANDBOX_DB' AND OBJECT_NAME = 'PUBLIC' AND PRIVILEGE_TYPE = 'USAGE')
    OR
    (OBJECT_TYPE = 'TABLE' AND OBJECT_CATALOG = 'OOTAKA_SANDBOX_DB' AND OBJECT_NAME = 'LOG' )
  )
ORDER BY
  OBJECT_TYPE,
  PRIVILEGE_TYPE
;

結果は以下になります。

GRANTEE OBJECT_CATALOG OBJECT_NAME OBJECT_TYPE PRIVILEGE_TYPE
OOTAKA_SANDBOX_ROLE OOTAKA_SANDBOX_DB DATABASE USAGE
OOTAKA_SANDBOX_ROLE OOTAKA_SANDBOX_DB PUBLIC SCHEMA USAGE
OOTAKA_SANDBOX_ROLE OOTAKA_SANDBOX_DB LOG TABLE OWNERSHIP

データベース、スキーマは想定通りUSAGE権限を持っており、テーブルはOWNERSHIP権限があるのでINSERTSELECTに問題はありません。

以上で、権限に問題ないことが確認できました!

イベント通知設定

イベント通知設定では、AWSのS3イベント通知の設定を行っていきます。

まずは以下のコマンドで設定に必要なARNを取得します。

SHOW PIPES;

ここで表示された、利用するPIPEのSQSのARNを控えておきます。以下のような値になります。

arn:aws:sqs:ap-northeast-1:123456789012:sf-snowpipe-foo_bar

次に、AWSの管理コンソールに移り、外部ステージとして利用するS3バケットの画面を開き、「プロパティ」タブの「詳細設定」から「イベント」を選択します。

選択すると「+通知の追加」が表示されるのでクリックします。

設定は以下のように設定します。

「名前」には任意の名前を、「イベント」には「すべてのオブジェクト作成イベント」を指定します。

「プレフィックス」には「パイプの準備」の項で記載した通り、今回はSnowpipeというフォルダ配下のファイルをロードしたいので、Snowpipe/と指定します。

「送信先」と「SQS」には「SQLキュー」と「SQSキューのARNを追加」を指定して、「SQSキューのARN」に先程Snowflakeで確認したARNを指定して「保存」します。

履歴データのロード

今回はこれから初めてデータをロードすることになるので関係がないのですが、以前にステージングされたファイルがある場合には、以下の手順で履歴データをロードすることができるそうです。

COPY INTO の実行

これはPIPE作成時に設定したCOPY文の実行ですね。今回の例だと以下になります。

COPY INTO OOTAKA_SANDBOX_DB.PUBLIC.LOG
FROM @OOTAKA_SANDBOX_DB.PUBLIC.CM_OOTAKA_EXTERNAL_STAGE/Snowpipe/
FILE_FORMAT = (TYPE = 'JSON')
;

後述のALTER PIPE ... REFRESHでは過去7日間にステージングされたファイルを処理してくれるので、それより以前のファイルを考慮した処理になるということですね。

自動データロードを構成

これは、まさにいま設定してきたSnowpipeの設定です。

ALTER PIPE ... REFRESH の実行

この実行により、上記の「COPY INTOの実行」から、「自動データロードを構成」までの期間にステージングされたファイルがロードされます。

今回の例だと以下になります。なお、ターゲットテーブルとパイプの両方のロード履歴がチェックされるので、同じファイルが2回ロードされることはないそうです。

ALTER PIPE OOTAKA_SANDBOX_DB.PUBLIC.CM_OOTAKA_PIPE REFRESH;

Snowpipeを動かしてみる

以上で、設定が終わりました!早速試してみましょう。

ここでは詳細は省きますが、Kinesis FirehoseのDelivery Streamを作成して、データの取り込み対象のS3パスにテストデータを送り込んでみます。テストデータは「Test with demo data」から送信できます。

しばらくすると以下のようにファイルが送信されたのが確認できたので、送信を止めました。

なお、ファイルの中身は以下のようなJSONL形式のデータとなっています。

{"ticker_symbol":"ALY","sector":"ENERGY","change":-1.01,"price":88.09}
{"ticker_symbol":"TBV","sector":"HEALTHCARE","change":8.62,"price":199.62}
{"ticker_symbol":"ASD","sector":"FINANCIAL","change":-1.39,"price":64.41}
...

では、Snowflake側でロードされているかを確認してみます。

SELECT * FROM OOTAKA_SANDBOX_DB.PUBLIC.LOG;

想定どおり、ちゃんとロードされていますね!

まとめ

以上、Snowflakeの継続的なデータロード「Snowpipe」を試してみました。Kinesis Firehoseのように、継続的にS3に配置されるデータのロードにとても便利そうな機能ですね。うまく活用できるように利用方法について勉強していきたいと思います。

どなたかのお役に立てば幸いです。それでは!