EventBridge Pipesを使ってDynamoDBの変更を検知・通知してみた
はじめに
かつまたです。EventBridge Pipes学習のために以下構成図のようなDynamoDB ストリーム+EventBridge Pipes+SQS+Lambda+SNS構成を作成してみたので手順をご紹介します。
想定シナリオとしては、DynamoDBにアイテム(商品)が挿入された際、EventBridge Pipesの判定により適切なアイテムであった場合、SQSキューを経由してその数量をLambdaで判定し、アラートメールを送信するユースケースです。
Eventbridge Pipesとは
EventBridge Pipesはソースとターゲット間の接続を可能にします。また、ソースとターゲットの間に、フィルタリングとエンリッチメントという工程を追加し、フィルターや変換を実施することも可能です。
-
ソース
DynamoDB ストリーム、Kinesis Data Streams、SQSなどから、ストリーミングデータやメッセージを自動的に読み取ります。 -
フィルタリング
ソースから取得したイベントを条件に基づいて選別するオプションフェーズです。
EventBridgeのイベントパターン構文を使用して、特定の条件に一致するイベントのみを次のフェーズに渡すことができます。 -
エンリッチメント
イベントデータに追加情報を付与したり、変換処理を行うオプションフェーズです。
Lambda関数、Step Functions、API Gateway、EventBridge API Destinationsを呼び出して、外部データの取得やフォーマット変換を実行できます。 -
ターゲット
最終的にイベントを配信する宛先を指定するフェーズです。
EventBridge Event Bus、Lambda、Step Functions、Kinesis Data Streams、SNS/SQSなどのAWSサービスに直接配信可能です。
https://docs.aws.amazon.com/ja_jp/eventbridge/latest/userguide/eb-pipes.html
やってみる
DynamoDBテーブル作成
-
DynamoDBテーブルを作成します。パーティションキーを"product_id"(文字列)で作成しました。
-
EventBridge PipesへのトリガーとしてDynamoDBストリームを有効にします。作成したテーブルの詳細画面「エクスポートおよびストリーム」から有効化を実施します。表示タイプは「新旧イメージ」を選択します。
SQSキュー作成
- SQSキューを標準キューで作成します。アクセスポリシーはSQSへの権限とEventVridge Pipesの権限を含むよう、以下の通り設定しました。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowAccountOwner",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::アカウントID:root"
},
"Action": "SQS:*",
"Resource": "arn:aws:sqs:ap-northeast-1:アカウントID:SQS名"
},
{
"Sid": "AllowEventBridgePipes",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com"
},
"Action": "SQS:SendMessage",
"Resource": "arn:aws:sqs:ap-northeast-1:アカウントID:SQS名"
]
}
SNSトピック作成
-
スタンダードタイプでSNSトピックを作成します。
-
Eメールでサブスクリプションを作成後、記入したメールアドレスに届いたメールから承認します。
IAMロール作成
Lambda実行ロールとして以下ポリシーを持つ、IAMロールを作成します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "CloudWatchLogs",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:ap-northeast-1:アカウントID:log-group:/aws/lambda/Lambda関数名:*"
},
{
"Sid": "SQSConsumer",
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ChangeMessageVisibility"
],
"Resource": "arn:aws:sqs:ap-northeast-1:アカウントID:SQSキュー名"
},
{
"Sid": "SNSPublish",
"Effect": "Allow",
"Action": [
"sns:Publish"
],
"Resource": "arn:aws:sns:ap-northeast-1:アカウントID:SNSトピック名"
}
]
}
Lambda作成
-
ランタイム Python3.11でLambda関数を作成します。前提条件で作成済みのLambda実行ロールを選択します。
-
SQSキューから受信したDynamoDBテーブルの項目に含まれる在庫数(ALERT_THRESHOL)が閾値以下の場合、アラートメールを送信するコードをデプロイします。以下コードを入力後、「Deploy」を実行します。
コード例
import json
import boto3
import os
from datetime import datetime
sns = boto3.client('sns')
# 環境変数からSNSトピックARNを取得
SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN']
ALERT_THRESHOLD = int(os.environ.get('ALERT_THRESHOLD', '10'))
def lambda_handler(event, context):
"""
SQSからのメッセージを処理し、在庫アラートをメール送信
"""
processed_count = 0
error_count = 0
skipped_count = 0
for record in event['Records']:
try:
# SQSメッセージの本文を解析
message_body = json.loads(record['body'])
# DynamoDBストリームのレコードを解析
if 'dynamodb' in message_body:
result = process_dynamodb_record(message_body)
if result == 'processed':
processed_count += 1
elif result == 'skipped':
skipped_count += 1
except Exception as e:
print(f"Error processing record: {str(e)}")
print(f"Record: {json.dumps(record)}")
error_count += 1
# エラーの場合、メッセージはSQSに戻される
raise e
return {
'statusCode': 200,
'body': json.dumps({
'processed': processed_count,
'skipped': skipped_count,
'errors': error_count
})
}
def process_dynamodb_record(record):
"""
DynamoDBレコードを処理してアラートを送信
"""
event_name = record.get('eventName', '')
# 新しいイメージと古いイメージを取得
new_image = record.get('dynamodb', {}).get('NewImage', {})
old_image = record.get('dynamodb', {}).get('OldImage', {})
# 商品情報を抽出
product_id = new_image.get('product_id', {}).get('S', 'Unknown')
product_name = new_image.get('product_name', {}).get('S', 'Unknown Product')
new_stock = int(new_image.get('stock_quantity', {}).get('N', '0'))
# 在庫数が閾値より多い場合はアラートを送信しない
if new_stock > ALERT_THRESHOLD:
print(f"Stock level ({new_stock}) is above threshold ({ALERT_THRESHOLD}). Skipping alert for product {product_id}")
return 'skipped'
# 古い在庫数を取得(UPDATE時のみ)
old_stock = None
if old_image:
old_stock = int(old_image.get('stock_quantity', {}).get('N', '0'))
# アラートメッセージの作成
alert_message = create_alert_message(
event_name,
product_id,
product_name,
new_stock,
old_stock
)
# SNSでメール送信
send_alert(alert_message, product_id, new_stock)
return 'processed'
def create_alert_message(event_name, product_id, product_name, new_stock, old_stock):
"""
アラートメッセージを作成
"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if event_name == 'INSERT':
message = f"""
【在庫アラート - 新規商品登録】
日時: {timestamp}
商品ID: {product_id}
商品名: {product_name}
現在の在庫数: {new_stock}個
⚠️ 在庫数が閾値({ALERT_THRESHOLD}個)以下です。
早急な在庫補充を検討してください。
"""
elif event_name == 'MODIFY':
stock_change = new_stock - old_stock if old_stock else 0
change_text = f"({stock_change:+d}個)" if old_stock else ""
message = f"""
【在庫アラート - 在庫数更新】
日時: {timestamp}
商品ID: {product_id}
商品名: {product_name}
以前の在庫数: {old_stock if old_stock else 'N/A'}個
現在の在庫数: {new_stock}個 {change_text}
⚠️ 在庫数が閾値({ALERT_THRESHOLD}個)以下です。
早急な在庫補充を検討してください。
"""
else:
message = f"""
【在庫アラート】
日時: {timestamp}
イベント: {event_name}
商品ID: {product_id}
商品名: {product_name}
現在の在庫数: {new_stock}個
⚠️ 在庫数が閾値({ALERT_THRESHOLD}個)以下です。
"""
return message
def send_alert(message, product_id, stock_quantity):
"""
SNS経由でアラートメールを送信
"""
subject = f"【重要】在庫アラート - {product_id} (残り{stock_quantity}個)"
try:
response = sns.publish(
TopicArn=SNS_TOPIC_ARN,
Subject=subject,
Message=message,
MessageAttributes={
'product_id': {
'DataType': 'String',
'StringValue': product_id
},
'stock_quantity': {
'DataType': 'Number',
'StringValue': str(stock_quantity)
},
'alert_type': {
'DataType': 'String',
'StringValue': 'LOW_STOCK'
}
}
)
print(f"Alert sent successfully. MessageId: {response['MessageId']}")
return response
except Exception as e:
print(f"Failed to send alert: {str(e)}")
raise e
- コード内で利用する環境変数を以下のように設定します。
- ALERT_THRESHOLD:10 (商品在庫の閾値)
- SNS_TOPIC_ARN:作成したSNSトピックのURL
- 作成したSQSキューをLambdaのトリガーとして設定します。「設定」→「トリガー」→「トリガーを追加」からSQSキューを選択し、追加します。
EventBridge Pipes作成
Eventbridge Pipesの各フェーズを設定していきます。
-
ソースを設定します。作成したDynamoDBテーブルをソースに設定します。
-
フィルタリングを設定します。DynamoDBストリームによって受信したイベントの内、product_idフィールドが存在し、かつDELETE以外のイベントを通すフィルタリングを設定しました。
{
"dynamodb": {
"NewImage": {
"product_id": {
"S": [{
"exists": true
}]
}
}
},
"eventName": [{
"anything-but": "DELETE"
}]
}
:::
- ターゲットを設定します。作成したSQSキューをターゲットに設定します。
動作確認
DynamoDBに在庫数(stock_quantityフィールド)が10以下のアイテムを追加して、通知が送信されるか確認してみます。
- 以下CLIを実行します。
aws dynamodb put-item \
--table-name YOUR_TABLE_NAME \
--item '{
"product_id": {"S": "PROD-001"},
"stock_quantity": {"N": "5"},
"product_name": {"S": "サンプル商品A"}
}'
- サブスクリプション登録してメールアドレスにLambda関数によってカスタマイズされたメールが来ることを確認できました。
おわりに
ご覧いただきありがとうございました。
詳しく知らなかったEventBridge Pipesがどのような方法で利用されるかについて少し学ぶことができました。エンリッチメント機能については今回触れていないので、今後も学習を続けていきたいと思います。
アノテーション株式会社について
アノテーション株式会社はクラスメソッドグループのオペレーション専門特化企業です。サポート・運用・開発保守・情シス・バックオフィスの専門チームが、最新 IT テクノロジー、高い技術力、蓄積されたノウハウをフル活用し、お客様の課題解決を行っています。当社は様々な職種でメンバーを募集しています。「オペレーション・エクセレンス」と「らしく働く、らしく生きる」を共に実現するカルチャー・しくみ・働き方にご興味がある方は、アノテーション株式会社 採用サイトをぜひご覧ください。