dbt Coreのモデル作成失敗時にSlack通知を行うLambda関数を作成する
データ事業本部のueharaです。
今回は、dbt Coreのモデル作成失敗時にSlack通知を行うLambda関数を作成してみたいと思います。
はじめに
dbt Cloudであればジョブ失敗時にメールで通知するといったことが簡単に設定可能なのですが、dbt Coreの場合はそのような機構を自分で用意する必要があります。
dbt向けのデータ監視・モニタリングを行う有名なサードパーティツールとしてelementaryがあり、こちらに send-report
という機能があるのですが、こちらはテスト結果を通知するものとなっています。
モデル作成の実行成否についてもelementaryのダッシュボードから確認できはするのですが、失敗時には別途Slackで通知を受け取りたいので、今回はその機構をLambdaを利用して作成したいと思います。
方針
dbtは run
や test
の実行後に target/run_results.json
として実行結果を出力します。
今回は実行処理の中でこの run_results.json
をS3にPutし、それを契機に起動するLambdaを作成し、失敗時に通知を試みることにします。
アーキテクチャのイメージは以下のような感じです。
上記のECSでの処理イメージとしては、以下のように dbt run
実行後に指定バケットに run_results.json
をPutする形です。
# dbtモデルの実行
dbt run
# S3へファイルをput
aws s3 cp target/run_results.json s3://uehara-dbt-test-bucket/dbt_run_results/
やってみた
リソース準備
dbtの実行環境
dbtの実行環境は以下で紹介したように既にAWS上に構築されているとし、上記の方針に記載した通りdbtの実行後に run_results.json
を指定のバケットに送信するように設定しておきます。
通知用のLambda
通知用のLambdaはAWS SAMで作成します。
ついでに、格納用のS3バケットもこのSAMテンプレート内に作成しようと思います。
デプロイのためのフォルダ構成は以下の通りです。
.
├── handler
│ └── app.py
├── samconfig.toml
└── template.yaml
まず、 samconfig.toml
は以下の通りです。
version = 0.1
[default]
region = "ap-northeast-1"
[default.build.parameters]
debug = true
[default.deploy.parameters]
stack_name = "dbt-result-notification"
s3_bucket = <YOUR S3 BUCKET>
s3_prefix = <YOUR PREFIX>
capabilities = "CAPABILITY_NAMED_IAM"
confirm_changeset = true
デプロイ用のS3バケットやプレフィックスは、適宜ご自身の環境に合わせて下さい。
次に、リソースを定義した template.yaml
は以下の通りです。
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: "SAM Application"
Parameters:
BucketName:
Type: String
Default: <YOUR S3 BUCKET NAME>
LambdaName:
Type: String
Default: <YOUR LAMBDA NAME>
Globals:
Function:
Timeout: 180 # 180 seconds
MemorySize: 128
Resources:
# S3
MyS3Bucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub "${BucketName}"
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
NotificationConfiguration:
EventBridgeConfiguration:
EventBridgeEnabled: true
# S3 Bucket Policy
MyS3BucketPolicy:
Type: AWS::S3::BucketPolicy
Properties:
Bucket: !Ref MyS3Bucket
PolicyDocument:
Version: "2012-10-17"
Statement:
- Sid: AllowSSLRequestsOnly
Action:
- "s3:*"
Effect: Deny
Resource:
- !Sub "arn:aws:s3:::${MyS3Bucket}"
- !Sub "arn:aws:s3:::${MyS3Bucket}/*"
Principal: "*"
Condition:
Bool:
"aws:SecureTransport": "false"
# Lambda
MyLambdaFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub "${LambdaName}"
Role: !GetAtt MyLambdaFunctionRole.Arn
CodeUri: handler/
Handler: app.lambda_handler
Runtime: python3.9
Architectures:
- x86_64
Layers:
- arn:aws:lambda:ap-northeast-1:336392948345:layer:AWSSDKPandas-Python39:15
Environment:
Variables:
WEBHOOK_URL: <YOUR SLACK WEBHOOK URL>
Events:
S3PutEvent:
Type: S3
Properties:
Bucket: !Ref MyS3Bucket
Events: "s3:ObjectCreated:*"
Filter:
S3Key:
Rules:
# 監視したいプレフィックスとサフィックス
- Name: prefix
Value: "dbt_run_results/"
- Name: suffix
Value: ".json"
# IAM Role for Lambda
MyLambdaFunctionRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub "${LambdaName}-role"
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- lambda.amazonaws.com
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
- arn:aws:iam::aws:policy/AmazonS3FullAccess
run_results.json
を格納するためのS3バケット名や、Lambda関数名はご自身の任意の値を設定してください。
また、Slackへの通知はWebhookを利用するものとし、そのURLを WEBHOOK_URL
としてLamndaの環境変数に設定を行います。
最後に、肝心の app.py
は以下の通りです。
import json
import logging
import os
import urllib
import boto3
logger = logging.getLogger()
logger.setLevel("INFO")
def send_dbt_errors_to_slack(failed_jobs, webhook_url):
"""
dbtの失敗した実行結果をSlackに通知する
Args:
failed_jobs (Dict[str, List[Dict]]): エラーと失敗のジョブ情報
webhook_url (str): SlackのWebhook URL
"""
# エラーがない場合は終了
if not failed_jobs["errors"]:
logger.info("Error is not found.")
return
# 各エラーの情報を文字列として結合
error_messages = []
for error in failed_jobs["errors"]:
error_text = (
f"*Relation:* {error['relation_name']}\n"
f"*Status:* {error['status']}\n"
f"*Message:* \n```{error['message']}```\n"
# f"*Compiled Code:*\n```{error['compiled_code']}```"
"\n" + "─" * 40 + "\n" # 区切り線
)
error_messages.append(error_text)
# Slackメッセージの作成
send_data = {
"attachments": [
{
"color": "#E01E5A", # 赤色
"pretext": "dbtのモデル実行でエラーが発生しました",
"text": "\n".join(error_messages),
}
]
}
try:
# Slackに送信
send_text = json.dumps(send_data)
request = urllib.request.Request(
webhook_url, data=send_text.encode("utf-8"), method="POST"
)
with urllib.request.urlopen(request) as response:
if response.status != 200:
logger.error(
f"Failed to send Slack notification. Status code: {response.status}"
)
logger.error(f"Response: {response.read().decode('utf-8')}")
except Exception as e:
logger.error(f"Error sending Slack notification: {str(e)}")
raise
def parse_dbt_results(file_path):
"""
dbtのrun_results.jsonをパースして失敗したジョブの情報を抽出する
Args:
file_path (str): run_results.jsonのファイルパス
Returns:
Dict[str, List[Dict]]: エラーと失敗のジョブ情報
"""
results = {
"errors": [], # 実行エラーを格納する
"failures": [], # テスト失敗を格納する
}
try:
# JSONファイルを読み込む
with open(file_path, "r") as f:
data = json.load(f)
# resultsセクションを取得してループ処理
for result in data.get("results", []):
status = result.get("status")
# エラーまたは失敗したジョブを抽出
if status in ["error", "failure"]:
job_info = {
"unique_id": result.get("unique_id", "Unknown"),
"status": status,
"message": result.get("message", "No message provided"),
"execution_time": result.get("execution_time", 0),
"thread_id": result.get("thread_id", "Unknown"),
"compiled_code": result.get("compiled_code", "No code available"),
"failures": result.get("failures"),
"relation_name": result.get("relation_name", "No relation name"),
}
if status == "error":
results["errors"].append(job_info)
else:
results["failures"].append(job_info)
return results
except Exception as e:
logger.error(f"Error: An unexpected error occurred: {str(e)}")
raise
def lambda_handler(event, context):
s3_bucket = event["Records"][0]["s3"]["bucket"]["name"]
object_key = event["Records"][0]["s3"]["object"]["key"]
logger.info(f"s3_bucket: {s3_bucket}, object_key: {object_key}")
# ファイルのダウンロード
s3_resource = boto3.resource("s3")
file_path = "/tmp/run_results.json"
s3_resource.Object(s3_bucket, object_key).download_file(file_path)
# 失敗したジョブを抽出
failed_jobs = parse_dbt_results(file_path)
# Slack通知
webhook_url = os.getenv("WEBHOOK_URL")
send_dbt_errors_to_slack(failed_jobs, webhook_url)
return {"message": "success"}
jsonのパースを行う parse_dbt_results
関数では一応 error
と failure
の両方を捕捉するようにしていますが、今回は error
のみしか通知をしません。( failure
の通知も必要とならばできるように捕捉しています)
また、通知する情報も個人的に知りたい情報にのみに絞っています。
Slackには送信文字数の制限があるためコンパイルしたコードはメッセージに含めないようにしていますが、一応入れる場合の例として f"*Compiled Code:*\n```{error['compiled_code']}```"
をコメントアウトして残しております。
その他、今回はSlackのメッセージではエラーを区切り線で1つに結合して送信するようにしていますが、ジョブごとに通知を行いたい場合はまとめずにエラー毎にSlackのAPIを叩くよう修正してください。
ここまで作成できれば、デプロイを行います。
デプロイは以下のコマンドで実行可能です。
$ sam deploy
デプロイが完了すると、以下のようなリソースが作成されていることをCloudFormationから確認することができると思います。
実行確認
私がデプロイしたリソースでは、S3バケットの uehara-dbt-test-bucket
バケットに dbt_run_results
のプレフィックスを持つjsonファイルのオブジェクトがPutされればLambdaが起動するようになっているため、dbt実行後にそちらにファイルをPutするようにします。
# dbtモデルの実行
dbt run
# S3へファイルをput
aws s3 cp target/run_results.json s3://uehara-dbt-test-bucket/dbt_run_results/
結果、以下のように無事通知を受け取ることができました。
送信文字数の制限に抵触しないことを前提として、コンパイルしたSQLも記載した場合は以下の通りです。
最後に
今回は、dbt Coreのモデル作成失敗時にSlack通知を行うLambda関数を作成してみました。
参考になりましたら幸いです。