dbt Coreのモデル作成失敗時にSlack通知を行うLambda関数を作成する

dbt Coreのモデル作成失敗時にSlack通知を行うLambda関数を作成する

Clock Icon2025.02.28

データ事業本部のueharaです。

今回は、dbt Coreのモデル作成失敗時にSlack通知を行うLambda関数を作成してみたいと思います。

はじめに

dbt Cloudであればジョブ失敗時にメールで通知するといったことが簡単に設定可能なのですが、dbt Coreの場合はそのような機構を自分で用意する必要があります。

dbt向けのデータ監視・モニタリングを行う有名なサードパーティツールとしてelementaryがあり、こちらに send-report という機能があるのですが、こちらはテスト結果を通知するものとなっています。

https://docs.elementary-data.com/oss/guides/generate-report-ui

モデル作成の実行成否についてもelementaryのダッシュボードから確認できはするのですが、失敗時には別途Slackで通知を受け取りたいので、今回はその機構をLambdaを利用して作成したいと思います。

方針

dbtは runtest の実行後に target/run_results.json として実行結果を出力します。

https://docs.getdbt.com/reference/artifacts/run-results-json

今回は実行処理の中でこの run_results.json をS3にPutし、それを契機に起動するLambdaを作成し、失敗時に通知を試みることにします。

アーキテクチャのイメージは以下のような感じです。

20250228_dbt_lambda_03

上記のECSでの処理イメージとしては、以下のように dbt run 実行後に指定バケットに run_results.json をPutする形です。

# dbtモデルの実行
dbt run

# S3へファイルをput
aws s3 cp target/run_results.json s3://uehara-dbt-test-bucket/dbt_run_results/

やってみた

リソース準備

dbtの実行環境

dbtの実行環境は以下で紹介したように既にAWS上に構築されているとし、上記の方針に記載した通りdbtの実行後に run_results.json を指定のバケットに送信するように設定しておきます。

https://dev.classmethod.jp/articles/use-dbt-with-ecs-sfn-redshift/

通知用のLambda

通知用のLambdaはAWS SAMで作成します。

ついでに、格納用のS3バケットもこのSAMテンプレート内に作成しようと思います。

デプロイのためのフォルダ構成は以下の通りです。

.
├── handler
│   └── app.py
├── samconfig.toml
└── template.yaml

まず、 samconfig.toml は以下の通りです。

samconfig.toml
version = 0.1

[default]
region = "ap-northeast-1"

[default.build.parameters]
debug = true

[default.deploy.parameters]
stack_name = "dbt-result-notification"
s3_bucket = <YOUR S3 BUCKET>
s3_prefix = <YOUR PREFIX>
capabilities = "CAPABILITY_NAMED_IAM"
confirm_changeset = true

デプロイ用のS3バケットやプレフィックスは、適宜ご自身の環境に合わせて下さい。

次に、リソースを定義した template.yaml は以下の通りです。

template.yaml
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: "SAM Application"

Parameters:
  BucketName:
    Type: String
    Default: <YOUR S3 BUCKET NAME>
  LambdaName:
    Type: String
    Default: <YOUR LAMBDA NAME>

Globals:
  Function:
    Timeout: 180 # 180 seconds
    MemorySize: 128

Resources:
  # S3
  MyS3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub "${BucketName}"
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      NotificationConfiguration:
        EventBridgeConfiguration:
          EventBridgeEnabled: true
  # S3 Bucket Policy
  MyS3BucketPolicy:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref MyS3Bucket
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: AllowSSLRequestsOnly
            Action:
              - "s3:*"
            Effect: Deny
            Resource:
              - !Sub "arn:aws:s3:::${MyS3Bucket}"
              - !Sub "arn:aws:s3:::${MyS3Bucket}/*"
            Principal: "*"
            Condition:
              Bool:
                "aws:SecureTransport": "false"
  # Lambda
  MyLambdaFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub "${LambdaName}"
      Role: !GetAtt MyLambdaFunctionRole.Arn
      CodeUri: handler/
      Handler: app.lambda_handler
      Runtime: python3.9
      Architectures:
        - x86_64
      Layers:
        - arn:aws:lambda:ap-northeast-1:336392948345:layer:AWSSDKPandas-Python39:15
      Environment:
        Variables:
          WEBHOOK_URL: <YOUR SLACK WEBHOOK URL>
      Events:
        S3PutEvent:
          Type: S3
          Properties:
            Bucket: !Ref MyS3Bucket
            Events: "s3:ObjectCreated:*"
            Filter:
              S3Key:
                Rules:
                  # 監視したいプレフィックスとサフィックス
                  - Name: prefix
                    Value: "dbt_run_results/"
                  - Name: suffix
                    Value: ".json"
  # IAM Role for Lambda
  MyLambdaFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "${LambdaName}-role"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/AmazonS3FullAccess

run_results.json を格納するためのS3バケット名や、Lambda関数名はご自身の任意の値を設定してください。

また、Slackへの通知はWebhookを利用するものとし、そのURLを WEBHOOK_URL としてLamndaの環境変数に設定を行います。

最後に、肝心の app.py は以下の通りです。

app.py
import json
import logging
import os
import urllib

import boto3

logger = logging.getLogger()
logger.setLevel("INFO")

def send_dbt_errors_to_slack(failed_jobs, webhook_url):
    """
    dbtの失敗した実行結果をSlackに通知する

    Args:
        failed_jobs (Dict[str, List[Dict]]): エラーと失敗のジョブ情報
        webhook_url (str): SlackのWebhook URL
    """
    # エラーがない場合は終了
    if not failed_jobs["errors"]:
        logger.info("Error is not found.")
        return

    # 各エラーの情報を文字列として結合
    error_messages = []
    for error in failed_jobs["errors"]:
        error_text = (
            f"*Relation:* {error['relation_name']}\n"
            f"*Status:* {error['status']}\n"
            f"*Message:* \n```{error['message']}```\n"
            # f"*Compiled Code:*\n```{error['compiled_code']}```"
            "\n" + "─" * 40 + "\n"  # 区切り線
        )
        error_messages.append(error_text)

    # Slackメッセージの作成
    send_data = {
        "attachments": [
            {
                "color": "#E01E5A",  # 赤色
                "pretext": "dbtのモデル実行でエラーが発生しました",
                "text": "\n".join(error_messages),
            }
        ]
    }

    try:
        # Slackに送信
        send_text = json.dumps(send_data)
        request = urllib.request.Request(
            webhook_url, data=send_text.encode("utf-8"), method="POST"
        )

        with urllib.request.urlopen(request) as response:
            if response.status != 200:
                logger.error(
                    f"Failed to send Slack notification. Status code: {response.status}"
                )
                logger.error(f"Response: {response.read().decode('utf-8')}")

    except Exception as e:
        logger.error(f"Error sending Slack notification: {str(e)}")
        raise

def parse_dbt_results(file_path):
    """
    dbtのrun_results.jsonをパースして失敗したジョブの情報を抽出する

    Args:
        file_path (str): run_results.jsonのファイルパス

    Returns:
        Dict[str, List[Dict]]: エラーと失敗のジョブ情報
    """
    results = {
        "errors": [],  # 実行エラーを格納する
        "failures": [],  # テスト失敗を格納する
    }

    try:
        # JSONファイルを読み込む
        with open(file_path, "r") as f:
            data = json.load(f)

        # resultsセクションを取得してループ処理
        for result in data.get("results", []):
            status = result.get("status")
            # エラーまたは失敗したジョブを抽出
            if status in ["error", "failure"]:
                job_info = {
                    "unique_id": result.get("unique_id", "Unknown"),
                    "status": status,
                    "message": result.get("message", "No message provided"),
                    "execution_time": result.get("execution_time", 0),
                    "thread_id": result.get("thread_id", "Unknown"),
                    "compiled_code": result.get("compiled_code", "No code available"),
                    "failures": result.get("failures"),
                    "relation_name": result.get("relation_name", "No relation name"),
                }

                if status == "error":
                    results["errors"].append(job_info)
                else:
                    results["failures"].append(job_info)

        return results

    except Exception as e:
        logger.error(f"Error: An unexpected error occurred: {str(e)}")
        raise

def lambda_handler(event, context):
    s3_bucket = event["Records"][0]["s3"]["bucket"]["name"]
    object_key = event["Records"][0]["s3"]["object"]["key"]
    logger.info(f"s3_bucket: {s3_bucket}, object_key: {object_key}")

    # ファイルのダウンロード
    s3_resource = boto3.resource("s3")
    file_path = "/tmp/run_results.json"
    s3_resource.Object(s3_bucket, object_key).download_file(file_path)

    # 失敗したジョブを抽出
    failed_jobs = parse_dbt_results(file_path)

    # Slack通知
    webhook_url = os.getenv("WEBHOOK_URL")
    send_dbt_errors_to_slack(failed_jobs, webhook_url)

    return {"message": "success"}

jsonのパースを行う parse_dbt_results 関数では一応 errorfailure の両方を捕捉するようにしていますが、今回は error のみしか通知をしません。( failure の通知も必要とならばできるように捕捉しています)

また、通知する情報も個人的に知りたい情報にのみに絞っています。

Slackには送信文字数の制限があるためコンパイルしたコードはメッセージに含めないようにしていますが、一応入れる場合の例として f"*Compiled Code:*\n```{error['compiled_code']}```" をコメントアウトして残しております。

その他、今回はSlackのメッセージではエラーを区切り線で1つに結合して送信するようにしていますが、ジョブごとに通知を行いたい場合はまとめずにエラー毎にSlackのAPIを叩くよう修正してください。

ここまで作成できれば、デプロイを行います。

デプロイは以下のコマンドで実行可能です。

$ sam deploy

デプロイが完了すると、以下のようなリソースが作成されていることをCloudFormationから確認することができると思います。

20250228_dbt_lambda_04

実行確認

私がデプロイしたリソースでは、S3バケットの uehara-dbt-test-bucket バケットに dbt_run_results のプレフィックスを持つjsonファイルのオブジェクトがPutされればLambdaが起動するようになっているため、dbt実行後にそちらにファイルをPutするようにします。

# dbtモデルの実行
dbt run

# S3へファイルをput
aws s3 cp target/run_results.json s3://uehara-dbt-test-bucket/dbt_run_results/

結果、以下のように無事通知を受け取ることができました。

20250228_dbt_lambda_05

送信文字数の制限に抵触しないことを前提として、コンパイルしたSQLも記載した場合は以下の通りです。

20250228_dbt_lambda_06

最後に

今回は、dbt Coreのモデル作成失敗時にSlack通知を行うLambda関数を作成してみました。

参考になりましたら幸いです。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.