Snowflakeで共有されたテーブルの変更を追跡して別テーブルを自動更新する

SnowflakeのDirect Share機能で共有されたテーブルに対し、変更追跡を行うためのストリームオブジェクトを設定し、変更履歴をもとに別テーブルを自動で更新する処理を検証しました。

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

DA事業本部のnkhrです。Snowflakeではアカウント間でデータ共有ができます。本ブログでは、共有されたテーブルに対して変更追跡を行うためのストリームオブジェクトを設定し、変更履歴をもとに別テーブルを自動で更新する処理を検証しました。

Snowflakeでデータ共有する方法は3種類(Direct Share、Marketplace、Data Exchange)あります。本ブログで検証するデータ共有はDirect Shareです。

データ共有では、提供元をProviderアカウント、提供先をConsumerアカウントと呼びます。共有されたデータベースの実体はProviderアカウントにあります。Consumer側では、共有されたデータベース内にオブジェクトを作成したり、共有オブジェクトを修正することはできません。

今回の検証では、以下の図のイメージを実装します。

Providerアカウントの共有設定

共有したテーブルの変更履歴(STREAM)をConsumer側で追跡するためには共有するテーブルの「CHANGE_TRACKING」を「TRUE」にする必要があります。

共有設定の詳細は、以下のブログを参照してください。

Provider側では以下のオブジェクトを共有します。

  • Database: test_db
  • Schema: test
  • Table: share
  • Share object: test_s

実行コードは以下の通りです。

USE ROLE accountadmin;
GRANT CREATE SHARE ON ACCOUNT TO sysadmin;

USE ROLE SYSADMIN;
CREATE DATABASE test_db;
CREATE SCHEMA test;

CREATE table test_db.test.share(id INTEGER, name VARCHAR(20), class_number INTEGER);
INSERT INTO test.share VALUES (1, 'Juddy', 3), (2, 'Bob', 3), (3, 'Sam', 5);

ALTER TABLE test.SHARE SET CHANGE_TRACKING = TRUE;
CREATE OR REPLACE SHARE test_share;
GRANT USAGE ON DATABASE test_db TO SHARE test_share;
GRANT USAGE ON SCHEMA test_db.test TO SHARE test_share;
GRANT SELECT ON TABLE test_db.test.share TO SHARE test_share;
ALTER SHARE test_share SET ACCOUNTS = <sonsumer_accounts>;

Consumer側アカウントの設定

Consumer側では以下の作業を行います。

  • Providerアカウントから共有されたShare Objectからデータベースを作成
  • 共有テーブルに対してSTREAMを作成
  • STREAMの変更をもとにConsumer側のテーブルを更新するTASKを作成

ストリームとタスクを利用したデータパイプラインの詳細については、下記のブログが参考になります。

ProviderアカウントのShare Objectからデータベース作成

Consumer側で共有データベースを参照するためには「IMPORTED PRIVILEGES」権限が必要です。共有データベースの利用権限はProvider側でShare Objectに設定されています。

USE ROLE ACCOUNTADMIN;
CREATE DATABASE shared_db FROM SHARE <provider account>.test_share;
GRANT IMPORTED PRIVILEGES ON DATABASE shared_db TO ROLE sysadmin;

共有データベースのテーブルに対してSTRAMを作成

共有されたデータベース内に、Consumer側でオブジェクトを作成できないため、STREAMはConsumer所有のデータベースに作成します。

Consumer側では、ユーザ一覧とユーザの変更日付を保持するuserlistテーブルを作成します。

USE ROLE SYSADMIN;
CREATE DATABASE c_db;
CREATE SCHEMA c_db.s_test;

CREATE TABLE c_db.s_test.userlist(id, name, updated_at)
  AS SELECT id, name current_timestamp(0) FROM shared_db.test.share;
CREATE STREAM c_db.s_test.mystream ON TABLE shared_db.test.share;

変更履歴からテーブルを更新するタスク作成

2021/12時点では、タスクの実行方法はスケジュール実行のみが提供されています。タスクの実行中に次のスケジュールの実行時間となった場合は、次の実行はスキップされます。TASK作成のWHEN句に「SYSTEM$STREAM_HAS_DATA(<stream name>)」を指定することで、対象のストリーム内にレコードが存在しない場合はタスク実行をスキップしてくれます。

スケジュールの指定は「X分ごとの実行」または「Cron式による実行」が指定できます。CREATE TASKの指定パラメータについてはリンク先をご参照ください。

2021/12時点では、アカウント内でStarted状態(実行中)のタスクは上限10000です(Suspend状態の場合はこのカウントに含まれません)

CREATE OR REPLACE TASK c_db.s_test.update_userlist
WAREHOUSE = TEST_WH
SCHEDULE = '1 minute'
ALLOW_OVERLAPPING_EXECUTION = FALSE
USER_TASK_TIMEOUT_MS = 600000
WHEN
SYSTEM$STREAM_HAS_DATA('c_db.s_test.mystream')
AS
MERGE INTO c_db.s_test.userlist u
// ストリームに同じid, nameのレコードが2レコード存在するのは
// class_numberカラムのみをUpdateした場合のため利用しないレコードとみなす
USING (
SELECT id, name, METADATA$ACTION AS operation_type
FROM c_db.s_test.mystream s1
WHERE NOT EXISTS (
    SELECT 1
    FROM c_db.s_test.mystream s2
    WHERE s1.id = s2.id AND s1.name = s2.name
    GROUP BY id, name
    HAVING COUNT(*) > 1
)) s
ON u.id = s.id AND u.name = s.name
WHEN MATCHED AND s.operation_type = 'DELETE' THEN DELETE
WHEN NOT MATCHED AND s.operation_type = 'INSERT'
THEN INSERT (id, name, updated_at) VALUES (s.id, s.name,  current_timestamp(0))
;

ALTER TASK IF EXISTS c_db.s_test.update_userlist RESUME;

動作確認

  • Provider側での実行(2分ごとに各行を実行)
// ①追加
INSERT INTO test.share VALUES (4, 'Ken', 3), (5, 'Tonny', 5);

// ②削除
DELETE FROM test.share WHERE id = 1;

// ③更新
UPDATE test.share SET id = 1 WHERE id = 5;

// ④classnumberカラムの更新
UPDATE test.share SET class_number = 10 WHERE id = 2;
  • consumer側での実行(Provider側で実行後に1分経過以降)
SELECT * FROM c_db.s_test.userlist;

①の結果(既存3行+追加2行=5行)

②の結果(1行削除=>4行、id = 2, 3, 4, 5)

③の結果(更新⇒4行、id = 1, 2, 3, 4)

④の結果(変化なし、id = 1, 2, 3, 4)

まとめ

共有されたテーブルの変更履歴から自動で、Consumer側のテーブルを更新する方法を検証しました。SnowflakeではOrganizationやReader Account機能により、複数アカウントの作成が簡単にできるため、アカウント間でのデータ共有をうまく利用することで、データの信頼性や処理効率化が図れるようになったら良いですね。

以上、@nkhrでした。