CloudWatch Logsからフィルターした特定ログをいい感じにまとめてメール送信してみた
AWS上のシステム監視において、CloudWatch Logsを監視し、特定文字列を含んだログを通知したいことがあります。
CloudWatch LogsにLambdaのサブスクリプションフィルターを設定すると、次のブログのようにリアルタイムで通知できます。
この方式の問題点は、対象ログが連続して発生すると、その数だけ連続して通知されてしまうことです。 電話通知と連動している場合、ひっきりなしに電話が鳴り続けてしまいます。
この問題を回避するために、リアルタイム性を一部犠牲にし、まとめて通知する方法を紹介します。
ポイントは、対象ログを一度SQSにキューイングすることです。
- マネージドサービスに寄せて実装が少ないEventBridge Pipes版
- 実装が増えるけれども自由度の高いEventBridge Scheduler版
の2方式を紹介します。
違いはSQSメッセージの処理方法だけです。
実装方式の違い
方式 | Pipes版 | Scheduler版 |
---|---|---|
呼び出しトリガー | メッセージ数/バッチウィンドウ | スケジュール |
グルーピング | Pipes | Lambda |
最大間隔 | 5分 | 任意 |
最大メッセージ数 | 10,000(Standard)/10(FIFO) | 任意 |
メッセージの通知処理 | Lambda | Lambda |
Lambdaの実装 | シンプル | 複雑 |
EventBridge Pipes版
1つ目に紹介するのが、融通はきかないけれども、シンプルな設定でいい感じに動いてくれるEventBridge Pipes版です。
構成図
CloudWatch Logsの特定ログをSQSに送信
CloudWatch Logsのロググループに対してサブスクリプションフィルターを設定し、特定ログをLambda関数経由でSQSにキューイングします。
Lambda サブスクリプションフィルターの設定では
- Log format : Space Delimited
- Subscription filter pattern : ERROR
のように設定すると、ERROR
の文字列を含むログに対して、ターゲット指定したLambda関数を呼び出せます。
Lambda 関数のサンプルです。
import base64 import gzip import json import os import boto3 sqs = boto3.resource('sqs') queue = sqs.get_queue_by_name(QueueName=os.environ['SQS_QUEUE_NAME']) def lambda_handler(event, context): # ref : https://stackoverflow.com/a/51116602 cw_data = event['awslogs']['data'] compressed_payload = base64.b64decode(cw_data) uncompressed_payload = gzip.decompress(compressed_payload) payload = json.loads(uncompressed_payload) for log_event in payload['logEvents']: queue.send_message(MessageBody=log_event['message'])
SQSメッセージをまとめて処理
どうすればSQSキュー内のメッセージをまとめて処理できるでしょうか?
そこで登場するのが、プロデューサー-コンシューマー型メッセージをいい感じにハンドリングしてくれるEventBridge Pipesです。
EventBridge PipesをSQSと連携させると、SQSメッセージを
- バッチサイズ:最大10,000(Standard)/10(FIFO)メッセージ
- バッチウィンドウ:最大5分
でグルーピングしてターゲット(Lambda関数)に渡します。
この設定・上限値はLambdaのイベントソースにSQSを指定する場合と同じです。
ドキュメントからも、内部的には同等であることが伺えます。
両者の違いは、次のブログを参照ください。
Pipes の設定
Pipes作成時には、ソースでSQSをバッチ設定とともに指定し、ターゲットにLambdaを指定します。
SQSメッセージをSNSに通知するPipesターゲットのLambda関数のサンプルコードです。
import os import boto3 sns = boto3.client('sns') def lambda_handler(event, context): sns.publish( TopicArn=os.environ["SNS_TOPIC_ARN"], Subject="{} error messages found".format(len(event)), Message="\n---\n".join(e["body"] for e in event), )
SQSキューからメッセージをまとめて取得し、処理したメッセージを消し込む処理はEventBridge Pipesが担っているため、Lambdaの実装はバッチで渡ってくるメッセージを処理するだけです。
通知内容
SNSからメール通知したサンプルです。
SQSのメッセージが程よくグルーピングされながら、通知されています。
EventBridge Pipes版の特徴
EventBridge Pipes方式では、バッチ設定(グルーピングルール)はEventBridge Pipesの仕様に引きずられます。
メリットとしては、ターゲットのLambda関数ではグルーピング済みのメッセージをまとめて処理するロジックを書くだけで済み、バッチ設定はPipesに集約されているため、役割分担が明確です。
デメリットとしては、グルーピングルールを細かく制御できないことです。 例えば、30分に一度通知するといったことはPipesの仕様からできません。
また、EventBridge Pipesの代わりにLambdaのイベントソースでSQSを指定しても同等のことを実現できます。 この場合も、上記と同じグルーピング制約を受けます。
EventBridge Scheduler版
次に紹介するのが、呼び出し間隔やSQSメッセージの処理方法までユーザーがコントロールするEventBridge Scheduler版です。
構成図
CloudWatch Logsの特定ログをSQSに送信
この部分は先程と同じため、省略します。
SQSメッセージをまとめて処理
SQSはプロデューサー-コンシューマー型メッセージのため、キューイングされたメッセージをポーリングして処理します。
EventBridge Schedulerを利用して定期的にLambdaを呼び出し、Lambdaがキューイングされたメッセージを処理します。
Scheduler の設定
5分間隔でポーリングする場合
- Recurring schedule
- Rate-based schedule
- Rate expression : 5 minutes
- Flexible time window : Off
のように設定し、ターゲットとしてLambda関数を呼び出します。
SQSキューを処理するサンプルコードです。
import boto3 SQS_QUEUE_URL = "https://sqs.REGION.amazonaws.com/123/YOUR-QUEUE-NAME" sqs = boto3.client("sqs") response = sqs.receive_message( QueueUrl = SQS_QUEUE_URL, MaxNumberOfMessages = 10 ) messages = response.get("Messages", []) while len(messages) > 0: for message in messages: print(message["Body"]) # メッセージでなにか処理 sqs.delete_message( QueueUrl = SQS_QUEUE_URL, ReceiptHandle = message["ReceiptHandle"] ) response = sqs.receive_message( QueueUrl = SQS_QUEUE_URL, MaxNumberOfMessages = 10 ) messages = response.get("Messages", [])
Pipes版のLambda関数と異なり、このLambda関数は
- メッセージをSQSキューから受信
- メッセージを処理
- メッセージをSQSキューから削除
のようにメッセージに関する全ての処理を実装するため、Lambda関数は複雑です。
EventBridge Scheduler版の特徴
EventBridge Scheduler方式では、SQSキュー内のメッセージをどう処理するかはLambda関数の実装次第です。
自由が効く一方で、あれもこれもLambda関数内で実装する必要があります。
まとめ
CloudWatch Logsの特定メッセージをまとめて通知するために、SQSで一度キューイングし、EventBridgeでまとめて処理する方法を紹介しました。
EventBridge Pipes方式の場合、実装がシンプルになる一方で、メッセージのまとめ方はPipesの仕様に引きずられるため、自由が効きません。また、EventBridge Pipesの代わりにLambdaのイベントソースでSQSを指定しても、同様のことを実現できます。
EventBridge Scheduler方式の場合、メッセージをどうまとめるかは、Lambda関数の実装次第です。 自由が効く一方で、Lambda関数の処理が複雑になります。
まずはシンプルなPipes方式を検討し、どうしても運用にマッチしない場合は、Scheduler方式を採用し、Lambda関数を作り込んで運用に迎合させるのが良いのではないかと思います。