ストリームを参照するタスクで当日の変更履歴がある場合に処理を実行する #SnowflakeDB

ストリームを参照するタスクで当日の変更履歴がある場合に処理を実行する #SnowflakeDB

Clock Icon2025.03.29

はじめに

Snowflake のタスクとストリームを使用したデータパイプライン構築時に、あるタスクが参照するストリームにタスク実行日の変更履歴がある場合に処理を実行したいことがあったので、本記事で対応案をまとめてみます。

背景

よく用いられるパターンであればストリームに変更履歴があるかどうか、がタスク実行のトリガーとなると思います。

この場合は、システム関数である SYSTEM$STREAM_HAS_DATA を使用することで、タスク内の WHEN 句で履歴があるかどうかで処理を実行するか判定できます。WHEN 句での判定にはクラウドコンピューティングリソースが使われるので、ユーザー管理の仮想ウェアハウスが起動することもありません。なので、タスク用にサイズの大きなウェアハウスを割り当てていても、大きな課金が発生することを防ぐことができます。

https://docs.snowflake.com/ja/sql-reference/sql/create-task#optional-parameters

あるパターンとして、ソーステーブルは日次などで更新されるが、ソーステーブルのストリームを参照する後続のマートテーブル更新用タスクは週次などで実行されるようなケースを想定します。

また、この時マートテーブル更新用タスク実行時は、当日の履歴データが含まれることを条件にしたい場面があるとすると、単純にストリーム履歴の有無を実行の条件に使用することはできません(当日の更新がエラーとなり、当日の変更履歴がなくとも、前日など過去の変更履歴があれば処理が実行される)。

そこで上記のようなケースでどういった対応がとれそうか案を記載してみます。

ソーステーブルに更新日時を表すカラムがある場合

この場合、タスクではストアドプロシージャを実行することとし、プロシージャ内でストリームを参照し以下のような仕組みで当日のレコードがあるかを確認することで処理を実行、または例外としてエラーを発生させるなどの処理が可能です。

条件判定のためであっても、参照のみであればストリームが消費されることはありません。

SELECT EXISTS (
    SELECT 1 
    FROM orders_stream --ストリームを参照
    WHERE DATE(updated_at) = CURRENT_DATE --タスク実行日と合致する更新フラグがあればtrue
);

ソーステーブルに更新日時を表すカラムがない場合

このような情報を取得するできるカラムが無い場合や、ロード時点でタイムスタンプを付与するなどの仕組みが無い(使えない)場合は、別の仕組みが必要です。例えば日次でソーステーブルがタスクにより更新されるのであれば、タスク実行履歴からエラーの場合、後続のタスクでもエラーとする(ストリームを使用する更新処理を行わない)などもできると思います。

簡単なのはタスク実行前に最後の更新履歴を取得し、それを利用することと思います。Snowflake では SYSTEM$STREAM_BACKLOG という関数があるので、以降ではこちらを試してみます。

SYSTEM$STREAM_BACKLOG

公式ドキュメントによれば、「指定されたストリームの現在のオフセットと現在のタイムスタンプの間におけるテーブルバージョンのセットを返す」関数です。

各テーブルバージョンとして変更データキャプチャ(CDC)記録の推定数と、テーブルバージョンに関連付けられた DML 操作(INSERT、 UPDATE、 DELETE、 TRUNCATE)を返してくれます。

あるストリームに対してこの関数を使用すると以下のような結果を返します。

>SELECT * FROM TABLE(SYSTEM$STREAM_BACKLOG('orders_stream'));
+-----------------------+-------------------------------+----------+------------+--------------+---------------------------+
| BASE_TABLE_NAME       | STREAM_OFFSET_TIME            | DML_MODE | ROWS_ADDED | ROWS_REMOVED | ESTIMATED_ROWS_TO_PROCESS |
|-----------------------+-------------------------------+----------+------------+--------------+---------------------------|
| TEST_DB.PUBLIC.ORDERS | 2025-03-25 21:05:01.323 +0900 | INSERT   |          4 |            3 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-25 21:05:02.208 +0900 | INSERT   |          5 |            4 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-26 09:11:40.261 +0900 | INSERT   |          6 |            5 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-26 22:06:16.410 +0900 | UPDATE   |          6 |            6 |                         4 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-27 09:09:26.385 +0900 | INSERT   |          7 |            6 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-28 19:00:29.493 +0900 | INSERT   |          8 |            7 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-29 11:33:54.623 +0900 | UPDATE   |          8 |            8 |                         5 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-29 11:36:09.168 +0900 | UPDATE   |          8 |            8 |                         5 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-29 11:36:43.984 +0900 | INSERT   |          9 |            8 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-29 11:36:51.662 +0900 | INSERT   |         10 |            9 |                         1 |
+-----------------------+-------------------------------+----------+------------+--------------+---------------------------+

この例では 3/25-3/29 の間にソーステーブルに DML 操作を実行しており、ストリームはそれらをすべて追跡しています。ストリーム自体はまだ消費されていないため、変更履歴が「溜まっている」状態です。

ポイントとして出力には [STREAM_OFFSET_TIME] 列が含まれます。

これは SYSTEM$STREAM_BACKLOG の出力における各レコードの変更が、どのオフセット(=追跡の開始時点)で記録されたかを示すタイムスタンプです。実際に値を見ると、ソーステーブルに対して各変更を加えたタイムスタンプが記録されています。

ちなみに Snowflake のストリームにおけるオフセットとは、ストリーム作成時に取得されるソーステーブルのトランザクションバージョン(論理スナップショット)を指し、それ以降に発生した DML 操作(INSERT、UPDATE、DELETE)を追跡するための起点となる情報です。

https://docs.snowflake.com/ja/user-guide/streams-intro#label-streams-introduction-offset

以降ではこの値を基準にストリームに当日の変更履歴があるかを判定してみます。

SYSTEM$STREAM_BACKLOG をタスクで使用する際の考慮事項

タスクでこのシステム関数を使用するには以下に注意します。

この点に注意して SYSTEM$STREAM_BACKLOG を条件判定に使用するタスクを定義してみます。

タスクを定義

事前準備

以下のオブジェクトを作成しておきます。

  • 検証用のデータベース
  • ソーステーブル
  • ソーステーブルを参照するストリーム
  • ストリームを消費するための宛先テーブル
CREATE DATABASE IF NOT EXISTS test_db;
--ソーステーブルを作成
CREATE TABLE IF NOT EXISTS orders(
    order_id INT AUTOINCREMENT PRIMARY KEY,
    order_date DATE,
    amount DECIMAL(10,2)
);
--ストリームを作成
CREATE STREAM orders_stream ON TABLE orders;

--ストリーム消費用に変更履歴を格納するテーブルを作成
CREATE OR REPLACE TABLE orders_history (
    history_id INT AUTOINCREMENT PRIMARY KEY,  
    order_id INT,  
    order_date DATE, 
    amount DECIMAL(10,2),  
    action_type STRING,  
    is_update BOOLEAN,   
    processed_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP  -- データ処理時刻
);

--復元用にストリームのクローンを作成しておく
CREATE STREAM orders_stream_clone CLONE orders_stream;

ストリームの現在の状態を確認

事前にソーステーブルには数日分の変更を加えたので現在のストリームを参照すると以下のような出力です。

※order_date に変更履歴より前の日(3/24)がありますが、検証用に前日のレコードとして挿入したものです

--ストリームを確認:3/25-3/29に変更を加えたので履歴データがある
>SELECT * FROM orders_stream;
+----------+------------+---------+-----------------+-------------------+------------------------------------------+
| ORDER_ID | ORDER_DATE |  AMOUNT | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----------+------------+---------+-----------------+-------------------+------------------------------------------|
|        2 | 2025-03-25 |  300.00 | INSERT          | True              | 9fae931e2d23c58362273e9c9a3ebd3786af5c1f |
|      201 | 2025-03-24 |  100.00 | INSERT          | False             | d139cdda52c358b97e70584a98807354a0f99955 |
|      202 | 2025-03-25 |  200.00 | INSERT          | False             | e98037c7765624d1877b472394f7c0b0eb4e4071 |
|      301 | 2025-03-25 |  200.00 | INSERT          | False             | 53a6dcdea070a0f8ea6b38a09320cb18d5408490 |
|      401 | 2025-03-27 | 1200.00 | INSERT          | False             | e35aed7323454b009416570019bb192ce11722b3 |
|      501 | 2025-03-28 |  800.00 | INSERT          | False             | baff525ffea5a0b282cad2b96438bebc33a81b63 |
|      601 | 2025-03-29 | 1200.00 | INSERT          | False             | c42695f114d923df413df09bf60942fc84c6b04f |
|      701 | 2025-03-29 | 1500.00 | INSERT          | False             | fddb63053107b98c8f8e7b124f2d008bec8adcf3 |
|        2 | 2025-03-25 |  200.00 | DELETE          | True              | 9fae931e2d23c58362273e9c9a3ebd3786af5c1f |
+----------+------------+---------+-----------------+-------------------+------------------------------------------+

--SYSTEM$STREAM_BACKLOG
>SELECT * FROM TABLE(SYSTEM$STREAM_BACKLOG('orders_stream'));
+-----------------------+-------------------------------+----------+------------+--------------+---------------------------+
| BASE_TABLE_NAME       | STREAM_OFFSET_TIME            | DML_MODE | ROWS_ADDED | ROWS_REMOVED | ESTIMATED_ROWS_TO_PROCESS |
|-----------------------+-------------------------------+----------+------------+--------------+---------------------------|
| TEST_DB.PUBLIC.ORDERS | 2025-03-25 21:05:01.323 +0900 | INSERT   |          4 |            3 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-25 21:05:02.208 +0900 | INSERT   |          5 |            4 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-26 09:11:40.261 +0900 | INSERT   |          6 |            5 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-26 22:06:16.410 +0900 | UPDATE   |          6 |            6 |                         4 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-27 09:09:26.385 +0900 | INSERT   |          7 |            6 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-28 19:00:29.493 +0900 | INSERT   |          8 |            7 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-29 11:33:54.623 +0900 | UPDATE   |          8 |            8 |                         5 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-29 11:36:09.168 +0900 | UPDATE   |          8 |            8 |                         5 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-29 11:36:43.984 +0900 | INSERT   |          9 |            8 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-29 11:36:51.662 +0900 | INSERT   |         10 |            9 |                         1 |
+-----------------------+-------------------------------+----------+------------+--------------+---------------------------+

SYSTEM$STREAM_BACKLOG を使用し、当日の変更履歴があるか判定したい場合は、以下のようにクエリできます。

>SELECT EXISTS (
    SELECT 1 
    FROM TABLE(SYSTEM$STREAM_BACKLOG('orders_stream')) 
    WHERE DATE(STREAM_OFFSET_TIME) = CURRENT_DATE
) AS has_today_records;
+-------------------+
| HAS_TODAY_RECORDS |
|-------------------|
| True              |
+-------------------+

ストアドプロシージャ・タスクを定義

ここでは以下の通りストアドプロシージャとタスクを定義してみます。タスクそのものをわけていますが、一つのプロシージャ(タスク)で完結させることも可能です。

  • ルートタスク
    • タスク実行タイミングで当日の変更履歴があるか判定値を SYSTEM$SET_RETURN_VALUE で子タスクに渡すストアドプロシージャを実行
  • 子タスク

はじめにルートタスクとルートタスクで実行するストアドプロシージャを定義します。

--ストリームをチェックし当日(チェック実行日)のストリーム履歴があるか確認するストアドプロシージャを定義
--CALLERとして実行
CREATE OR REPLACE PROCEDURE check_today_records()
RETURNS STRING
LANGUAGE SQL
EXECUTE AS CALLER   
AS
$$
DECLARE 
    --SYSTEM$SET_RETURN_VALUEに与えるために文字列として定義
    has_today_data STRING;
BEGIN
    -- ストリームに当日のレコードがあるかを判定
    -- has_today_data には true か false が入る
    has_today_data :=(
        SELECT EXISTS (
            SELECT 1 
            FROM TABLE(SYSTEM$STREAM_BACKLOG('orders_stream')) 
            WHERE DATE(STREAM_OFFSET_TIME) = CURRENT_DATE 
        )
        );

    -- 結果を SYSTEM$SET_RETURN_VALUE で後続のタスクに返す
    has_today_data := (CALL SYSTEM$SET_RETURN_VALUE(:has_today_data));
RETURN has_today_data;
END;
$$;

--ルートタスクを定義:
CREATE OR REPLACE TASK my_root_task
WAREHOUSE = compute_wh
SCHEDULE = '1 hour'
AS 
call check_today_records();

ポイントは以下です。

  • ここで使用するシステム関数をプロシージャで使用する場合は、呼び出し元の権限でストアドプロシージャを定義する必要がある
  • SYSTEM$SET_RETURN_VALUE には文字列定数として値を渡す必要がある

続けて子タスクを定義します。

SYSTEM$GET_PREDECESSOR_RETURN_VALUE は WHEN 句で使用できます。真の場合はタスクが実行され、偽の場合、タスクはスキップされます。

--子タスクを定義:ルートタスクから渡された値で処理を実行するか判定
CREATE OR REPLACE TASK my_predecessor_task
WAREHOUSE = compute_wh
AFTER my_root_task
WHEN SYSTEM$GET_PREDECESSOR_RETURN_VALUE()::BOOLEAN
AS 
--true:当時のストリームレコードがある場合はストリームを消費するDMLを実行
INSERT INTO orders_history (
    order_id, order_date, amount, action_type, is_update, processed_at
)
SELECT 
    order_id, 
    order_date, 
    amount, 
    METADATA$ACTION, 
    METADATA$ISUPDATE, 
    CURRENT_TIMESTAMP
FROM orders_stream;
;

タスクを実行

当日の更新履歴がある場合

子タスクを起動しタスクを手動実行します。

--動作確認
--子タスクを起動
ALTER TASK my_predecessor_task resume;

--ストリームを再確認:レコードは追加済みなので履歴データ・当日も追跡開始履歴がある
SELECT * FROM orders_stream;
SELECT * FROM TABLE(SYSTEM$STREAM_BACKLOG('orders_stream'));

--ターゲットテーブルを確認:空
SELECT * FROM orders_history;

--タスクを手動実行
EXECUTE TASK my_root_task;

タスクの実行履歴を確認します。この場合、タスク実行タイミングでソーステーブルの当日の更新履歴があるので、タスクはすべて成功します。

親タスクから渡される値は [RETURN_VALUE] で確認できます。

--実行履歴を確認:直近実行された上記2件を取得
--RETURN_VALUE:true
>SELECT 
    NAME
    ,STATE
    ,ERROR_MESSAGE
    ,RETURN_VALUE
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE DATABASE_NAME = 'TEST_DB'
ORDER BY SCHEDULED_TIME DESC
LIMIT 2;
+---------------------+-----------+---------------+--------------+
| NAME                | STATE     | ERROR_MESSAGE | RETURN_VALUE |
|---------------------+-----------+---------------+--------------|
| MY_PREDECESSOR_TASK | SUCCEEDED | NULL          | NULL         |
| MY_ROOT_TASK        | SUCCEEDED | NULL          | true         |
+---------------------+-----------+---------------+--------------+

ストリームを確認します。

ストリームをソースにターゲットテーブルにレコードを追加するので、ストリームは消費され空になります。ストリームが空になると、SYSTEM$STREAM_BACKLOG も空になります。

--ストリームを確認:消費され空になる
>SELECT * FROM orders_stream;
+----------+------------+--------+-----------------+-------------------+-----------------+
| ORDER_ID | ORDER_DATE | AMOUNT | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----------+------------+--------+-----------------+-------------------+-----------------|
+----------+------------+--------+-----------------+-------------------+-----------------+

--ストリームlogを確認:ストリームが空なのでこちらも空
>SELECT * FROM TABLE(SYSTEM$STREAM_BACKLOG('orders_stream'));
+-----------------+--------------------+----------+------------+--------------+---------------------------+
| BASE_TABLE_NAME | STREAM_OFFSET_TIME | DML_MODE | ROWS_ADDED | ROWS_REMOVED | ESTIMATED_ROWS_TO_PROCESS |
|-----------------+--------------------+----------+------------+--------------+---------------------------|
+-----------------+--------------------+----------+------------+--------------+---------------------------+

ターゲットテーブルにストリームからレコードが追加されています。

>SELECT * FROM orders_history;
+------------+----------+------------+---------+-------------+-----------+-------------------------+
| HISTORY_ID | ORDER_ID | ORDER_DATE |  AMOUNT | ACTION_TYPE | IS_UPDATE | PROCESSED_AT            |
|------------+----------+------------+---------+-------------+-----------+-------------------------|
|          1 |        2 | 2025-03-25 |  300.00 | INSERT      | True      | 2025-03-29 14:27:11.612 |
|          2 |      201 | 2025-03-24 |  100.00 | INSERT      | False     | 2025-03-29 14:27:11.612 |
|          3 |      202 | 2025-03-25 |  200.00 | INSERT      | False     | 2025-03-29 14:27:11.612 |
|          4 |      301 | 2025-03-25 |  200.00 | INSERT      | False     | 2025-03-29 14:27:11.612 |
|          5 |      401 | 2025-03-27 | 1200.00 | INSERT      | False     | 2025-03-29 14:27:11.612 |
|          6 |      501 | 2025-03-28 |  800.00 | INSERT      | False     | 2025-03-29 14:27:11.612 |
|          7 |      601 | 2025-03-29 | 1200.00 | INSERT      | False     | 2025-03-29 14:27:11.612 |
|          8 |      701 | 2025-03-29 | 1500.00 | INSERT      | False     | 2025-03-29 14:27:11.612 |
|          9 |        2 | 2025-03-25 |  200.00 | DELETE      | True      | 2025-03-29 14:27:11.612 |
+------------+----------+------------+---------+-------------+-----------+-------------------------+

当日の更新履歴がない場合

次にストリームに履歴データはあるが、タスク実行タイミング当日の履歴データがない設定でタスクを実行します。

はじめに消費したストリームを復元しておきます。

--ストリームを復元
DROP stream orders_stream;
CREATE stream orders_stream CLONE orders_stream_clone;

--ストリームを確認
SELECT * FROM orders_stream;
>SELECT * FROM TABLE(SYSTEM$STREAM_BACKLOG('orders_stream'));
+-----------------------+-------------------------------+----------+------------+--------------+---------------------------+
| BASE_TABLE_NAME       | STREAM_OFFSET_TIME            | DML_MODE | ROWS_ADDED | ROWS_REMOVED | ESTIMATED_ROWS_TO_PROCESS |
|-----------------------+-------------------------------+----------+------------+--------------+---------------------------|
| TEST_DB.PUBLIC.ORDERS | 2025-03-25 21:05:01.323 +0900 | INSERT   |          4 |            3 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-25 21:05:02.208 +0900 | INSERT   |          5 |            4 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-26 09:11:40.261 +0900 | INSERT   |          6 |            5 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-26 22:06:16.410 +0900 | UPDATE   |          6 |            6 |                         4 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-27 09:09:26.385 +0900 | INSERT   |          7 |            6 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-28 19:00:29.493 +0900 | INSERT   |          8 |            7 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-29 11:33:54.623 +0900 | UPDATE   |          8 |            8 |                         5 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-29 11:36:09.168 +0900 | UPDATE   |          8 |            8 |                         5 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-29 11:36:43.984 +0900 | INSERT   |          9 |            8 |                         1 |
| TEST_DB.PUBLIC.ORDERS | 2025-03-29 11:36:51.662 +0900 | INSERT   |         10 |            9 |                         1 |
+-----------------------+-------------------------------+----------+------------+--------------+---------------------------+

ここでは検証のため、ストアドプロシージャを翌日の変更履歴があるかを判定するように変更します。

CREATE OR REPLACE PROCEDURE check_today_records()
RETURNS STRING
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
DECLARE 
    --SYSTEM$SET_RETURN_VALUEに与えるために文字列として定義
    has_today_data STRING;
BEGIN
    -- ストリームに当日のレコードがあるかを判定
    -- has_today_data には true か false が入る
    has_today_data :=(
        SELECT EXISTS (
            SELECT 1 
            FROM TABLE(SYSTEM$STREAM_BACKLOG('orders_stream')) 
            WHERE DATE(STREAM_OFFSET_TIME) = CURRENT_DATE + 1 --翌日の変更履歴があるか確認するように変更
        )
        );

    -- 結果を SYSTEM$SET_RETURN_VALUE で後続のタスクに返す
    has_today_data := (CALL SYSTEM$SET_RETURN_VALUE(:has_today_data));
RETURN has_today_data;
END;
$$;

この状態でタスクを実行します。

--タスクを手動実行
EXECUTE TASK my_root_task;

実行履歴を確認します。当然、翌日の更新履歴はないので、条件判定の結果として false が後続のタスクに渡され、処理がスキップされています。

--実行履歴を確認
--後続のタスクはskipされる
>SELECT 
    NAME
    ,STATE
    ,ERROR_MESSAGE
    ,RETURN_VALUE
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE DATABASE_NAME = 'TEST_DB'
ORDER BY SCHEDULED_TIME DESC
LIMIT 2;
+---------------------+-----------+-----------------------------------------------------+--------------+
| NAME                | STATE     | ERROR_MESSAGE                                       | RETURN_VALUE |
|---------------------+-----------+-----------------------------------------------------+--------------|
| MY_PREDECESSOR_TASK | SKIPPED   | Conditional expression for task evaluated to false. | NULL         |
| MY_ROOT_TASK        | SUCCEEDED | NULL                                                | false        |
+---------------------+-----------+-----------------------------------------------------+--------------+

当然、ストリームも消費されません。

--ストリームを確認:消費されないので履歴は存在
>SELECT * FROM orders_stream;
+----------+------------+---------+-----------------+-------------------+------------------------------------------+
| ORDER_ID | ORDER_DATE |  AMOUNT | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID                          |
|----------+------------+---------+-----------------+-------------------+------------------------------------------|
|        2 | 2025-03-25 |  300.00 | INSERT          | True              | 9fae931e2d23c58362273e9c9a3ebd3786af5c1f |
|      201 | 2025-03-24 |  100.00 | INSERT          | False             | d139cdda52c358b97e70584a98807354a0f99955 |
|      202 | 2025-03-25 |  200.00 | INSERT          | False             | e98037c7765624d1877b472394f7c0b0eb4e4071 |
|      301 | 2025-03-25 |  200.00 | INSERT          | False             | 53a6dcdea070a0f8ea6b38a09320cb18d5408490 |
|      401 | 2025-03-27 | 1200.00 | INSERT          | False             | e35aed7323454b009416570019bb192ce11722b3 |
|      501 | 2025-03-28 |  800.00 | INSERT          | False             | baff525ffea5a0b282cad2b96438bebc33a81b63 |
|      601 | 2025-03-29 | 1200.00 | INSERT          | False             | c42695f114d923df413df09bf60942fc84c6b04f |
|      701 | 2025-03-29 | 1500.00 | INSERT          | False             | fddb63053107b98c8f8e7b124f2d008bec8adcf3 |
|        2 | 2025-03-25 |  200.00 | DELETE          | True              | 9fae931e2d23c58362273e9c9a3ebd3786af5c1f |
+----------+------------+---------+-----------------+-------------------+------------------------------------------+

注意点として、この構成の場合、子タスクからリトライすることはできません(スキップとなり、エラーやキャンセルではないため)。
この場合、子タスク側でも当日のレコードがない場合、例外などエラーを発生する処理とする必要があります。

>EXECUTE TASK my_root_task RETRY LAST;
091456 (55000): Cannot perform retry: run (RUN_ID = 1,743,227,184,727, attempt = 0) of graph with root task MY_ROOT_TASK had no failures.

さいごに

ストリームを参照するタイミングで、当日の変更履歴がある場合にタスクを実行する手順を検証してみました。
こちらの内容が何かの参考になれば幸いです。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.