[新機能]対象のテーブルに新しいレコードが追加されたときに指定した条件式を実行してアラート出来るようになったのでタスクとDynamic Tableのエラー通知を行ってみた

[新機能]対象のテーブルに新しいレコードが追加されたときに指定した条件式を実行してアラート出来るようになったのでタスクとDynamic Tableのエラー通知を行ってみた

Clock Icon2025.03.23

さがらです。

Snowflakeの新機能として、対象のテーブルに新しいレコードが追加されたときに指定した条件式を実行してアラート出来るようになりました。(2025年3月23日時点、プレビュー機能です。)

https://docs.snowflake.com/en/release-notes/2025/other/2025-03-19-alerts-on-new-data

https://docs.snowflake.com/en/user-guide/alerts#creating-an-alert-on-new-data

この機能を使って、タスクとDynamic Tableのエラー通知を行ってみたのでその内容をまとめてみます。

イベントテーブルの設定

事前準備:テスト用のデータベース・スキーマ・テーブルの作成

まず、検証用のデータベース・スキーマ・テーブルを作成します。

-- 各層のデータベース作成
use role sysadmin;
create or replace database raw_db;
create or replace database stg_db;
create or replace database mart_db;

-- 各層のスキーマ作成
use role sysadmin;
create or replace schema raw_db.sales;
create or replace schema stg_db.sales;
create or replace schema mart_db.sales;

-- raw層でテーブルを1つ適当に作成
use role sysadmin;
create or replace table raw_db.sales.orders (
    order_id varchar(50),
    customer_id varchar(50),
    order_date timestamp_ntz,
    product_id varchar(50),
    quantity integer,
    price decimal(18,2),
    load_timestamp timestamp_ntz default current_timestamp()
);

insert into raw_db.sales.orders (order_id, customer_id, order_date, product_id, quantity, price)
values
 ('ord-001', 'cust-100', current_timestamp(), 'prod-a', 5, 19.99);

イベントテーブルの作成

次に、イベントテーブルを作成します。

-- イベントテーブル作成
use role sysadmin;
create or replace database event_db;
create or replace schema event_db.sales_dept;
create or replace event table event_db.sales_dept.datapipeline_events;

作成したイベントテーブルを各データベースに紐づけ

次に、作成したイベントテーブルを各データベースに紐づけます。

イベントテーブルを紐づける際は、以下の仕様となっています。

  • この紐づけ設定はACCOUNTADMINが必要(2025年3月23日時点、対応するアカウントレベルの権限がないようです。)
  • イベントテーブルと紐づけ先のテーブルは別のデータベース・スキーマで問題ない
  • 1つのイベントテーブルを複数のデータベース・スキーマに紐づけることは可能

以下のブログも参考になると思います。

https://dev.classmethod.jp/articles/associate--event-table-with-database-snowflakedb/

改めて、作成したイベントテーブルを各データベースに紐づけるために以下のクエリを実行します。

-- 各層のデータベースをevent_db.sales_dept.datapipeline_eventsに紐づけ
use role accountadmin; 
alter database raw_db set event_table = event_db.sales_dept.datapipeline_events;
show parameters like 'event_table' in database raw_db;
alter database stg_db set event_table = event_db.sales_dept.datapipeline_events;
show parameters like 'event_table' in database stg_db;
alter database mart_db set event_table = event_db.sales_dept.datapipeline_events;
show parameters like 'event_table' in database mart_db;

各スキーマレベルでLog Levelを設定

イベントテーブルを紐づけただけではログが収集されないため、今回はスキーマレベルでLog Levelを設定します。

https://docs.snowflake.com/ja/developer-guide/logging-tracing/telemetry-levels#label-telemetry-level-object-privileges

Log Levelの設定には、アカウントレベルのmodify log level権限が必要となるためご注意ください。

-- sysadminがLog Levelを変更できるように権限付与 ※実運用環境ではどう権限管理をするかご注意ください
use role accountadmin;
grant modify log level on account to role sysadmin;

-- 各スキーマレベルでLog Levelを設定
use role sysadmin;
alter schema raw_db.sales set log_level = error;
show parameters like '%log_level%' in schema raw_db.sales;
alter schema stg_db.sales set log_level = error;
show parameters like '%log_level%' in schema stg_db.sales;
alter schema mart_db.sales set log_level = error;
show parameters like '%log_level%' in schema mart_db.sales;

イベントテーブルでChange Trackingを有効化

次に、作成したイベントテーブルでChange Trackingを有効化します。

https://docs.snowflake.com/en/user-guide/streams-manage#label-enabling-change-tracking-views

-- イベントテーブルでChange Trackingを有効化
alter table event_db.sales_dept.datapipeline_events set change_tracking = true;

イベントテーブルにログが貯まるかを確認

次に、以下のタスクとDynamic Tableを定義するクエリを実行します。(Dynamic Tableのクエリはどちらも実行時にエラーとなりますが、エラーログは出力されるため今回の検証においては想定通りの挙動となります。)

-- raw層でエラーになるタスク作成
create or replace task raw_db.sales.error_orders_task
    warehouse = compute_wh
    schedule = 'using cron 0 */1 * * * utc'
as
-- 実行時にエラーが発生するクエリ(カラム数が明らかに足りない)
insert into raw_db.sales.orders (order_id, customer_id, order_date, product_id, quantity, price)
select 
    'ord-' || to_varchar(uniform(1000, 9999, random()));

-- タスクを有効化して手動実行
alter task raw_db.sales.error_orders_task resume;
execute task raw_db.sales.error_orders_task;

-- stg層でエラーになるdynamic table作成(データ型エラー)※実行するとエラーになります
create or replace dynamic table stg_db.sales.error_orders
target_lag = '1 minute'
warehouse = compute_wh
as
select
    order_id,
    customer_id,
    -- 意図的にデータ型エラーを発生させる(文字列を日付型に変換)
    'not-a-valid-date'::date as order_date,
    product_id,
    quantity,
    price,
    price * quantity as total_amount,
    current_timestamp() as processed_timestamp
from raw_db.sales.orders;

-- mart層でエラーになるdynamic table作成(ゼロ除算エラー)※実行するとエラーになります
create or replace dynamic table mart_db.sales.error_sales_summary
target_lag = '5 minutes'
warehouse = compute_wh
as
select
    product_id,
    sum(quantity) as total_quantity,
    sum(price) as total_price,
    -- 意図的にゼロ除算エラーを発生させる
    sum(price) / (sum(quantity) - sum(quantity)) as avg_price -- ゼロ除算
from raw_db.sales.orders
group by product_id;

この上で1分ほど待った上で以下のクエリを実行すると、タスクとDynamic Tableのエラーログが入ってきていることがわかります。

-- イベントテーブルの確認
select * from event_db.sales_dept.datapipeline_events;

2025-03-23_11h02_59

アラートの設定

事前準備:EメールのNotification Integrationの作成

今回はアラートの条件文に合致するときにEメールを送信して通知したいため、以下のクエリを実行してEメールのNotification Integrationを作成しておきます。

-- メール送信用のNotification Integration作成
use role accountadmin;
create or replace notification integration email_int
  type = email
  enabled = true;
grant usage on integration email_int to role sysadmin;

アラートの設定

以下のクエリを実行してアラートを設定します。

今回はwarehouseパラメータを入れていないため、サーバーレスのアラートになります。

create or replace alert event_db.sales_dept.datapipeline_error_alert
  if (exists(
    select * from event_db.sales_dept.datapipeline_events
    where 
      record:severity_text = 'ERROR' and
      record_type = 'EVENT' and
      value:state = 'FAILED'
  ))
  then
    begin
      let error_details varchar;

      select array_to_string(array_agg(error_info), chr(10) || chr(10)) into :error_details
      from (
        select 
          'Error in ' || resource_attributes:"snow.executable.type" || ': ' || 
          resource_attributes:"snow.executable.name" || 
          chr(10) || 'Database: ' || resource_attributes:"snow.database.name" ||
          chr(10) || 'Schema: ' || resource_attributes:"snow.schema.name" ||
          chr(10) || 'Error Message: ' || value:"message" ||
          chr(10) || 'State: ' || value:"state" ||
          chr(10) || 'Timestamp: ' || timestamp as error_info
        from table(result_scan(snowflake.alert.get_condition_query_uuid()))
        order by timestamp desc
        limit 10
      );

      call system$send_email(
        'email_int',
        'aaa@example.jp',
        'Data Pipeline Error Alert',
        'The following errors were detected in your data pipelines:' || chr(10) || chr(10) || :error_details
      );
    end;

アラートの有効化

作成したアラートは有効化する必要があるため、以下のクエリで実行します。

-- アラートの有効化
alter alert event_db.sales_dept.datapipeline_error_alert resume;

アラートを動かしてみた

先ほどエラーログを出力した際に用いた、以下のクエリをもう一度実行します。(Dynamic Tableのクエリはどちらも実行時にエラーとなりますが、エラーログは出力されるため今回の検証においては想定通りの挙動となります。)

-- raw層でエラーになるタスク作成
create or replace task raw_db.sales.error_orders_task
    warehouse = compute_wh
    schedule = 'using cron 0 */1 * * * utc'
as
-- 実行時にエラーが発生するクエリ(カラム数が明らかに足りない)
insert into raw_db.sales.orders (order_id, customer_id, order_date, product_id, quantity, price)
select 
    'ord-' || to_varchar(uniform(1000, 9999, random()));

-- タスクを有効化して手動実行
alter task raw_db.sales.error_orders_task resume;
execute task raw_db.sales.error_orders_task;

-- stg層でエラーになるdynamic table作成(データ型エラー)※実行するとエラーになります
create or replace dynamic table stg_db.sales.error_orders
target_lag = '1 minute'
warehouse = compute_wh
as
select
    order_id,
    customer_id,
    -- 意図的にデータ型エラーを発生させる(文字列を日付型に変換)
    'not-a-valid-date'::date as order_date,
    product_id,
    quantity,
    price,
    price * quantity as total_amount,
    current_timestamp() as processed_timestamp
from raw_db.sales.orders;

-- mart層でエラーになるdynamic table作成(ゼロ除算エラー)※実行するとエラーになります
create or replace dynamic table mart_db.sales.error_sales_summary
target_lag = '5 minutes'
warehouse = compute_wh
as
select
    product_id,
    sum(quantity) as total_quantity,
    sum(price) as total_price,
    -- 意図的にゼロ除算エラーを発生させる
    sum(price) / (sum(quantity) - sum(quantity)) as avg_price -- ゼロ除算
from raw_db.sales.orders
group by product_id;

このクエリを実行して約1分後、メールが下図のように届きました!3つのエラーを発生させていたので、3件分のエラーがまとまって届きました。

2025-03-23_11h52_21

サーバーレスアラートのコストを確認

今回、必ずエラーが起きるタスクraw_db.sales.error_orders_taskを1時間毎にスケジュールするようにしていたのですが、エラーログが登録されてアラートが起動した際のコストをSERVERLESS_ALERT_HISTORYテーブル関数で確認してみました。

https://docs.snowflake.com/en/sql-reference/functions/serverless_alert_history

select *
from table(information_schema.serverless_alert_history(
    date_range_start=>'2025-03-20 00:00:00.000 -0700',
    date_range_end=>'2025-03-30 23:59:59.999 -0700'))
order by start_time desc;

ざっくりと、1時間に1回エラーログが発行されるタスクのアラートで「0.0015~0.0040クレジット/h」が消費されていました。

2025-03-23_15h34_58

おまけ:アラートの実行記録も確認してみた

おまけですが、アラートの実行記録も以下のクエリで確認してみました。

https://docs.snowflake.com/ja/user-guide/alerts#label-alerts-history

select *
from
  table(information_schema.alert_history(
    scheduled_time_range_start
      =>dateadd('hour',-1,current_timestamp())))
order by scheduled_time desc;

下図のように記録されていました。TRIGGEREDとなっているレコード1つにつき、メールが1つ送信されていました。(いくつも実行ログがあったり、state列がaction_failedとなっているのは私がアラートの送信がうまくいかなかったため試行錯誤した結果です…)

2025-03-23_12h04_47

2025-03-23_12h05_15

2025-03-23_12h06_00

最後に

Snowflakeの新機能として、対象のテーブルに新しいレコードが追加されたときに指定した条件式を実行してアラート出来るようになったので、タスクとDynamic Tableのエラー通知を行ってみました。

これまで、タスクはNotification Integrationと連携して簡単にエラー通知が出来ましたが、Dynamic Tableの通知はこのブログのようなストアドプロシージャとタスクの設定をするしかなかったため、今回の新機能を用いることで簡単に実用的な監視ができるようになったのがとても嬉しいです!

また、アラートはこれまでユーザー側で定義したクエリを定期的に実行して条件に合致した場合に指定したクエリを実行する仕様であったためウェアハウスのリソースもかなり消費する仕様でしたが、今回の新機能により「新しいレコードが追加されたときにサーバーレスのリソースを使用するだけ」が可能となりましたので、かなりお財布にも優しい仕様になりました。

とても汎用性が高い機能だと思いますので、ぜひ活用してみてください!

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.