
Snowflakeでデータパイプラインを作ってみた
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。
Snowflakeでは、以下のドキュメントでも記載されている通り、Snowflakeの機能を組み合わせてデータパイプラインを作成することができます。
| 分類 | Snowflakeの機能 | 
|---|---|
| 連続データロード | Snowpipe | 
| 変更データの追跡 | ストリーム | 
| 定期的なタスク | タスク | 
今回はこのドキュメントの内容を踏まえて、実際に以下のようなイメージのデータパイプラインを作成してみたいと思います。
データの流れですが、以下のようなJSONL形式のデータがSnowpipe経由でLogテーブルに投入されてきます。
{"ticker_symbol":"ALY","sector":"ENERGY","change":-1.01,"price":88.09}
{"ticker_symbol":"TBV","sector":"HEALTHCARE","change":8.62,"price":199.62}
{"ticker_symbol":"ASD","sector":"FINANCIAL","change":-1.39,"price":64.41}
...
LogテーブルはVARIANT型のカラムRecordのみで構成されており、これをカラム毎に型定義されているStockテーブルに移していく流れとします。
Snowpipe
では、早速作成していきます。まずはSnowpipeの作成なのですが、Snowpipeの作成については、以下のエントリで作成したものをそのまま利用します。なお、元データについてもこのエントリでテストとして利用したKinesis FirehoseのDelivery Streamから送られるテストデータを利用します。
このSnowpipeによって、S3(外部ステージ)に配置されたデータが、データを配置したタイミングでlogテーブルにロードされるようになります。
ストリーム
ストリームは、テーブルのレコードに対して行った変更(差分データ)を追跡できるオブジェクトです。これを利用してlogテーブルのレコードに対する差分を検出します。
今回、ストリームは以下のように生データを投入しているテーブルlogに対して作成しておきます。
USE ROLE OOTAKA_SANDBOX_ROLE; USE DATABASE OOTAKA_SANDBOX_DB; USE SCHEMA PUBLIC; CREATE OR REPLACE STREAM log_stream ON TABLE log;
タスク
最後にタスクです。タスクは定期的にSQLクエリを実行可能なオブジェクトで、ツリー関係(タスクAの実行後にタスクBを実行など)を持つこともできます。
今回は定期実行するタスクを1つだけ作成します。
まずは、タスクの前にデータ投入先のstockテーブルを作成します。
USE ROLE OOTAKA_SANDBOX_ROLE; USE DATABASE OOTAKA_SANDBOX_DB; USE SCHEMA PUBLIC; CREATE OR REPLACE TABLE stock ( ticker_symbol VARCHAR(5), sector VARCHAR(16), price NUMBER(8,2), change NUMBER(8,2) );
次にタスクを作成します。タスクはシンプルなタスクで「VARIANT型だったデータをlogテーブルのストリームを利用して読み込み、各カラム毎に分けて型定義したstockテーブルに移す」というタスクとします。また、スケジュール設定は「1分毎の実行」とします。
USE ROLE OOTAKA_SANDBOX_ROLE;
USE DATABASE OOTAKA_SANDBOX_DB;
USE SCHEMA PUBLIC;
CREATE OR REPLACE TASK log_to_stock
  WAREHOUSE = X_SMALL_WH
  SCHEDULE = '1 minute'
WHEN
  SYSTEM$STREAM_HAS_DATA('log_stream')
AS
  INSERT INTO stock
  SELECT
    record:ticker_symbol::VARCHAR AS ticker_symbol,
    record:sector::VARCHAR AS sector,
    record:price::NUMBER AS price,
    record:change::NUMBER AS change
  FROM
    log_stream
;
これでタスクが出来ましたが、この段階ではまだタスクは「一時停止」した状態なので開始してあげる必要があります。
タスクの起動には「グローバル権限」のEXECUTE TASKが必要となるので、ACCOUNTADMINでカスタムロールのOOTAKA_SANDBOX_ROLEに権限を付与します。
USE ROLE ACCOUNTADMIN; GRANT EXECUTE TASK ON ACCOUNT TO ROLE OOTAKA_SANDBOX_ROLE;
その後、OOTAKA_SANDBOX_ROLEにロールを再度切り替えてタスクを起動します。
USE ROLE OOTAKA_SANDBOX_ROLE; USE DATABASE OOTAKA_SANDBOX_DB; ALTER TASK log_to_stock RESUME;
これでタスクが起動状態になりました。
パイプラインを試してみる
これで、下準備ができたので実際にデータを流してみます!
想定通りであれば、S3にファイルが送信された結果、Streamに差分データが溜まっていき、1分に一度のタイミングでstockテーブルに登録されていくはずです。
Kinesis Firehoseのテストデータを流し込んで、しばらくしたら停止します。
停止してしばらく(5分程度)するとS3にファイルが配置されるので、以下のクエリで確認します。
USE ROLE OOTAKA_SANDBOX_ROLE; USE DATABASE OOTAKA_SANDBOX_DB; USE SCHEMA PUBLIC; SELECT COUNT(*) from log;
╒══════════╕ │ COUNT(*) │ ╞══════════╡ │ 780 │ ╘══════════╛ 1 Row(s) produced. Time Elapsed: 0.125s
データロード前は300だったのですが、780に変わりました。Snowpipeによって、logテーブルにデータが追加でロードされたことが分かります。
今度はストリームとタスクによって、データが流れているかを確認してみましょう。タスクが1分毎に動作するので、1分程度待ってから、stockテーブルを確認してみます。
SELECT COUNT(*) FROM stock;
╒══════════╕ │ COUNT(*) │ ╞══════════╡ │ 480 │ ╘══════════╛ 1 Row(s) produced. Time Elapsed: 0.696s
差分データの480件がちゃんと登録されていますね。データの中身も少しみてみます。
USE WAREHOUSE X_SMALL_WH; SELECT * FROM stock LIMIT 5;
╒═══════════════╤════════════╤════════╤════════╕ │ TICKER_SYMBOL │ SECTOR │ PRICE │ CHANGE │ ╞═══════════════╪════════════╪════════╪════════╡ │ IOP │ TECHNOLOGY │ 117.00 │ -1.00 │ ├───────────────┼────────────┼────────┼────────┤ │ IOP │ TECHNOLOGY │ 117.00 │ -1.00 │ ├───────────────┼────────────┼────────┼────────┤ │ QAZ │ FINANCIAL │ 192.00 │ -3.00 │ ├───────────────┼────────────┼────────┼────────┤ │ SED │ HEALTHCARE │ 2.00 │ 0.00 │ ├───────────────┼────────────┼────────┼────────┤ │ PPL │ HEALTHCARE │ 31.00 │ 1.00 │ ╘═══════════════╧════════════╧════════╧════════╛ 5 Row(s) produced. Time Elapsed: 0.557s
想定通り、各カラムに値が入っていますね!
おまけ:タスクの停止
タスクはしばらく利用しないので、以下のクエリで停止しておきます。
USE ROLE OOTAKA_SANDBOX_ROLE; USE DATABASE OOTAKA_SANDBOX_DB; ALTER TASK log_to_stock SUSPEND;
まとめ
以上、Snowflakeでデータパイプラインを作ってみました。今回はとてもシンプルなものを作成しましたが、うまく組み合わせてあげることでSnowflakeだけでデータパイプラインを構築することもできそうです。
更には、Snowsightを利用することでデータの可視化も可能で、まさに「クラウドデータプラットフォーム」という感じで、とても面白いですね。
どなたかのお役に立てば幸いです。それでは!

















