Snowflakeでデータパイプラインを作ってみた
こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。
Snowflakeでは、以下のドキュメントでも記載されている通り、Snowflakeの機能を組み合わせてデータパイプラインを作成することができます。
分類 | Snowflakeの機能 |
---|---|
連続データロード | Snowpipe |
変更データの追跡 | ストリーム |
定期的なタスク | タスク |
今回はこのドキュメントの内容を踏まえて、実際に以下のようなイメージのデータパイプラインを作成してみたいと思います。
データの流れですが、以下のようなJSONL形式のデータがSnowpipe経由でLog
テーブルに投入されてきます。
{"ticker_symbol":"ALY","sector":"ENERGY","change":-1.01,"price":88.09} {"ticker_symbol":"TBV","sector":"HEALTHCARE","change":8.62,"price":199.62} {"ticker_symbol":"ASD","sector":"FINANCIAL","change":-1.39,"price":64.41} ...
Log
テーブルはVARIANT型のカラムRecord
のみで構成されており、これをカラム毎に型定義されているStock
テーブルに移していく流れとします。
Snowpipe
では、早速作成していきます。まずはSnowpipeの作成なのですが、Snowpipeの作成については、以下のエントリで作成したものをそのまま利用します。なお、元データについてもこのエントリでテストとして利用したKinesis FirehoseのDelivery Streamから送られるテストデータを利用します。
このSnowpipeによって、S3(外部ステージ)に配置されたデータが、データを配置したタイミングでlog
テーブルにロードされるようになります。
ストリーム
ストリームは、テーブルのレコードに対して行った変更(差分データ)を追跡できるオブジェクトです。これを利用してlog
テーブルのレコードに対する差分を検出します。
今回、ストリームは以下のように生データを投入しているテーブルlog
に対して作成しておきます。
USE ROLE OOTAKA_SANDBOX_ROLE; USE DATABASE OOTAKA_SANDBOX_DB; USE SCHEMA PUBLIC; CREATE OR REPLACE STREAM log_stream ON TABLE log;
タスク
最後にタスクです。タスクは定期的にSQLクエリを実行可能なオブジェクトで、ツリー関係(タスクAの実行後にタスクBを実行など)を持つこともできます。
今回は定期実行するタスクを1つだけ作成します。
まずは、タスクの前にデータ投入先のstock
テーブルを作成します。
USE ROLE OOTAKA_SANDBOX_ROLE; USE DATABASE OOTAKA_SANDBOX_DB; USE SCHEMA PUBLIC; CREATE OR REPLACE TABLE stock ( ticker_symbol VARCHAR(5), sector VARCHAR(16), price NUMBER(8,2), change NUMBER(8,2) );
次にタスクを作成します。タスクはシンプルなタスクで「VARIANT型だったデータをlog
テーブルのストリームを利用して読み込み、各カラム毎に分けて型定義したstock
テーブルに移す」というタスクとします。また、スケジュール設定は「1分毎の実行」とします。
USE ROLE OOTAKA_SANDBOX_ROLE; USE DATABASE OOTAKA_SANDBOX_DB; USE SCHEMA PUBLIC; CREATE OR REPLACE TASK log_to_stock WAREHOUSE = X_SMALL_WH SCHEDULE = '1 minute' WHEN SYSTEM$STREAM_HAS_DATA('log_stream') AS INSERT INTO stock SELECT record:ticker_symbol::VARCHAR AS ticker_symbol, record:sector::VARCHAR AS sector, record:price::NUMBER AS price, record:change::NUMBER AS change FROM log_stream ;
これでタスクが出来ましたが、この段階ではまだタスクは「一時停止」した状態なので開始してあげる必要があります。
タスクの起動には「グローバル権限」のEXECUTE TASK
が必要となるので、ACCOUNTADMIN
でカスタムロールのOOTAKA_SANDBOX_ROLE
に権限を付与します。
USE ROLE ACCOUNTADMIN; GRANT EXECUTE TASK ON ACCOUNT TO ROLE OOTAKA_SANDBOX_ROLE;
その後、OOTAKA_SANDBOX_ROLE
にロールを再度切り替えてタスクを起動します。
USE ROLE OOTAKA_SANDBOX_ROLE; USE DATABASE OOTAKA_SANDBOX_DB; ALTER TASK log_to_stock RESUME;
これでタスクが起動状態になりました。
パイプラインを試してみる
これで、下準備ができたので実際にデータを流してみます!
想定通りであれば、S3にファイルが送信された結果、Streamに差分データが溜まっていき、1分に一度のタイミングでstock
テーブルに登録されていくはずです。
Kinesis Firehoseのテストデータを流し込んで、しばらくしたら停止します。
停止してしばらく(5分程度)するとS3にファイルが配置されるので、以下のクエリで確認します。
USE ROLE OOTAKA_SANDBOX_ROLE; USE DATABASE OOTAKA_SANDBOX_DB; USE SCHEMA PUBLIC; SELECT COUNT(*) from log;
╒══════════╕ │ COUNT(*) │ ╞══════════╡ │ 780 │ ╘══════════╛ 1 Row(s) produced. Time Elapsed: 0.125s
データロード前は300
だったのですが、780
に変わりました。Snowpipeによって、log
テーブルにデータが追加でロードされたことが分かります。
今度はストリームとタスクによって、データが流れているかを確認してみましょう。タスクが1分毎に動作するので、1分程度待ってから、stock
テーブルを確認してみます。
SELECT COUNT(*) FROM stock;
╒══════════╕ │ COUNT(*) │ ╞══════════╡ │ 480 │ ╘══════════╛ 1 Row(s) produced. Time Elapsed: 0.696s
差分データの480件がちゃんと登録されていますね。データの中身も少しみてみます。
USE WAREHOUSE X_SMALL_WH; SELECT * FROM stock LIMIT 5;
╒═══════════════╤════════════╤════════╤════════╕ │ TICKER_SYMBOL │ SECTOR │ PRICE │ CHANGE │ ╞═══════════════╪════════════╪════════╪════════╡ │ IOP │ TECHNOLOGY │ 117.00 │ -1.00 │ ├───────────────┼────────────┼────────┼────────┤ │ IOP │ TECHNOLOGY │ 117.00 │ -1.00 │ ├───────────────┼────────────┼────────┼────────┤ │ QAZ │ FINANCIAL │ 192.00 │ -3.00 │ ├───────────────┼────────────┼────────┼────────┤ │ SED │ HEALTHCARE │ 2.00 │ 0.00 │ ├───────────────┼────────────┼────────┼────────┤ │ PPL │ HEALTHCARE │ 31.00 │ 1.00 │ ╘═══════════════╧════════════╧════════╧════════╛ 5 Row(s) produced. Time Elapsed: 0.557s
想定通り、各カラムに値が入っていますね!
おまけ:タスクの停止
タスクはしばらく利用しないので、以下のクエリで停止しておきます。
USE ROLE OOTAKA_SANDBOX_ROLE; USE DATABASE OOTAKA_SANDBOX_DB; ALTER TASK log_to_stock SUSPEND;
まとめ
以上、Snowflakeでデータパイプラインを作ってみました。今回はとてもシンプルなものを作成しましたが、うまく組み合わせてあげることでSnowflakeだけでデータパイプラインを構築することもできそうです。
更には、Snowsightを利用することでデータの可視化も可能で、まさに「クラウドデータプラットフォーム」という感じで、とても面白いですね。
どなたかのお役に立てば幸いです。それでは!