特定のAWS Glue CrawlerのCloudWatch Logsの文字列を検知して通知する

サブスクリプションフィルターとLambdaを使います
2023.10.31

データアナリティクス事業本部インテグレーション部コンサルティングチーム・新納(にいの)です。

AWS Glue Crawlerのログ内容を通知させるには、CloudWatch Logsでメトリクスフィルターを設定する方法があります。

しかし、ちょっとした「どうしたらいいんだろう」ポイントがあります。というのも、Glueクローラーのログは/aws/glue/crawlersというロググループの配下にクローラー名でログストリームが作成されます。

メトリクスフィルターはロググループに対して設定するため、全てのクローラーに対してログ検知が行われます。「本番環境のGlueクローラーだけ通知させたいな…」などといった、特定のGlueクローラーのみ通知が必要な場合には少し困りますよね。

本エントリでは、CloudWatch LogsのサブスクリプションフィルターとAWS Lambdaの組み合わせで、特定のGlueクローラーのみログ検知する方法を試してみました。

前提

やりたいこと

今回の構成は以下の通り。

ポイントは以下の通りです。

  • GlueクローラーからCloudWatch Logsにログが出力される
  • サブスクリプションフィルターで特定の文言とマッチした際にLambda関数を起動
    • 今回はINFO : Discovered schema changesを検知
  • 特定のログストリーム(Glueクローラー名)の場合のみSNSでメール通知

サブスクリプションフィルターとは

CloudWatch Logsのログデータに指定した文字列が含まれている場合、リアルタイムに検知してKinesisやLambdaといったサービスに転送できます。ログはbase64でエンコード・gzip形式で圧縮されています。

CloudWatch Logsには別のログフィルタ方法として、メトリクスフィルターがあります(冒頭で紹介したブログはメトリクスフィルターを使っています)。メトリクスフィルターを使えば簡単に設定できて便利なのですが、複雑な条件での文字列のマッチが必要な場合などで対応できないケースがあります。今回のGlueクローラーのログ検知のケースでは、同一ログイベント(レコード)内にクローラー名と検知したい文字列が存在していないため、メトリクスフィルターでは対応できません。サブスクリプションフィルターでは柔軟な検知を実現できたり、通知内容をカスタマイズできたりします。

今回はサブスクリプションフィルターを使ってログをLambdaに転送することで特定のログストリーム(Glueクローラー名)に絞り、通知内容にクローラー名やログ内容を含めてわかりやすくしてみました。

SNSトピックの作成

通知用のSNSトピックを作成します。以下のドキュメントに沿って作成します。

作成したSNSトピックのARNは後続の処理で使います。

Lambda関数の作成

CloudWatch Logsからのイベントをデコードして解凍処理して、特定のログストリームのログのみをSNSトピックに通知するためのLambda関数を作成します。今回はPython 3.9を使っています。

コードは以下のエントリをベースとしてカスタマイズしています。

import base64
import json
import zlib
import os
import boto3

sns = boto3.client("sns")

def lambda_handler(event, context):
    data = zlib.decompress(
        base64.b64decode(event["awslogs"]["data"]), 16 + zlib.MAX_WBITS
    )
    data_json = json.loads(data)
    log_stream = data_json["logStream"]

    # 対象のlogStreamの場合のみ処理を実行
    if log_stream == "<クローラー名>":
        log_events = data_json["logEvents"]
        for log_event in log_events:
            message = f"クローラー:{log_stream}\n\nメッセージ:\n\n{log_event['message']}"
            try:
                # SNS Publish
                publishResponse = sns.publish(
                    TopicArn=os.environ["SNS_TOPIC_ARN"],
                    Message=message,
                    Subject=os.environ["ALARM_SUBJECT"]
                )
            except Exception as e:
                print(e)

大事なポイントは以下の通り。

ログデータの解凍とデコード

サブスクリプションフィルターでLambda関数に転送したログはbase64でエンコード・gzip形式で圧縮されているため、解凍とデコード処理をします。CloudWatch Logsはevent['awslogs']['data']で取得します。

 data = zlib.decompress(base64.b64decode(event['awslogs']['data']), 16 + zlib.MAX_WBITS)

特定のログストリームのみ処理

Glueクローラーの場合、CloudWatch Logsのログストリームにクローラー名が記載されます。今回はdev-crawler-1という名前のGlueクローラーのみ対象とします。

    # 対象のlogStreamの場合のみ処理を実行
    if log_stream == 'dev-crawler-1':

通知メッセージを編集

せっかくなので通知メッセージにクローラー名と検知したログ内容が記載されるようにします。

            message = f"クローラー:{log_stream}\n\nメッセージ:\n\n{log_event['message']}"
            try:
                sns = boto3.client('sns')
                # SNS Publish
                publishResponse = sns.publish(
                    TopicArn=os.environ['SNS_TOPIC_ARN'],
                    Message=message,
                    Subject=os.environ['ALARM_SUBJECT']
                )

SNSトピックのARN(SNS_TOPIC_ARN)と通知メールの件名(ALARM_SUBJECT)は、Lambda関数の環境変数に保存しています。先ほど作成したSNSトピックのARNを指定します。

サブスクリプションフィルターを設定

CloudWatch Logsのロググループ/aws/glue/crawlersのアクションからLambdaサブスクリプションフィルターの作成をします。

送信先のLambda関数には先ほど作成したものを指定します。

ログ形式にはJSON、サブスクリプションフィルターのパターンには検知したい文字列を指定します。今回はスキーマ変更を検知したいので、INFO : Discovered schema changesを指定しました。

「ストリーミングを開始」を押せば準備は完了です。

CloudWatch Logsに指定した文字列のイベントが発生すると、以下のようなメールが通知されました。

最後に

サブスクリプションフィルターとLambda関数を使って、特定のGlueクローラーのみログ検知を行ってみました。メトリクスフィルターでは全てのクローラーが検知対象となってしまい、通知過多となってしまいがちですが、サブスクリプションフィルターを使うことで対象が絞られました。

Lambda関数の作成と管理という手間はかかるのですが、今回のケースのように柔軟な検知が実現できたり、通知内容のカスタマイズもできたりといったメリットもあります。

このブログがどなたかのお役に立てば幸いです。