[新機能]対象のテーブルに新しいレコードが追加されたときに指定した条件式を実行してアラート出来るようになったのでタスクとDynamic Tableのエラー通知を行ってみた
さがらです。
Snowflakeの新機能として、対象のテーブルに新しいレコードが追加されたときに指定した条件式を実行してアラート出来るようになりました。(2025年3月23日時点、プレビュー機能です。)
この機能を使って、タスクとDynamic Tableのエラー通知を行ってみたのでその内容をまとめてみます。
イベントテーブルの設定
事前準備:テスト用のデータベース・スキーマ・テーブルの作成
まず、検証用のデータベース・スキーマ・テーブルを作成します。
-- 各層のデータベース作成
use role sysadmin;
create or replace database raw_db;
create or replace database stg_db;
create or replace database mart_db;
-- 各層のスキーマ作成
use role sysadmin;
create or replace schema raw_db.sales;
create or replace schema stg_db.sales;
create or replace schema mart_db.sales;
-- raw層でテーブルを1つ適当に作成
use role sysadmin;
create or replace table raw_db.sales.orders (
order_id varchar(50),
customer_id varchar(50),
order_date timestamp_ntz,
product_id varchar(50),
quantity integer,
price decimal(18,2),
load_timestamp timestamp_ntz default current_timestamp()
);
insert into raw_db.sales.orders (order_id, customer_id, order_date, product_id, quantity, price)
values
('ord-001', 'cust-100', current_timestamp(), 'prod-a', 5, 19.99);
イベントテーブルの作成
次に、イベントテーブルを作成します。
-- イベントテーブル作成
use role sysadmin;
create or replace database event_db;
create or replace schema event_db.sales_dept;
create or replace event table event_db.sales_dept.datapipeline_events;
作成したイベントテーブルを各データベースに紐づけ
次に、作成したイベントテーブルを各データベースに紐づけます。
イベントテーブルを紐づける際は、以下の仕様となっています。
- この紐づけ設定は
ACCOUNTADMIN
が必要(2025年3月23日時点、対応するアカウントレベルの権限がないようです。) - イベントテーブルと紐づけ先のテーブルは別のデータベース・スキーマで問題ない
- 1つのイベントテーブルを複数のデータベース・スキーマに紐づけることは可能
以下のブログも参考になると思います。
改めて、作成したイベントテーブルを各データベースに紐づけるために以下のクエリを実行します。
-- 各層のデータベースをevent_db.sales_dept.datapipeline_eventsに紐づけ
use role accountadmin;
alter database raw_db set event_table = event_db.sales_dept.datapipeline_events;
show parameters like 'event_table' in database raw_db;
alter database stg_db set event_table = event_db.sales_dept.datapipeline_events;
show parameters like 'event_table' in database stg_db;
alter database mart_db set event_table = event_db.sales_dept.datapipeline_events;
show parameters like 'event_table' in database mart_db;
各スキーマレベルでLog Levelを設定
イベントテーブルを紐づけただけではログが収集されないため、今回はスキーマレベルでLog Levelを設定します。
Log Levelの設定には、アカウントレベルのmodify log level
権限が必要となるためご注意ください。
-- sysadminがLog Levelを変更できるように権限付与 ※実運用環境ではどう権限管理をするかご注意ください
use role accountadmin;
grant modify log level on account to role sysadmin;
-- 各スキーマレベルでLog Levelを設定
use role sysadmin;
alter schema raw_db.sales set log_level = error;
show parameters like '%log_level%' in schema raw_db.sales;
alter schema stg_db.sales set log_level = error;
show parameters like '%log_level%' in schema stg_db.sales;
alter schema mart_db.sales set log_level = error;
show parameters like '%log_level%' in schema mart_db.sales;
イベントテーブルでChange Trackingを有効化
次に、作成したイベントテーブルでChange Trackingを有効化します。
-- イベントテーブルでChange Trackingを有効化
alter table event_db.sales_dept.datapipeline_events set change_tracking = true;
イベントテーブルにログが貯まるかを確認
次に、以下のタスクとDynamic Tableを定義するクエリを実行します。(Dynamic Tableのクエリはどちらも実行時にエラーとなりますが、エラーログは出力されるため今回の検証においては想定通りの挙動となります。)
-- raw層でエラーになるタスク作成
create or replace task raw_db.sales.error_orders_task
warehouse = compute_wh
schedule = 'using cron 0 */1 * * * utc'
as
-- 実行時にエラーが発生するクエリ(カラム数が明らかに足りない)
insert into raw_db.sales.orders (order_id, customer_id, order_date, product_id, quantity, price)
select
'ord-' || to_varchar(uniform(1000, 9999, random()));
-- タスクを有効化して手動実行
alter task raw_db.sales.error_orders_task resume;
execute task raw_db.sales.error_orders_task;
-- stg層でエラーになるdynamic table作成(データ型エラー)※実行するとエラーになります
create or replace dynamic table stg_db.sales.error_orders
target_lag = '1 minute'
warehouse = compute_wh
as
select
order_id,
customer_id,
-- 意図的にデータ型エラーを発生させる(文字列を日付型に変換)
'not-a-valid-date'::date as order_date,
product_id,
quantity,
price,
price * quantity as total_amount,
current_timestamp() as processed_timestamp
from raw_db.sales.orders;
-- mart層でエラーになるdynamic table作成(ゼロ除算エラー)※実行するとエラーになります
create or replace dynamic table mart_db.sales.error_sales_summary
target_lag = '5 minutes'
warehouse = compute_wh
as
select
product_id,
sum(quantity) as total_quantity,
sum(price) as total_price,
-- 意図的にゼロ除算エラーを発生させる
sum(price) / (sum(quantity) - sum(quantity)) as avg_price -- ゼロ除算
from raw_db.sales.orders
group by product_id;
この上で1分ほど待った上で以下のクエリを実行すると、タスクとDynamic Tableのエラーログが入ってきていることがわかります。
-- イベントテーブルの確認
select * from event_db.sales_dept.datapipeline_events;
アラートの設定
事前準備:EメールのNotification Integrationの作成
今回はアラートの条件文に合致するときにEメールを送信して通知したいため、以下のクエリを実行してEメールのNotification Integrationを作成しておきます。
-- メール送信用のNotification Integration作成
use role accountadmin;
create or replace notification integration email_int
type = email
enabled = true;
grant usage on integration email_int to role sysadmin;
アラートの設定
以下のクエリを実行してアラートを設定します。
今回はwarehouse
パラメータを入れていないため、サーバーレスのアラートになります。
create or replace alert event_db.sales_dept.datapipeline_error_alert
if (exists(
select * from event_db.sales_dept.datapipeline_events
where
record:severity_text = 'ERROR' and
record_type = 'EVENT' and
value:state = 'FAILED'
))
then
begin
let error_details varchar;
select array_to_string(array_agg(error_info), chr(10) || chr(10)) into :error_details
from (
select
'Error in ' || resource_attributes:"snow.executable.type" || ': ' ||
resource_attributes:"snow.executable.name" ||
chr(10) || 'Database: ' || resource_attributes:"snow.database.name" ||
chr(10) || 'Schema: ' || resource_attributes:"snow.schema.name" ||
chr(10) || 'Error Message: ' || value:"message" ||
chr(10) || 'State: ' || value:"state" ||
chr(10) || 'Timestamp: ' || timestamp as error_info
from table(result_scan(snowflake.alert.get_condition_query_uuid()))
order by timestamp desc
limit 10
);
call system$send_email(
'email_int',
'aaa@example.jp',
'Data Pipeline Error Alert',
'The following errors were detected in your data pipelines:' || chr(10) || chr(10) || :error_details
);
end;
アラートの有効化
作成したアラートは有効化する必要があるため、以下のクエリで実行します。
-- アラートの有効化
alter alert event_db.sales_dept.datapipeline_error_alert resume;
アラートを動かしてみた
先ほどエラーログを出力した際に用いた、以下のクエリをもう一度実行します。(Dynamic Tableのクエリはどちらも実行時にエラーとなりますが、エラーログは出力されるため今回の検証においては想定通りの挙動となります。)
-- raw層でエラーになるタスク作成
create or replace task raw_db.sales.error_orders_task
warehouse = compute_wh
schedule = 'using cron 0 */1 * * * utc'
as
-- 実行時にエラーが発生するクエリ(カラム数が明らかに足りない)
insert into raw_db.sales.orders (order_id, customer_id, order_date, product_id, quantity, price)
select
'ord-' || to_varchar(uniform(1000, 9999, random()));
-- タスクを有効化して手動実行
alter task raw_db.sales.error_orders_task resume;
execute task raw_db.sales.error_orders_task;
-- stg層でエラーになるdynamic table作成(データ型エラー)※実行するとエラーになります
create or replace dynamic table stg_db.sales.error_orders
target_lag = '1 minute'
warehouse = compute_wh
as
select
order_id,
customer_id,
-- 意図的にデータ型エラーを発生させる(文字列を日付型に変換)
'not-a-valid-date'::date as order_date,
product_id,
quantity,
price,
price * quantity as total_amount,
current_timestamp() as processed_timestamp
from raw_db.sales.orders;
-- mart層でエラーになるdynamic table作成(ゼロ除算エラー)※実行するとエラーになります
create or replace dynamic table mart_db.sales.error_sales_summary
target_lag = '5 minutes'
warehouse = compute_wh
as
select
product_id,
sum(quantity) as total_quantity,
sum(price) as total_price,
-- 意図的にゼロ除算エラーを発生させる
sum(price) / (sum(quantity) - sum(quantity)) as avg_price -- ゼロ除算
from raw_db.sales.orders
group by product_id;
このクエリを実行して約1分後、メールが下図のように届きました!3つのエラーを発生させていたので、3件分のエラーがまとまって届きました。
サーバーレスアラートのコストを確認
今回、必ずエラーが起きるタスクraw_db.sales.error_orders_task
を1時間毎にスケジュールするようにしていたのですが、エラーログが登録されてアラートが起動した際のコストをSERVERLESS_ALERT_HISTORY
テーブル関数で確認してみました。
select *
from table(information_schema.serverless_alert_history(
date_range_start=>'2025-03-20 00:00:00.000 -0700',
date_range_end=>'2025-03-30 23:59:59.999 -0700'))
order by start_time desc;
ざっくりと、1時間に1回エラーログが発行されるタスクのアラートで「0.0015~0.0040クレジット/h」が消費されていました。
おまけ:アラートの実行記録も確認してみた
おまけですが、アラートの実行記録も以下のクエリで確認してみました。
select *
from
table(information_schema.alert_history(
scheduled_time_range_start
=>dateadd('hour',-1,current_timestamp())))
order by scheduled_time desc;
下図のように記録されていました。TRIGGERED
となっているレコード1つにつき、メールが1つ送信されていました。(いくつも実行ログがあったり、state
列がaction_failed
となっているのは私がアラートの送信がうまくいかなかったため試行錯誤した結果です…)
最後に
Snowflakeの新機能として、対象のテーブルに新しいレコードが追加されたときに指定した条件式を実行してアラート出来るようになったので、タスクとDynamic Tableのエラー通知を行ってみました。
これまで、タスクはNotification Integrationと連携して簡単にエラー通知が出来ましたが、Dynamic Tableの通知はこのブログのようなストアドプロシージャとタスクの設定をするしかなかったため、今回の新機能を用いることで簡単に実用的な監視ができるようになったのがとても嬉しいです!
また、アラートはこれまでユーザー側で定義したクエリを定期的に実行して条件に合致した場合に指定したクエリを実行する仕様であったためウェアハウスのリソースもかなり消費する仕様でしたが、今回の新機能により「新しいレコードが追加されたときにサーバーレスのリソースを使用するだけ」が可能となりましたので、かなりお財布にも優しい仕様になりました。
とても汎用性が高い機能だと思いますので、ぜひ活用してみてください!