この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
DA事業本部のnkhrです。Snowflakeではアカウント間でデータ共有ができます。本ブログでは、共有されたテーブルに対して変更追跡を行うためのストリームオブジェクトを設定し、変更履歴をもとに別テーブルを自動で更新する処理を検証しました。
Snowflakeでデータ共有する方法は3種類(Direct Share、Marketplace、Data Exchange)あります。本ブログで検証するデータ共有はDirect Shareです。
データ共有では、提供元をProviderアカウント、提供先をConsumerアカウントと呼びます。共有されたデータベースの実体はProviderアカウントにあります。Consumer側では、共有されたデータベース内にオブジェクトを作成したり、共有オブジェクトを修正することはできません。
今回の検証では、以下の図のイメージを実装します。
Providerアカウントの共有設定
共有したテーブルの変更履歴(STREAM)をConsumer側で追跡するためには共有するテーブルの「CHANGE_TRACKING」を「TRUE」にする必要があります。
共有設定の詳細は、以下のブログを参照してください。
Provider側では以下のオブジェクトを共有します。
- Database: test_db
- Schema: test
- Table: share
- Share object: test_s
実行コードは以下の通りです。
USE ROLE accountadmin;
GRANT CREATE SHARE ON ACCOUNT TO sysadmin;
USE ROLE SYSADMIN;
CREATE DATABASE test_db;
CREATE SCHEMA test;
CREATE table test_db.test.share(id INTEGER, name VARCHAR(20), class_number INTEGER);
INSERT INTO test.share VALUES (1, 'Juddy', 3), (2, 'Bob', 3), (3, 'Sam', 5);
ALTER TABLE test.SHARE SET CHANGE_TRACKING = TRUE;
CREATE OR REPLACE SHARE test_share;
GRANT USAGE ON DATABASE test_db TO SHARE test_share;
GRANT USAGE ON SCHEMA test_db.test TO SHARE test_share;
GRANT SELECT ON TABLE test_db.test.share TO SHARE test_share;
ALTER SHARE test_share SET ACCOUNTS = <sonsumer_accounts>;
Consumer側アカウントの設定
Consumer側では以下の作業を行います。
- Providerアカウントから共有されたShare Objectからデータベースを作成
- 共有テーブルに対してSTREAMを作成
- STREAMの変更をもとにConsumer側のテーブルを更新するTASKを作成
ストリームとタスクを利用したデータパイプラインの詳細については、下記のブログが参考になります。
ProviderアカウントのShare Objectからデータベース作成
Consumer側で共有データベースを参照するためには「IMPORTED PRIVILEGES」権限が必要です。共有データベースの利用権限はProvider側でShare Objectに設定されています。
USE ROLE ACCOUNTADMIN;
CREATE DATABASE shared_db FROM SHARE <provider account>.test_share;
GRANT IMPORTED PRIVILEGES ON DATABASE shared_db TO ROLE sysadmin;
共有データベースのテーブルに対してSTRAMを作成
共有されたデータベース内に、Consumer側でオブジェクトを作成できないため、STREAMはConsumer所有のデータベースに作成します。
Consumer側では、ユーザ一覧とユーザの変更日付を保持するuserlistテーブルを作成します。
USE ROLE SYSADMIN;
CREATE DATABASE c_db;
CREATE SCHEMA c_db.s_test;
CREATE TABLE c_db.s_test.userlist(id, name, updated_at)
AS SELECT id, name current_timestamp(0) FROM shared_db.test.share;
CREATE STREAM c_db.s_test.mystream ON TABLE shared_db.test.share;
変更履歴からテーブルを更新するタスク作成
2021/12時点では、タスクの実行方法はスケジュール実行のみが提供されています。タスクの実行中に次のスケジュールの実行時間となった場合は、次の実行はスキップされます。TASK作成のWHEN句に「SYSTEM$STREAM_HAS_DATA(<stream name>)」
を指定することで、対象のストリーム内にレコードが存在しない場合はタスク実行をスキップしてくれます。
スケジュールの指定は「X分ごとの実行」または「Cron式による実行」が指定できます。CREATE TASKの指定パラメータについてはリンク先をご参照ください。
2021/12時点では、アカウント内でStarted状態(実行中)のタスクは上限10000です(Suspend状態の場合はこのカウントに含まれません)
CREATE OR REPLACE TASK c_db.s_test.update_userlist
WAREHOUSE = TEST_WH
SCHEDULE = '1 minute'
ALLOW_OVERLAPPING_EXECUTION = FALSE
USER_TASK_TIMEOUT_MS = 600000
WHEN
SYSTEM$STREAM_HAS_DATA('c_db.s_test.mystream')
AS
MERGE INTO c_db.s_test.userlist u
// ストリームに同じid, nameのレコードが2レコード存在するのは
// class_numberカラムのみをUpdateした場合のため利用しないレコードとみなす
USING (
SELECT id, name, METADATA$ACTION AS operation_type
FROM c_db.s_test.mystream s1
WHERE NOT EXISTS (
SELECT 1
FROM c_db.s_test.mystream s2
WHERE s1.id = s2.id AND s1.name = s2.name
GROUP BY id, name
HAVING COUNT(*) > 1
)) s
ON u.id = s.id AND u.name = s.name
WHEN MATCHED AND s.operation_type = 'DELETE' THEN DELETE
WHEN NOT MATCHED AND s.operation_type = 'INSERT'
THEN INSERT (id, name, updated_at) VALUES (s.id, s.name, current_timestamp(0))
;
ALTER TASK IF EXISTS c_db.s_test.update_userlist RESUME;
動作確認
- Provider側での実行(2分ごとに各行を実行)
// ①追加
INSERT INTO test.share VALUES (4, 'Ken', 3), (5, 'Tonny', 5);
// ②削除
DELETE FROM test.share WHERE id = 1;
// ③更新
UPDATE test.share SET id = 1 WHERE id = 5;
// ④classnumberカラムの更新
UPDATE test.share SET class_number = 10 WHERE id = 2;
- consumer側での実行(Provider側で実行後に1分経過以降)
SELECT * FROM c_db.s_test.userlist;
①の結果(既存3行+追加2行=5行)
②の結果(1行削除=>4行、id = 2, 3, 4, 5)
③の結果(更新⇒4行、id = 1, 2, 3, 4)
④の結果(変化なし、id = 1, 2, 3, 4)
まとめ
共有されたテーブルの変更履歴から自動で、Consumer側のテーブルを更新する方法を検証しました。SnowflakeではOrganizationやReader Account機能により、複数アカウントの作成が簡単にできるため、アカウント間でのデータ共有をうまく利用することで、データの信頼性や処理効率化が図れるようになったら良いですね。
以上、@nkhrでした。