Snowflakeのイケてる機能「ストリーム」を使用してテーブルの変更履歴を追跡する
奈良県でリモートワーク中の玉井です。
今回はSnowflakeのストリームという機能を紹介します。
ストリームとは
超概要
ストリームオブジェクトは、挿入、更新、削除などのテーブルに加えられたデータ操作言語(DML)の変更、および各変更に関するメタデータを記録し、変更されたデータを使用してアクションを実行できるようにします。このプロセスは、変更データキャプチャ(CDC)と呼ばれます。個々のテーブルストリームは、 ソーステーブル の行に加えられた変更を追跡します。テーブルストリーム(単に「ストリーム」とも呼ばれます)は、テーブル内の2つのトランザクションポイント間で行レベルで変更された内容の「変更テーブル」を利用可能にします。これにより、トランザクション形式で一連の変更レコードをクエリおよび使用できます。
テーブルやビュー等と同じように扱えるオブジェクトとなっており、こちらをテーブルに対して設定すると、そのテーブルの変更履歴を追跡・記録してくれます。上記ドキュメントにあるように、「変更データキャプチャ」という手法・仕組みを実現できるものとなっています。
何がうれしいの?
「テーブルの変更履歴が記録できる」ということの何が凄いのでしょうか。この機能は、いわゆる「ELT」処理で利用されることが想定されており、「Snowpipe」「タスク」という別機能と組み合わせることで、データパイプライン処理の構築において絶大な威力を発揮します。
そこらへんについては、記事の後半で触れますので、まずは実際にストリームを使ってみましょう。
やってみた
元ネタ
下記にチュートリアル的なものがあるので、こちらを実際にやってみつつ、ストリームという機能に対する理解を深めたいと思います。
テーブル等の準備
まず、テーブル2つとストリーム1つを作成します。
CREATE OR REPLACE TABLE nation ( n_nationkey number, n_name varchar(25), n_regionkey number, n_comment varchar(152), country_code varchar(2), update_timestamp timestamp_ntz ); CREATE OR REPLACE TABLE nation_history ( n_nationkey number, n_name varchar(25), n_regionkey number, n_comment varchar(152), country_code varchar(2), start_time timestamp_ntz, end_time timestamp_ntz, current_flag int ); CREATE OR REPLACE STREAM nation_table_changes ON TABLE nation;
nationテーブルは、色々な処理(更新や削除など)が行われるテーブルです。常に最新状態を保つため、更新については上書きされます。
nation_historyは、nationテーブル内の各種変更について格納する、という役割のテーブルです。start_time〜end_timeが、そのデータが有効であった期間を指します。current_flagは、その名の通り、そのデータが現在有効であるかどうかを示します。
nation_table_changesは、今回の主人公であるストリームオブジェクトとなります。このストリームの対象はnaitonテーブルですので、nation_table_changesは、nationに対する各種変更を追跡するオブジェクトとなります。
ストリームの情報を表示する
SHOW STREAMS;
で、現在存在するストリームを表示することができます。
ストリームに関するメタデータを確認できます。
ストリームの中身を表示する
先程作成した「nation_table_changes」ストリームの内容を見ます。見方としては、テーブルを参照するのと同じ感じでいけます。
SELECT * FROM nation_table_changes;
データとしては空っぽですね。それもそのはず、記録対象であるnationテーブルに対して、まだ何も処理を行っていないからです。
ここで確認しておきたいのは、右3つのカラムです。
METADATA$ACTION
変更処理の種類…INSERT
かDELETE
が入ります。
METADATA$ISUPDATE
変更処理の種類が更新処理(UPDATE)かどうかを示します。例えば、行の更新がかかった場合、ストリームには「旧データのDELETE」と「新データのINSERT」の2レコードが追加され、それぞれの当フラグは真となります(更新だから)。
METADATA$ROW_ID
変更が行われたレコードに対する一意のIDです。このIDを用いることで、特定のレコードに対する変更を追跡することができます。
NATION_HISTORYテーブルに変更を反映したデータがロードされるようにする
nation_historyテーブルは、nationテーブルで行われた変更を格納するテーブルですので、実際にnationテーブルで変更が行われた際に、nation_historyテーブルにデータが格納されるようにします。かなり長いクエリですが、いくつかの処理をUNIONしているので長くなっているだけです。
CREATE OR REPLACE VIEW nation_change_data AS -- nationテーブルにデータが挿入されたときの処理 -- nationテーブルにデータを挿入すると、nation_historyテーブルにINSERTを実行する SELECT n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag, 'I' AS dml_type FROM ( SELECT n_nationkey, n_name, n_regionkey, n_comment, country_code, update_timestamp AS start_time, lag(update_timestamp) over ( PARTITION by n_nationkey ORDER BY update_timestamp DESC ) AS end_time_raw, case WHEN end_time_raw IS NULL THEN '9999-12-31'::timestamp_ntz ELSE end_time_raw end AS end_time, case WHEN end_time_raw IS NULL THEN 1 ELSE 0 end AS current_flag FROM ( SELECT n_nationkey, n_name, n_regionkey, n_comment, country_code, update_timestamp FROM nation_table_changes WHERE metadata$ACTION = 'INSERT' AND metadata$isupdate = 'FALSE' ) ) UNION -- nationテーブルでデータが更新されたときの処理 -- nationテーブルを更新すると、nation_historyテーブルにINSERTを実行する -- 以下のサブクエリは、それぞれ異なるdml_typeを持つ2つのレコードを生成する SELECT n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag, dml_type FROM ( SELECT n_nationkey, n_name, n_regionkey, n_comment, country_code, update_timestamp AS start_time, lag(update_timestamp) over ( PARTITION by n_nationkey ORDER BY update_timestamp DESC ) AS end_time_raw, case WHEN end_time_raw IS NULL THEN '9999-12-31' :: timestamp_ntz ELSE end_time_raw end AS end_time, case WHEN end_time_raw IS NULL THEN 1 ELSE 0 end AS current_flag, dml_type FROM ( -- nation_historyテーブルに挿入するデータを特定する SELECT n_nationkey, n_name, n_regionkey, n_comment, country_code, update_timestamp, 'I' AS dml_type FROM nation_table_changes WHERE metadata$ACTION = 'INSERT' AND metadata$isupdate = 'TRUE' UNION -- 更新が必要なnation_historyテーブルのデータを特定する SELECT n_nationkey, NULL, NULL, NULL, NULL, start_time, 'U' AS dml_type FROM nation_history WHERE n_nationkey IN ( SELECT DISTINCT n_nationkey FROM nation_table_changes WHERE metadata$ACTION = 'INSERT' AND metadata$isupdate = 'TRUE' ) AND current_flag = 1 ) ) UNION -- nationテーブルでデータが削除されたときの処理 -- nationテーブルのデータを削除すると、nation_historyテーブルにUPDATEを実行する SELECT nms.n_nationkey, NULL, NULL, NULL, NULL, nh.start_time, CURRENT_TIMESTAMP()::timestamp_ntz, NULL, 'D' FROM nation_history nh INNER JOIN nation_table_changes nms ON nh.n_nationkey = nms.n_nationkey WHERE nms.metadata$ACTION = 'DELETE' AND nms.metadata$isupdate = 'FALSE' AND nh.current_flag = 1;
このビューは、nation_historyにINSERTまたはUPDATEする内容を決定するロジックを保持するします。
ストリームが上記クエリでふんだんに使われていることがわかります。ちなみに、上記を実行した後、ストリームを見ても、まだ何もデータはありません。それは、まだ実際にデータを追加したり更新したり削除したりしていないからです。
上記のビューを作成した後は、それを利用したMERGE文を実行します。
MERGE INTO nation_history nh -- nationテーブルの変更をマージする対象テーブルを指定する USING nation_change_data m -- さっき作ったビュー -- n_nationkeyとstart_timeで、nation_historyテーブルに一意のレコードがあるかどうかを判断する ON nh.n_nationkey = m.n_nationkey AND nh.start_time = m.start_time -- 更新されたレコードは、現在のレコードではなくなったので、end_timeにスタンプを押す必要がある WHEN matched AND m.dml_type = 'U' THEN UPDATE SET nh.end_time = m.end_time, nh.current_flag = 0 -- 削除処理は基本的に論理削除。レコードにスタンプをつけて、新しいバージョンはINSERTしない WHEN matched AND m.dml_type = 'D' THEN UPDATE SET nh.end_time = m.end_time, nh.current_flag = 0 -- 新しいn_nationkeyの挿入と既存のn_nationkeyの更新は、どちらもINSERTとして処理 WHEN NOT matched AND m.dml_type = 'I' THEN INSERT ( n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag ) VALUES ( m.n_nationkey, m.n_name, m.n_regionkey, m.n_comment, m.country_code, m.start_time, m.end_time, m.current_flag );
…実行しても、何も変化はありません。それもそのはず、まだデータがないからです。というわけで、そろそろデータを入れていきたいと思います。
データを入れる
下記のクエリでデータを入れます。
SET update_timestamp = current_timestamp()::timestamp_ntz; BEGIN; INSERT INTO nation VALUES(0,'ALGERIA',0,' haggle. carefully final deposits detect slyly agai','DZ',$update_timestamp); INSERT INTO nation VALUES(1,'ARGENTINA',1,'al foxes promise slyly according to the regular accounts. bold requests alon','AR',$update_timestamp); INSERT INTO nation VALUES(2,'BRAZIL',1,'y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special ','BR',$update_timestamp); INSERT INTO nation VALUES(3,'CANADA',1,'eas hang ironic silent packages. slyly regular packages are furiously over the tithes. fluffily bold','CA',$update_timestamp); INSERT INTO nation VALUES(4,'EGYPT',4,'y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d','EG',$update_timestamp); INSERT INTO nation VALUES(5,'ETHIOPIA',0,'ven packages wake quickly. regu','ET',$update_timestamp); INSERT INTO nation VALUES(6,'FRANCE',3,'refully final requests. regular ironi','FR',$update_timestamp); INSERT INTO nation VALUES(7,'GERMANY',3,'l platelets. regular accounts x-ray: unusual regular acco','DE',$update_timestamp); INSERT INTO nation VALUES(8,'INDIA',2,'ss excuses cajole slyly across the packages. deposits print aroun','IN',$update_timestamp); INSERT INTO nation VALUES(9,'INDONESIA',2,' slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull','ID',$update_timestamp); INSERT INTO nation VALUES(10,'IRAN',4,'efully alongside of the slyly final dependencies. ','IR',$update_timestamp); INSERT INTO nation VALUES(11,'IRAQ',4,'nic deposits boost atop the quickly final requests? quickly regula','IQ',$update_timestamp); INSERT INTO nation VALUES(12,'JAPAN',2,'ously. final express gifts cajole a','JP',$update_timestamp); INSERT INTO nation VALUES(13,'JORDAN',4,'ic deposits are blithely about the carefully regular pa','JO',$update_timestamp); INSERT INTO nation VALUES(14,'KENYA',0,' pending excuses haggle furiously deposits. pending express pinto beans wake fluffily past t','KE',$update_timestamp); INSERT INTO nation VALUES(15,'MOROCCO',0,'rns. blithely bold courts among the closely regular packages use furiously bold platelets?','MA',$update_timestamp); INSERT INTO nation VALUES(16,'MOZAMBIQUE',0,'s. ironic unusual asymptotes wake blithely r','MZ',$update_timestamp); INSERT INTO nation VALUES(17,'PERU',1,'platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun','PE',$update_timestamp); INSERT INTO nation VALUES(18,'CHINA',2,'c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos','CN',$update_timestamp); INSERT INTO nation VALUES(19,'ROMANIA',3,'ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account','RO',$update_timestamp); INSERT INTO nation VALUES(20,'SAUDI ARABIA',4,'ts. silent requests haggle. closely express packages sleep across the blithely','SA',$update_timestamp); INSERT INTO nation VALUES(21,'VIETNAM',2,'hely enticingly express accounts. even final ','VN',$update_timestamp); INSERT INTO nation VALUES(22,'RUSSIA',3,' requests against the platelets use never according to the quickly regular pint','RU',$update_timestamp); INSERT INTO nation VALUES(23,'UNITED KINGDOM',3,'eans boost carefully special requests. accounts are. carefull','GB',$update_timestamp); INSERT INTO nation VALUES(24,'UNITED STATES',1,'y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be','US',$update_timestamp); COMMIT;
nationテーブルを確認します。データが入ってますね。
nationテーブルに20数件のデータが「挿入」されました。ということは、それらはストリームにも記録されているはずです。ということで、ストリームを見てみましょう。
ストリームがテーブルに対する変更処理を記録しています。METADATA$ACTION
がINSERTとなっているところに注目です。
ストリームを使う
ストリームに変更履歴が記録されました。では、この記録データを使って、nation_historyテーブルにデータをロードしてみましょう。先程用意したMERGE文を再度実行します。
25件分実行されました。つまり、nation_historyテーブルに25件挿入されたということです。
nation_historyテーブルを見てみましょう。
start_timeやcurrent_flgにデータがちゃんと入っていますね。ストリームを利用して、変更があったデータを確実に処理できています。
ここで、もう一度ストリームを確認します。
なんと空っぽになっています。実は、ストリームの仕様として、ストリームのデータをDML文で使用すると、使用した分のデータはストリームから外れます(消費扱い)。これは、1度実施した処理を誤って再度行わないためです。
こういった形で、ストリームを利用して、変更があったデータだけを確実に狙って処理することができます。
ストリームの使いどころ
Snowflake上でデータパイプラインを簡単に構築できる
冒頭でも触れましたが、Snowflakeのストリームは、ELT処理で使われることを想定しています。
一旦ストリームを設置してしまえば、後は挿入や更新等を自動でキャッチしてくれます。先程の例では、手動で一回データを入れただけなので、その恩恵が分かりにくいと思いますが、これが例えば「自動で定期的にデータが入ってくる」ようなケースだとどうでしょうか。昨今、SaaSやIoTなど、データがひっきりなしに送られてくる環境は珍しくありません。
Snowflakeには「Snowpipe」という機能があります。ステージというオブジェクトに置かれたファイルを継続的にテーブルにロードされるようにできる、というものです。ここにストリームを加えることで、Snowpipeで送られてくるデータの流れをストリームで補足し続けることができます。Snowpipeの対象にはAmazon S3等も含まれているため、「S3に定期的にファイルを配置する」→「Snowpipeで継続的にロード」→「ストリームで変更を自動追跡」ということが簡単にできます。
さらに、ここに「タスク」という機能も加えることができます。簡単にいうと、事前に定義した処理(SQLやストアド)をスケジュール実行させるというものです。先程「ストリームのデータを消費して処理する」ということをやりましたが、こういった処理をタスク化することができます。つまり、「S3に定期的にファイルを配置する」→「Snowpipeで継続的にロード」→「ストリームで変更を自動追跡」→「ストリームデータを利用したデータ前処理をタスク化」という流れができます。
要するに、いわゆる「データパイプライン」という一連の処理を、Snowflakeだけで簡単に構築できてしまいます。例えば「センサーから吐き出されるログファイル→BIツール用のデータマート」という定期的な処理を、Snowflake上で実行させ続けることができます。非常に強力な機能群だと思います。
ストリームその他もろもろ
APPEND_ONLYオプション
ストリームに対して、こちらのオプションをTRUEとすると、そのストリームの記録対象を「データの挿入のみ」にすることができます(更新や削除は追わない)。データの性質や処理内容に合わせて使い分けたいですね。
おわりに
Snowflakeのストリームを体感しました。Snowpipeやタスクとの組み合わせは本当に凄いと思うので、そちらもブログにしたいですね。