Snowflake の通知統合を使ったら、秒でメール送信できてびっくりした話。
こんにちは、みかみです。
7月7日はクラスメソッドの創立記念日です。
おかげさまで創立21周年を迎えることができました!
今後とも、どうぞよろしくお願いいたします。
はじめに
Snowflake や Databricks など、オールインワンなデータプラットフォームのお話、最近よく耳にします。
データ分析基盤を本番運用するにあたっては、もちろん運用時のエラーアラートもセットで検討する必要があります。
Snowflake でエラーが発生した場合にアラートメールを送信するには?
と確認していたら、
SQL 実行だけで簡単にメール送信できるらしい!
ので、ためしてみました。
前提
本記事内の操作では、Snowflake の ACCOUNTADMIN ロール、および AWS の AdministratorAccess ポリシーを付与したユーザーを使用しています。
また、Snowflake は、AWS US West(Oregon)で作成したトライアルアカウントを使用しています。
SQL 実行して、メールを送信してみる。
まずは、以下の SQL で、通知統合を作成します。
create notification integration my_notify_int
type = EMAIL
ENABLED = TRUE
ALLOWED_RECIPIENTS = ('****@classmethod.jp')
※メールアドレスは一部伏せ字に変更しています。
続いて、以下のSQLを実行して、先ほど通知統合で設定したアドレスにメールを送信してみます。
call system$send_email(
'my_notify_int',
'****@classmethod.jp',
'Email Alert from Snowflake: Hello!',
'This is Test for Alert from Snowflake...'
)
なんと、たったこれだけで、ちゃんとメールが送信できました!
なお、通知統合作成時に指定していないアドレスにメール送信しようとしてみると
call system$send_email(
'my_notify_int',
-- '****@classmethod.jp',
'aaa@test.co.jp',
'Email Alert from Snowflake: Hello!',
'This is Test for Alert from Snowflake...'
)
ちゃんとエラーになるので、意図しない宛先への誤送信の心配はありません。
また、メール送信先には、同アカウント内のユーザー登録済みのメールアドレスのみ指定可能なので、ご注意ください。
メールアドレスを Secret Manager で管理するよう変更してみる。
というわけで、SQL 2つ実行するだけで、簡単にメール送信できてしまいました。
とはいえ、メールアドレスはセンシティブな情報なので、取り扱いには注意が必要です。
SQL で直接指定するのはちょっと心配です。
送信先のメールアドレスは AWS Secret Manager に保存するようにしてみます。
メールアドレスのシークレットを作成
AWS 管理コンソールから、メールアドレスを保存する Secret Manager のシークレットを作成しました。
Secret Manager にアクセスする外部関数を作成
以下のドキュメントに従って、Secret Manager に保存したメールアドレスを取得する、外部関数を作成します。
シークレットを取得する AWS Lambda 関数を作成
先ほど作成したシークレットを取得する Lambda 関数をデプロイします。
import boto3
from botocore.exceptions import ClientError
import json
def lambda_handler(event, context):
secret_name = "cm-mikami-secret"
region_name = "ap-northeast-1"
status_code = 200
array_of_rows_to_return = []
try:
if "body" in event:
# API Gateway経由
event_body = event["body"]
payload = json.loads(event_body)
else:
# テスト実行
payload = event
rows = payload["data"]
# AWS Secrets Manager client setup
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
# Get secret
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
secret_string = get_secret_value_response['SecretString']
secret = json.loads(secret_string)
emails = secret['emails'].split(',')
# For each input row
for row in rows:
row_number = row[0]
row_to_return = [row_number, emails]
array_of_rows_to_return.append(row_to_return)
json_compatible_string_to_return = json.dumps({"data": array_of_rows_to_return})
except ClientError as e:
status_code = 400
json_compatible_string_to_return = json.dumps({
"data": [[0, f"AWS error: {str(e)}"]]
})
except Exception as e:
status_code = 400
json_compatible_string_to_return = json.dumps({
"data": [[0, f"Error: {str(e)}"]]
})
return {
'statusCode': status_code,
'body': json_compatible_string_to_return
}
続いて Lambda 実行ロールにインラインポリシーを追加して、シークレットにアクセスできるようにします。
Lambda 関数をテスト実行してみると
期待通り、Secret Manager に保存したメールアドレスが取得できました。
Snowflake 用の AWS IAM ロールを作成
AWS 管理コンソールから、Snowflake が AWS にアクセスする際に使用する IAM ロールを作成します。
[信頼されたエンティティタイプ] で [AWS アカウント] を指定して、[別の AWS アカウント] に Snowflake の AWS アカウントを指定する必要がありますが、この段階ではとりあえず自身のAWSアカウントを入力しておきます。
作成したロールの ARN をひかえておきます。
Amazon API Gateway を作成
Snowflake から AWS にアクセスする際のエンドポイントとなる、API Gateway を作成します。
[API タイプ] で [REST API] を指定し、[新しい API] を作成後、任意の名前のリソースを作成してから、Lambda 統合の POST メソッドを作成します。
作成した API Gateway から Lambda 関数を実行して、ちゃんとシークレットを取得できるかテストしてみます。
期待通り動いているようです。
Snowflake の API 統合を作成
Snowflake で以下の SQL を実行して、API 統合を作成します。
create or replace api integration aws_api_get_secret
api_provider = aws_api_gateway
api_aws_role_arn = 'arn:aws:iam::************:role/cm-mikami-sf-integration'
api_allowed_prefixes = ('https://dihukvxk90.execute-api.ap-northeast-1.amazonaws.com/dev/get-emails')
enabled = true
※一部伏せ字に変更しています。
API 統合作成後、以下の SQL を実行して API_AWS_IAM_USER_ARN
と API_AWS_EXTERNAL_ID
を取得してひかえておきます。
desc integration aws_api_get_secret
AWS のロールに Snowflake からのアクセス許可を設定
Snowflake 用ロールの [信頼ポリシーを編集] し、[Principal] の AWS ARN を先ほど確認した API 統合の API_AWS_IAM_USER_ARN
に書き換えます。
また、[Condition] に、API 統合の API_AWS_EXTERNAL_ID
を追加しました。
Snowflake の外部関数を作成
Snowflake で以下の SQL を実行して、Secret Manager からメールアドレスを取得する外部関数を作成しました。
create or replace external function get_email_from_secrets()
returns array
API_INTEGRATION = aws_api_get_secret
as 'https://dihukvxk90.execute-api.ap-northeast-1.amazonaws.com/dev/get-emails'
テスト実行してみます。
select get_email_from_secrets()
Secret Manager で管理しているメールアドレスが取得できることを確認できました。
通知統合を作成してメール送信するストアドプロシージャを作成
以下の SQL を実行して、外部関数を使って Secret Manager からメールアドレスを取得した後、通知統合を作成/更新してメールを送信する、ストアドプロシージャを作成しました。
CREATE OR REPLACE PROCEDURE send_email_with_dynamic_integration(
subject STRING DEFAULT 'Email Alert from Snowflake: Hello!',
message STRING DEFAULT 'This is Test for Alert from Snowflake...'
)
RETURNS STRING
AS
$$
DECLARE
email_array ARRAY;
email_list STRING;
integration_name STRING := 'my_notify_int';
sent_count INTEGER := 0;
result_msg STRING;
current_email STRING;
i INTEGER;
BEGIN
-- メールアドレス取得
email_array := (SELECT get_email_from_secrets());
email_list := '';
FOR i IN 0 TO ARRAY_SIZE(:email_array) - 1 DO
IF (:i > 0) THEN
email_list := :email_list || ',';
END IF;
email_list := :email_list || '''' || TRIM(:email_array[:i]::string) || '''';
END FOR;
-- 通知統合を作成/更新
EXECUTE IMMEDIATE
'CREATE OR REPLACE NOTIFICATION INTEGRATION ' || :integration_name ||
' TYPE = EMAIL ENABLED = TRUE ALLOWED_RECIPIENTS = (' || :email_list || ')';
result_msg := 'Integration created/updated. ';
-- メール送信
FOR i IN 0 TO ARRAY_SIZE(:email_array) - 1 DO
current_email := TRIM(:email_array[:i]::string);
CALL SYSTEM$SEND_EMAIL(
:integration_name,
:current_email,
:subject,
:message
);
sent_count := :sent_count + 1;
END FOR;
RETURN :result_msg || 'Sent ' || :sent_count || ' emails.';
EXCEPTION
WHEN OTHER THEN
RETURN 'Error: ' || SQLERRM;
END;
$$;
ストアドプロシージャを実行して挙動を確認してみます。
call send_email_with_dynamic_integration()
意図した通り、Secret Manager で管理しているメールアドレス宛に、メールが送信されることを確認できました。
まとめ(所感)
AWS や Google Cloud などのパブリッククラウドでも、それほど作り込む必要なく簡単にメール送信可能ですが、SQL レイヤだけでメール送信設定が完結する、この Snowflake の通知統合には驚愕しました。
今回はメール送信用のストアドプロシージャを作成するところまでしか試していませんが、データ処理タスクと連携してエラー発生時にアラートメールを送信したり、Alerts on new data でテーブルに新しいデータが格納されたことをトリガにデータ品質をチェックして問題がある場合はメール送信したり、シンプルな実装で運用の効率化が図れるのではないかと思います。
思えば10数年前、初めて AWS に出会った時に、インフラの知識がなくても簡単&安価でサーバーが構築できてしまうことに、世の中にはこんな便利なものがあるのかと、驚きと喜びを感じたものでした。
自分が知らなかった便利な技術に触れたときにはいつも嬉しくなり、独り言をつぶやいて飼い犬の注目を集めているような気がします。
生成 AI 全盛期に突入した昨今、今後ますます便利なものが増えてゆくと思いますが、新しい技術やツールに振り回されるだけではなく、きちんと使いこなせる自分で在れるよう、引き続きキャッチアップを続けてゆきたいと思っています。