Snowflakeでデータパイプラインを作ってみた

2020.10.22

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。

Snowflakeでは、以下のドキュメントでも記載されている通り、Snowflakeの機能を組み合わせてデータパイプラインを作成することができます。

分類 Snowflakeの機能
連続データロード Snowpipe
変更データの追跡 ストリーム
定期的なタスク タスク

今回はこのドキュメントの内容を踏まえて、実際に以下のようなイメージのデータパイプラインを作成してみたいと思います。

データの流れですが、以下のようなJSONL形式のデータがSnowpipe経由でLogテーブルに投入されてきます。

{"ticker_symbol":"ALY","sector":"ENERGY","change":-1.01,"price":88.09}
{"ticker_symbol":"TBV","sector":"HEALTHCARE","change":8.62,"price":199.62}
{"ticker_symbol":"ASD","sector":"FINANCIAL","change":-1.39,"price":64.41}
...

LogテーブルはVARIANT型のカラムRecordのみで構成されており、これをカラム毎に型定義されているStockテーブルに移していく流れとします。

Snowpipe

では、早速作成していきます。まずはSnowpipeの作成なのですが、Snowpipeの作成については、以下のエントリで作成したものをそのまま利用します。なお、元データについてもこのエントリでテストとして利用したKinesis FirehoseのDelivery Streamから送られるテストデータを利用します。

このSnowpipeによって、S3(外部ステージ)に配置されたデータが、データを配置したタイミングでlogテーブルにロードされるようになります。

ストリーム

ストリームは、テーブルのレコードに対して行った変更(差分データ)を追跡できるオブジェクトです。これを利用してlogテーブルのレコードに対する差分を検出します。 今回、ストリームは以下のように生データを投入しているテーブルlogに対して作成しておきます。

USE ROLE OOTAKA_SANDBOX_ROLE;
USE DATABASE OOTAKA_SANDBOX_DB;
USE SCHEMA PUBLIC;

CREATE OR REPLACE STREAM log_stream ON TABLE log;

タスク

最後にタスクです。タスクは定期的にSQLクエリを実行可能なオブジェクトで、ツリー関係(タスクAの実行後にタスクBを実行など)を持つこともできます。

今回は定期実行するタスクを1つだけ作成します。

まずは、タスクの前にデータ投入先のstockテーブルを作成します。

USE ROLE OOTAKA_SANDBOX_ROLE;
USE DATABASE OOTAKA_SANDBOX_DB;
USE SCHEMA PUBLIC;

CREATE OR REPLACE TABLE stock (
  ticker_symbol VARCHAR(5),
  sector VARCHAR(16),
  price  NUMBER(8,2),
  change  NUMBER(8,2)
);

次にタスクを作成します。タスクはシンプルなタスクで「VARIANT型だったデータをlogテーブルのストリームを利用して読み込み、各カラム毎に分けて型定義したstockテーブルに移す」というタスクとします。また、スケジュール設定は「1分毎の実行」とします。

USE ROLE OOTAKA_SANDBOX_ROLE;
USE DATABASE OOTAKA_SANDBOX_DB;
USE SCHEMA PUBLIC;

CREATE OR REPLACE TASK log_to_stock
  WAREHOUSE = X_SMALL_WH
  SCHEDULE = '1 minute'
WHEN
  SYSTEM$STREAM_HAS_DATA('log_stream')
AS
  INSERT INTO stock
  SELECT
    record:ticker_symbol::VARCHAR AS ticker_symbol,
    record:sector::VARCHAR AS sector,
    record:price::NUMBER AS price,
    record:change::NUMBER AS change
  FROM
    log_stream
;

これでタスクが出来ましたが、この段階ではまだタスクは「一時停止」した状態なので開始してあげる必要があります。

タスクの起動には「グローバル権限」のEXECUTE TASKが必要となるので、ACCOUNTADMINでカスタムロールのOOTAKA_SANDBOX_ROLEに権限を付与します。

USE ROLE ACCOUNTADMIN;
GRANT EXECUTE TASK ON ACCOUNT TO ROLE OOTAKA_SANDBOX_ROLE;

その後、OOTAKA_SANDBOX_ROLEにロールを再度切り替えてタスクを起動します。

USE ROLE OOTAKA_SANDBOX_ROLE;
USE DATABASE OOTAKA_SANDBOX_DB;

ALTER TASK log_to_stock RESUME;

これでタスクが起動状態になりました。

パイプラインを試してみる

これで、下準備ができたので実際にデータを流してみます!

想定通りであれば、S3にファイルが送信された結果、Streamに差分データが溜まっていき、1分に一度のタイミングでstockテーブルに登録されていくはずです。

Kinesis Firehoseのテストデータを流し込んで、しばらくしたら停止します。

停止してしばらく(5分程度)するとS3にファイルが配置されるので、以下のクエリで確認します。

USE ROLE OOTAKA_SANDBOX_ROLE;
USE DATABASE OOTAKA_SANDBOX_DB;
USE SCHEMA PUBLIC;

SELECT COUNT(*) from log;
╒══════════╕
│ COUNT(*) │
╞══════════╡
│      780 │
╘══════════╛
1 Row(s) produced. Time Elapsed: 0.125s

データロード前は300だったのですが、780に変わりました。Snowpipeによって、logテーブルにデータが追加でロードされたことが分かります。

今度はストリームとタスクによって、データが流れているかを確認してみましょう。タスクが1分毎に動作するので、1分程度待ってから、stockテーブルを確認してみます。

SELECT COUNT(*) FROM stock;
╒══════════╕
│ COUNT(*) │
╞══════════╡
│      480 │
╘══════════╛
1 Row(s) produced. Time Elapsed: 0.696s

差分データの480件がちゃんと登録されていますね。データの中身も少しみてみます。

USE WAREHOUSE X_SMALL_WH;

SELECT * FROM stock LIMIT 5;
╒═══════════════╤════════════╤════════╤════════╕
│ TICKER_SYMBOL │ SECTOR     │  PRICE │ CHANGE │
╞═══════════════╪════════════╪════════╪════════╡
│ IOP           │ TECHNOLOGY │ 117.00 │  -1.00 │
├───────────────┼────────────┼────────┼────────┤
│ IOP           │ TECHNOLOGY │ 117.00 │  -1.00 │
├───────────────┼────────────┼────────┼────────┤
│ QAZ           │ FINANCIAL  │ 192.00 │  -3.00 │
├───────────────┼────────────┼────────┼────────┤
│ SED           │ HEALTHCARE │   2.00 │   0.00 │
├───────────────┼────────────┼────────┼────────┤
│ PPL           │ HEALTHCARE │  31.00 │   1.00 │
╘═══════════════╧════════════╧════════╧════════╛
5 Row(s) produced. Time Elapsed: 0.557s

想定通り、各カラムに値が入っていますね!

おまけ:タスクの停止

タスクはしばらく利用しないので、以下のクエリで停止しておきます。

USE ROLE OOTAKA_SANDBOX_ROLE;
USE DATABASE OOTAKA_SANDBOX_DB;

ALTER TASK log_to_stock SUSPEND;

まとめ

以上、Snowflakeでデータパイプラインを作ってみました。今回はとてもシンプルなものを作成しましたが、うまく組み合わせてあげることでSnowflakeだけでデータパイプラインを構築することもできそうです。

更には、Snowsightを利用することでデータの可視化も可能で、まさに「クラウドデータプラットフォーム」という感じで、とても面白いですね。

どなたかのお役に立てば幸いです。それでは!