CloudWatch Logsからフィルターした特定ログをいい感じにまとめてメール送信してみた

EventBridgeとSQSを使いてCloudWatch Logsの特定ログをバッファリングしながらまとめて通知する方法を紹介! 検知毎に通知されて、うんざりしているアタナにおすすめ。
2023.01.06

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

AWS上のシステム監視において、CloudWatch Logsを監視し、特定文字列を含んだログを通知したいことがあります。

CloudWatch LogsにLambdaのサブスクリプションフィルターを設定すると、次のブログのようにリアルタイムで通知できます。

この方式の問題点は、対象ログが連続して発生すると、その数だけ連続して通知されてしまうことです。 電話通知と連動している場合、ひっきりなしに電話が鳴り続けてしまいます。

この問題を回避するために、リアルタイム性を一部犠牲にし、まとめて通知する方法を紹介します。

ポイントは、対象ログを一度SQSにキューイングすることです。

  • マネージドサービスに寄せて実装が少ないEventBridge Pipes
  • 実装が増えるけれども自由度の高いEventBridge Scheduler

の2方式を紹介します。

違いはSQSメッセージの処理方法だけです。

実装方式の違い

方式 Pipes版 Scheduler版
呼び出しトリガー メッセージ数/バッチウィンドウ スケジュール
グルーピング Pipes Lambda
最大間隔 5分 任意
最大メッセージ数 10,000(Standard)/10(FIFO) 任意
メッセージの通知処理 Lambda Lambda
Lambdaの実装 シンプル 複雑

EventBridge Pipes版

1つ目に紹介するのが、融通はきかないけれども、シンプルな設定でいい感じに動いてくれるEventBridge Pipes版です。

構成図

CloudWatch Logsの特定ログをSQSに送信

CloudWatch Logsのロググループに対してサブスクリプションフィルターを設定し、特定ログをLambda関数経由でSQSにキューイングします。

Lambda サブスクリプションフィルターの設定では

  • Log format : Space Delimited
  • Subscription filter pattern : ERROR

のように設定すると、ERROR の文字列を含むログに対して、ターゲット指定したLambda関数を呼び出せます。

Lambda 関数のサンプルです。

import base64
import gzip
import json
import os

import boto3

sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName=os.environ['SQS_QUEUE_NAME'])

def lambda_handler(event, context):
    # ref : https://stackoverflow.com/a/51116602
    cw_data = event['awslogs']['data']
    compressed_payload = base64.b64decode(cw_data)
    uncompressed_payload = gzip.decompress(compressed_payload)
    payload = json.loads(uncompressed_payload)

    for log_event in payload['logEvents']:
        queue.send_message(MessageBody=log_event['message'])

SQSメッセージをまとめて処理

どうすればSQSキュー内のメッセージをまとめて処理できるでしょうか?

そこで登場するのが、プロデューサー-コンシューマー型メッセージをいい感じにハンドリングしてくれるEventBridge Pipesです。

EventBridge PipesをSQSと連携させると、SQSメッセージを

  • バッチサイズ:最大10,000(Standard)/10(FIFO)メッセージ
  • バッチウィンドウ:最大5分

でグルーピングしてターゲット(Lambda関数)に渡します。

この設定・上限値はLambdaのイベントソースにSQSを指定する場合と同じです。

ドキュメントからも、内部的には同等であることが伺えます。

SQS/Pipesドキュメント

SQS/Lambdaドキュメント

両者の違いは、次のブログを参照ください。

Pipes の設定

Pipes作成時には、ソースでSQSをバッチ設定とともに指定し、ターゲットにLambdaを指定します。

SQSメッセージをSNSに通知するPipesターゲットのLambda関数のサンプルコードです。

import os

import boto3

sns = boto3.client('sns')

def lambda_handler(event, context):
    sns.publish(
        TopicArn=os.environ["SNS_TOPIC_ARN"],
        Subject="{} error messages found".format(len(event)),
        Message="\n---\n".join(e["body"] for e in event),
    )

SQSキューからメッセージをまとめて取得し、処理したメッセージを消し込む処理はEventBridge Pipesが担っているため、Lambdaの実装はバッチで渡ってくるメッセージを処理するだけです。

通知内容

SNSからメール通知したサンプルです。

SQSのメッセージが程よくグルーピングされながら、通知されています。

EventBridge Pipes版の特徴

EventBridge Pipes方式では、バッチ設定(グルーピングルール)はEventBridge Pipesの仕様に引きずられます

メリットとしては、ターゲットのLambda関数ではグルーピング済みのメッセージをまとめて処理するロジックを書くだけで済み、バッチ設定はPipesに集約されているため、役割分担が明確です。

デメリットとしては、グルーピングルールを細かく制御できないことです。 例えば、30分に一度通知するといったことはPipesの仕様からできません。

また、EventBridge Pipesの代わりにLambdaのイベントソースでSQSを指定しても同等のことを実現できます。 この場合も、上記と同じグルーピング制約を受けます。

EventBridge Scheduler版

次に紹介するのが、呼び出し間隔やSQSメッセージの処理方法までユーザーがコントロールするEventBridge Scheduler版です。

構成図

CloudWatch Logsの特定ログをSQSに送信

この部分は先程と同じため、省略します。

SQSメッセージをまとめて処理

SQSはプロデューサー-コンシューマー型メッセージのため、キューイングされたメッセージをポーリングして処理します。

EventBridge Schedulerを利用して定期的にLambdaを呼び出し、Lambdaがキューイングされたメッセージを処理します。

Scheduler の設定

5分間隔でポーリングする場合

  • Recurring schedule
  • Rate-based schedule
  • Rate expression : 5 minutes
  • Flexible time window : Off

のように設定し、ターゲットとしてLambda関数を呼び出します。

SQSキューを処理するサンプルコードです。

import boto3

SQS_QUEUE_URL = "https://sqs.REGION.amazonaws.com/123/YOUR-QUEUE-NAME"
sqs = boto3.client("sqs")

response = sqs.receive_message(
  QueueUrl = SQS_QUEUE_URL,
  MaxNumberOfMessages = 10
)

messages = response.get("Messages", [])

while len(messages) > 0:
    for message in messages:
        print(message["Body"]) # メッセージでなにか処理

        sqs.delete_message(
          QueueUrl = SQS_QUEUE_URL,
          ReceiptHandle = message["ReceiptHandle"]
        )
    
    response = sqs.receive_message(
      QueueUrl = SQS_QUEUE_URL,
      MaxNumberOfMessages = 10
    )
    messages = response.get("Messages", [])

Pipes版のLambda関数と異なり、このLambda関数は

  • メッセージをSQSキューから受信
  • メッセージを処理
  • メッセージをSQSキューから削除

のようにメッセージに関する全ての処理を実装するため、Lambda関数は複雑です。

EventBridge Scheduler版の特徴

EventBridge Scheduler方式では、SQSキュー内のメッセージをどう処理するかはLambda関数の実装次第です。

自由が効く一方で、あれもこれもLambda関数内で実装する必要があります。

まとめ

CloudWatch Logsの特定メッセージをまとめて通知するために、SQSで一度キューイングし、EventBridgeでまとめて処理する方法を紹介しました。

EventBridge Pipes方式の場合、実装がシンプルになる一方で、メッセージのまとめ方はPipesの仕様に引きずられるため、自由が効きません。また、EventBridge Pipesの代わりにLambdaのイベントソースでSQSを指定しても、同様のことを実現できます。

EventBridge Scheduler方式の場合、メッセージをどうまとめるかは、Lambda関数の実装次第です。 自由が効く一方で、Lambda関数の処理が複雑になります。

まずはシンプルなPipes方式を検討し、どうしても運用にマッチしない場合は、Scheduler方式を採用し、Lambda関数を作り込んで運用に迎合させるのが良いのではないかと思います。

参考