re:Invent 2022で発表されたAmazon EventBridge Pipesを使うと、Producer/Consumer型のメッセージング処理をメッセージブローカーやワーカーに関係なく一貫した手法で実装できます。
メッセージブローカーにAmazon SQS、ConsumerのワーカーにLambdaを利用する構成で、従来のLambdaイベントソース型とEventBridge Pipes型を比較します。
Producer/Consumer型メッセージ処理
Producer/Consumer型メッセージは、メッセージキューなどで利用される、非同期のメッセージングパターンです。
Producerはブローカーにメッセージを送信し、Consumerはブローカーをポーリングしてメッセージを受信します。
SQS-Lambda構成の場合、SQSがメッセージブローカー、LambdaがConsumerに該当します。
以下では Producerのことは一旦忘れ、ブローカーとConsumer間の処理にフォーカスします。
ナイーブ実装
SQSキューのメッセージをSQSのAPIを使って処理すると、以下の流れで行います。
- SQSキュー内をポーリング
- メッセージを受信
- メッセージを処理
- メッセージをキューから削除
import boto3
sqs = boto3.client('sqs')
SQS_QUEUE_URL = 'https://sqs.eu-central-1.amazonaws.com/123456789012/bar'
response = sqs.receive_message(
QueueUrl = SQS_QUEUE_URL,
)
for message in response.get("Messages", []): # メッセージの受信
print(message["Body"]) # 処理
sqs.delete_message(
QueueUrl = SQS_QUEUE_URL,
ReceiptHandle = message['ReceiptHandle']
) # メッセージの削除
コンシューマーはメッセージ処理(ビジネスロジック)に注力したいのに
- ポーリング
- メッセージの受信
- メッセージの削除
なども必要です。
これらの手間を省くのが、次のLambdaイベントソース型です。
Lambdaイベントソース型
Lambdaのイベントソース(トリガー)にSQSを指定します。
イベントソース・コンポーネントが
- ポーリング
- メッセージの受信
- メッセージの削除
を行うため、Lambda関数は、メッセージを処理するだけですみます。
def lambda_handler(event, context):
for message in event['Records']:
print(message['body'])
Lambdaの実行ロールには、SQS操作用の
- sqs:ReceiveMessage
- sqs:DeleteMessage
- sqs:GetQueueAttributes
といったアクションも許可します。
ブローカーとコンシューマーは明確に分離されていますが、コンシューマーのすべての処理はLambdaに寄せられています。
Lambdaからメッセージ処理以外を引き剥がしたのが、次のEventBridge Pipes型です。
EventBridge Pipes型
EventBridge Pipesがブローカーとターゲットを仲介し、コンシューマーの処理に必要な
- Source(ブローカーとのポーリング)
- Filter(フィルタリング)
- Enrichment(プリプロセス)
- Target(ターゲットをメッセージとともに呼び出す)
のパイプライン処理を行います。
SourceにはSQS、TargetにはLambdaを指定します。
def lambda_handler(event, context):
for message in event:
print(message['body'])
ブローカーとポーリングし、取得したメッセージでLambdaを呼び出すのはPipesのため、Pipesの実行ロールには、
- sqs:ReceiveMessage
- sqs:DeleteMessage
- sqs:GetQueueAttributes
- lambda:InvokeFunction
といったアクションを許可します。
Lambda関数の責務はメッセージを処理するだけあり、メッセージブローカーを操作するポリシーは不要です。
SQSとPipesとLambdaが疎結合になりました。
最後に
SQS-Lambda構成に限定すると、LambdaのイベントソースマッピングがEventBridge Pipesに置き換わっただけです。
もう一段上のProducer/Consumer型メッセージングサービスを利用している観点から考えると、ブローカー(SQSなど)やワーカー(Lambdaなど)に関係なく
- Source(ブローカーとポーリング)
- Filter(フィルタリング)
- Enrichment(プリプロセス)
- Target(メッセージとともにTargetを呼び出す)
のパイプラインをEventBridge Pipesで抽象化できます。
※ 図は公式ドキュメントから引用
さらに、ブローカーを操作するポリシーはTargetではなくPipesに付与するなど、各コンポーネントの責任範囲が明確になりました。
これがEventBridge Pipesの狙いです。
運用しているブローカー・コンシューマーが複雑になるにつれ、EventBridge Pipesを利用し、責任範囲を明確にして、一貫したパイプラインでメッセージを処理するメリットが増えると思います。
それでは。