Snowflakeのイケてる機能「ストリーム」を使用してテーブルの変更履歴を追跡する

流れに身を任せよう
2020.07.07

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

奈良県でリモートワーク中の玉井です。

今回はSnowflakeのストリームという機能を紹介します。

ストリームとは

超概要

ストリームオブジェクトは、挿入、更新、削除などのテーブルに加えられたデータ操作言語(DML)の変更、および各変更に関するメタデータを記録し、変更されたデータを使用してアクションを実行できるようにします。このプロセスは、変更データキャプチャ(CDC)と呼ばれます。個々のテーブルストリームは、 ソーステーブル の行に加えられた変更を追跡します。テーブルストリーム(単に「ストリーム」とも呼ばれます)は、テーブル内の2つのトランザクションポイント間で行レベルで変更された内容の「変更テーブル」を利用可能にします。これにより、トランザクション形式で一連の変更レコードをクエリおよび使用できます。

テーブルやビュー等と同じように扱えるオブジェクトとなっており、こちらをテーブルに対して設定すると、そのテーブルの変更履歴を追跡・記録してくれます。上記ドキュメントにあるように、「変更データキャプチャ」という手法・仕組みを実現できるものとなっています。

何がうれしいの?

「テーブルの変更履歴が記録できる」ということの何が凄いのでしょうか。この機能は、いわゆる「ELT」処理で利用されることが想定されており、「Snowpipe」「タスク」という別機能と組み合わせることで、データパイプライン処理の構築において絶大な威力を発揮します

そこらへんについては、記事の後半で触れますので、まずは実際にストリームを使ってみましょう。

やってみた

元ネタ

下記にチュートリアル的なものがあるので、こちらを実際にやってみつつ、ストリームという機能に対する理解を深めたいと思います。

テーブル等の準備

まず、テーブル2つとストリーム1つを作成します。

CREATE OR REPLACE TABLE nation (
    n_nationkey number,
    n_name varchar(25),
    n_regionkey number,
    n_comment varchar(152),
    country_code varchar(2),
    update_timestamp timestamp_ntz
);

CREATE OR REPLACE TABLE nation_history (
    n_nationkey number,
    n_name varchar(25),
    n_regionkey number,
    n_comment varchar(152),
    country_code varchar(2),
    start_time timestamp_ntz,
    end_time timestamp_ntz,
    current_flag int
);

CREATE OR REPLACE STREAM nation_table_changes ON TABLE nation;

nationテーブルは、色々な処理(更新や削除など)が行われるテーブルです。常に最新状態を保つため、更新については上書きされます。

nation_historyは、nationテーブル内の各種変更について格納する、という役割のテーブルです。start_time〜end_timeが、そのデータが有効であった期間を指します。current_flagは、その名の通り、そのデータが現在有効であるかどうかを示します。

nation_table_changesは、今回の主人公であるストリームオブジェクトとなります。このストリームの対象はnaitonテーブルですので、nation_table_changesは、nationに対する各種変更を追跡するオブジェクトとなります。

ストリームの情報を表示する

SHOW STREAMS;で、現在存在するストリームを表示することができます。

ストリームに関するメタデータを確認できます。

ストリームの中身を表示する

先程作成した「nation_table_changes」ストリームの内容を見ます。見方としては、テーブルを参照するのと同じ感じでいけます。

SELECT
    *
FROM
    nation_table_changes;

データとしては空っぽですね。それもそのはず、記録対象であるnationテーブルに対して、まだ何も処理を行っていないからです。

ここで確認しておきたいのは、右3つのカラムです。

METADATA$ACTION

変更処理の種類…INSERTDELETEが入ります。

METADATA$ISUPDATE

変更処理の種類が更新処理(UPDATE)かどうかを示します。例えば、行の更新がかかった場合、ストリームには「旧データのDELETE」と「新データのINSERT」の2レコードが追加され、それぞれの当フラグは真となります(更新だから)。

METADATA$ROW_ID

変更が行われたレコードに対する一意のIDです。このIDを用いることで、特定のレコードに対する変更を追跡することができます。

NATION_HISTORYテーブルに変更を反映したデータがロードされるようにする

nation_historyテーブルは、nationテーブルで行われた変更を格納するテーブルですので、実際にnationテーブルで変更が行われた際に、nation_historyテーブルにデータが格納されるようにします。かなり長いクエリですが、いくつかの処理をUNIONしているので長くなっているだけです。

CREATE OR REPLACE VIEW nation_change_data AS 
-- nationテーブルにデータが挿入されたときの処理
-- nationテーブルにデータを挿入すると、nation_historyテーブルにINSERTを実行する
SELECT
    n_nationkey,
    n_name,
    n_regionkey,
    n_comment,
    country_code,
    start_time,
    end_time,
    current_flag,
    'I' AS dml_type
FROM
    (
        SELECT
            n_nationkey,
            n_name,
            n_regionkey,
            n_comment,
            country_code,
            update_timestamp AS start_time,
            lag(update_timestamp) over (
                PARTITION by n_nationkey
                ORDER BY
                    update_timestamp DESC
            ) AS end_time_raw,
            case
                WHEN end_time_raw IS NULL THEN '9999-12-31'::timestamp_ntz
                ELSE end_time_raw
            end AS end_time,
            case
                WHEN end_time_raw IS NULL THEN 1
                ELSE 0
            end AS current_flag
        FROM
            (
                SELECT
                    n_nationkey,
                    n_name,
                    n_regionkey,
                    n_comment,
                    country_code,
                    update_timestamp
                FROM
                    nation_table_changes
                WHERE
                    metadata$ACTION = 'INSERT'
                    AND metadata$isupdate = 'FALSE'
            )
    )
UNION
    -- nationテーブルでデータが更新されたときの処理
    -- nationテーブルを更新すると、nation_historyテーブルにINSERTを実行する
    -- 以下のサブクエリは、それぞれ異なるdml_typeを持つ2つのレコードを生成する
SELECT
    n_nationkey,
    n_name,
    n_regionkey,
    n_comment,
    country_code,
    start_time,
    end_time,
    current_flag,
    dml_type
FROM
    (
        SELECT
            n_nationkey,
            n_name,
            n_regionkey,
            n_comment,
            country_code,
            update_timestamp AS start_time,
            lag(update_timestamp) over (
                PARTITION by n_nationkey
                ORDER BY
                    update_timestamp DESC
            ) AS end_time_raw,
            case
                WHEN end_time_raw IS NULL THEN '9999-12-31' :: timestamp_ntz
                ELSE end_time_raw
            end AS end_time,
            case
                WHEN end_time_raw IS NULL THEN 1
                ELSE 0
            end AS current_flag,
            dml_type
        FROM
            (
                -- nation_historyテーブルに挿入するデータを特定する
                SELECT
                    n_nationkey,
                    n_name,
                    n_regionkey,
                    n_comment,
                    country_code,
                    update_timestamp,
                    'I' AS dml_type
                FROM
                    nation_table_changes
                WHERE
                    metadata$ACTION = 'INSERT'
                    AND metadata$isupdate = 'TRUE'
                UNION
                    -- 更新が必要なnation_historyテーブルのデータを特定する
                SELECT
                    n_nationkey,
                    NULL,
                    NULL,
                    NULL,
                    NULL,
                    start_time,
                    'U' AS dml_type
                FROM
                    nation_history
                WHERE
                    n_nationkey IN (
                        SELECT
                            DISTINCT n_nationkey
                        FROM
                            nation_table_changes
                        WHERE
                            metadata$ACTION = 'INSERT'
                            AND metadata$isupdate = 'TRUE'
                    )
                    AND current_flag = 1
            )
    )
UNION
    -- nationテーブルでデータが削除されたときの処理
    -- nationテーブルのデータを削除すると、nation_historyテーブルにUPDATEを実行する
SELECT
    nms.n_nationkey,
    NULL,
    NULL,
    NULL,
    NULL,
    nh.start_time,
    CURRENT_TIMESTAMP()::timestamp_ntz,
    NULL,
    'D'
FROM
    nation_history nh
    INNER JOIN nation_table_changes nms ON nh.n_nationkey = nms.n_nationkey
WHERE
    nms.metadata$ACTION = 'DELETE'
    AND nms.metadata$isupdate = 'FALSE'
    AND nh.current_flag = 1;

このビューは、nation_historyにINSERTまたはUPDATEする内容を決定するロジックを保持するします。

ストリームが上記クエリでふんだんに使われていることがわかります。ちなみに、上記を実行した後、ストリームを見ても、まだ何もデータはありません。それは、まだ実際にデータを追加したり更新したり削除したりしていないからです。

上記のビューを作成した後は、それを利用したMERGE文を実行します。

MERGE INTO nation_history nh -- nationテーブルの変更をマージする対象テーブルを指定する
USING nation_change_data m -- さっき作ったビュー
-- n_nationkeyとstart_timeで、nation_historyテーブルに一意のレコードがあるかどうかを判断する
ON nh.n_nationkey = m.n_nationkey
AND nh.start_time = m.start_time
-- 更新されたレコードは、現在のレコードではなくなったので、end_timeにスタンプを押す必要がある
WHEN matched
AND m.dml_type = 'U' THEN
    UPDATE
    SET
        nh.end_time = m.end_time,
        nh.current_flag = 0
-- 削除処理は基本的に論理削除。レコードにスタンプをつけて、新しいバージョンはINSERTしない
WHEN matched
AND m.dml_type = 'D' THEN
    UPDATE
    SET
        nh.end_time = m.end_time,
        nh.current_flag = 0
-- 新しいn_nationkeyの挿入と既存のn_nationkeyの更新は、どちらもINSERTとして処理
WHEN NOT matched
AND m.dml_type = 'I' THEN
    INSERT
        (
            n_nationkey,
            n_name,
            n_regionkey,
            n_comment,
            country_code,
            start_time,
            end_time,
            current_flag
        )
    VALUES
        (
            m.n_nationkey,
            m.n_name,
            m.n_regionkey,
            m.n_comment,
            m.country_code,
            m.start_time,
            m.end_time,
            m.current_flag
        );

…実行しても、何も変化はありません。それもそのはず、まだデータがないからです。というわけで、そろそろデータを入れていきたいと思います。

データを入れる

下記のクエリでデータを入れます。

SET update_timestamp = current_timestamp()::timestamp_ntz;
BEGIN;
INSERT INTO nation VALUES(0,'ALGERIA',0,' haggle. carefully final deposits detect slyly agai','DZ',$update_timestamp);
INSERT INTO nation VALUES(1,'ARGENTINA',1,'al foxes promise slyly according to the regular accounts. bold requests alon','AR',$update_timestamp);
INSERT INTO nation VALUES(2,'BRAZIL',1,'y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special ','BR',$update_timestamp);
INSERT INTO nation VALUES(3,'CANADA',1,'eas hang ironic silent packages. slyly regular packages are furiously over the tithes. fluffily bold','CA',$update_timestamp);
INSERT INTO nation VALUES(4,'EGYPT',4,'y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d','EG',$update_timestamp);
INSERT INTO nation VALUES(5,'ETHIOPIA',0,'ven packages wake quickly. regu','ET',$update_timestamp);
INSERT INTO nation VALUES(6,'FRANCE',3,'refully final requests. regular ironi','FR',$update_timestamp);
INSERT INTO nation VALUES(7,'GERMANY',3,'l platelets. regular accounts x-ray: unusual regular acco','DE',$update_timestamp);
INSERT INTO nation VALUES(8,'INDIA',2,'ss excuses cajole slyly across the packages. deposits print aroun','IN',$update_timestamp);
INSERT INTO nation VALUES(9,'INDONESIA',2,' slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull','ID',$update_timestamp);
INSERT INTO nation VALUES(10,'IRAN',4,'efully alongside of the slyly final dependencies. ','IR',$update_timestamp);
INSERT INTO nation VALUES(11,'IRAQ',4,'nic deposits boost atop the quickly final requests? quickly regula','IQ',$update_timestamp);
INSERT INTO nation VALUES(12,'JAPAN',2,'ously. final express gifts cajole a','JP',$update_timestamp);
INSERT INTO nation VALUES(13,'JORDAN',4,'ic deposits are blithely about the carefully regular pa','JO',$update_timestamp);
INSERT INTO nation VALUES(14,'KENYA',0,' pending excuses haggle furiously deposits. pending express pinto beans wake fluffily past t','KE',$update_timestamp);
INSERT INTO nation VALUES(15,'MOROCCO',0,'rns. blithely bold courts among the closely regular packages use furiously bold platelets?','MA',$update_timestamp);
INSERT INTO nation VALUES(16,'MOZAMBIQUE',0,'s. ironic unusual asymptotes wake blithely r','MZ',$update_timestamp);
INSERT INTO nation VALUES(17,'PERU',1,'platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun','PE',$update_timestamp);
INSERT INTO nation VALUES(18,'CHINA',2,'c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos','CN',$update_timestamp);
INSERT INTO nation VALUES(19,'ROMANIA',3,'ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account','RO',$update_timestamp);
INSERT INTO nation VALUES(20,'SAUDI ARABIA',4,'ts. silent requests haggle. closely express packages sleep across the blithely','SA',$update_timestamp);
INSERT INTO nation VALUES(21,'VIETNAM',2,'hely enticingly express accounts. even final ','VN',$update_timestamp);
INSERT INTO nation VALUES(22,'RUSSIA',3,' requests against the platelets use never according to the quickly regular pint','RU',$update_timestamp);
INSERT INTO nation VALUES(23,'UNITED KINGDOM',3,'eans boost carefully special requests. accounts are. carefull','GB',$update_timestamp);
INSERT INTO nation VALUES(24,'UNITED STATES',1,'y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be','US',$update_timestamp);
COMMIT;

nationテーブルを確認します。データが入ってますね。

nationテーブルに20数件のデータが「挿入」されました。ということは、それらはストリームにも記録されているはずです。ということで、ストリームを見てみましょう。

ストリームがテーブルに対する変更処理を記録しています。METADATA$ACTIONがINSERTとなっているところに注目です。

ストリームを使う

ストリームに変更履歴が記録されました。では、この記録データを使って、nation_historyテーブルにデータをロードしてみましょう。先程用意したMERGE文を再度実行します。

25件分実行されました。つまり、nation_historyテーブルに25件挿入されたということです。

nation_historyテーブルを見てみましょう。

start_timeやcurrent_flgにデータがちゃんと入っていますね。ストリームを利用して、変更があったデータを確実に処理できています。

ここで、もう一度ストリームを確認します。

なんと空っぽになっています。実は、ストリームの仕様として、ストリームのデータをDML文で使用すると、使用した分のデータはストリームから外れます(消費扱い)。これは、1度実施した処理を誤って再度行わないためです。

こういった形で、ストリームを利用して、変更があったデータだけを確実に狙って処理することができます。

ストリームの使いどころ

Snowflake上でデータパイプラインを簡単に構築できる

冒頭でも触れましたが、Snowflakeのストリームは、ELT処理で使われることを想定しています。

一旦ストリームを設置してしまえば、後は挿入や更新等を自動でキャッチしてくれます。先程の例では、手動で一回データを入れただけなので、その恩恵が分かりにくいと思いますが、これが例えば「自動で定期的にデータが入ってくる」ようなケースだとどうでしょうか。昨今、SaaSやIoTなど、データがひっきりなしに送られてくる環境は珍しくありません。

Snowflakeには「Snowpipe」という機能があります。ステージというオブジェクトに置かれたファイルを継続的にテーブルにロードされるようにできる、というものです。ここにストリームを加えることで、Snowpipeで送られてくるデータの流れをストリームで補足し続けることができます。Snowpipeの対象にはAmazon S3等も含まれているため、「S3に定期的にファイルを配置する」→「Snowpipeで継続的にロード」→「ストリームで変更を自動追跡」ということが簡単にできます。

さらに、ここに「タスク」という機能も加えることができます。簡単にいうと、事前に定義した処理(SQLやストアド)をスケジュール実行させるというものです。先程「ストリームのデータを消費して処理する」ということをやりましたが、こういった処理をタスク化することができます。つまり、「S3に定期的にファイルを配置する」→「Snowpipeで継続的にロード」→「ストリームで変更を自動追跡」→「ストリームデータを利用したデータ前処理をタスク化」という流れができます。

要するに、いわゆる「データパイプライン」という一連の処理を、Snowflakeだけで簡単に構築できてしまいます。例えば「センサーから吐き出されるログファイル→BIツール用のデータマート」という定期的な処理を、Snowflake上で実行させ続けることができます。非常に強力な機能群だと思います。

ストリームその他もろもろ

APPEND_ONLYオプション

ストリームに対して、こちらのオプションをTRUEとすると、そのストリームの記録対象を「データの挿入のみ」にすることができます(更新や削除は追わない)。データの性質や処理内容に合わせて使い分けたいですね。

おわりに

Snowflakeのストリームを体感しました。Snowpipeやタスクとの組み合わせは本当に凄いと思うので、そちらもブログにしたいですね。