Amazon EventBridge Pipesを使ってプロデューサー/コンシューマー型メッセージ処理のパイプラインを簡略化しよう #reinvent

2022.12.03

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

re:Invent 2022で発表されたAmazon EventBridge PipesはProducer/Consumer型のメッセージング処理をメッセージブローカーやワーカーに関係なく一貫した手法で実装するサービスです。

メッセージブローカーにAmazon SQS、ConsumerのワーカーにLambdaを利用する構成で、従来のLambdaイベントソース型EventBridge Pipes型を比較しましょう。

Producer/Consumer型メッセージ処理

Producer/Consumer型メッセージは、メッセージキューなどで利用される、非同期のメッセージングパターンです。Producerはブローカーにメッセージを送信し、Consumerはブローカーをポーリングしてメッセージを受信します。

SQS-Lambda構成の場合、SQSがメッセージブローカー、LambdaがConsumerに相当します。

以下では Producerのことは一旦忘れてブローカーとConsumer間の処理にフォーカスし、2006年から本稼働しているSQSのConsumer実装が、AWSの進化とともに疎結合になる姿を確認します。

1. ナイーブ実装

SQSキューのメッセージをSQSのAPIを使って処理すると、以下の流れで行います。

  1. SQSキュー内をポーリング
  2. メッセージを受信
  3. メッセージを処理
  4. メッセージをキューから削除

import boto3
sqs = boto3.client('sqs')
SQS_QUEUE_URL = 'https://sqs.eu-central-1.amazonaws.com/123456789012/bar'

response = sqs.receive_message(
  QueueUrl = SQS_QUEUE_URL,
)

for message in response.get("Messages", []): # メッセージの受信
    print(message["Body"]) # 処理

    sqs.delete_message(
      QueueUrl = SQS_QUEUE_URL,
      ReceiptHandle = message['ReceiptHandle']
    ) # メッセージの削除

コンシューマーはメッセージ処理(ビジネスロジック)に注力したいのに

  • ポーリング
  • メッセージの受信
  • メッセージの削除

なども必要です。

これらの手間を省くのが、次のLambdaイベントソース型です。

2. Lambdaイベントソース型

Lambdaのイベントソース(トリガー)にSQSを指定します。

イベントソース・コンポーネントが

  • ポーリング
  • メッセージの受信
  • メッセージの削除

を行うため、Lambda関数は、メッセージを処理するだけですみます。

def lambda_handler(event, context):
    for message in event['Records']:
        print(message['body'])
CloudFormationテンプレート (ここをクリックしてください)
Resources:
  QueueForLambda:
    Type: 'AWS::SQS::Queue'
  LambdaServiceRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Action: 'sts:AssumeRole'
            Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
        Version: '2012-10-17'
      ManagedPolicyArns:
        - 'Fn::Join':
            - ''
            - - 'arn:'
              - Ref: 'AWS::Partition'
              - ':iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
  LambdaServiceRoleDefaultPolicy:
    Type: 'AWS::IAM::Policy'
    Properties:
      PolicyDocument:
        Statement:
          - Action:
              - 'sqs:ChangeMessageVisibility'
              - 'sqs:DeleteMessage'
              - 'sqs:GetQueueAttributes'
              - 'sqs:GetQueueUrl'
              - 'sqs:ReceiveMessage'
            Effect: Allow
            Resource:
              'Fn::GetAtt':
                - QueueForLambda
                - Arn
        Version: '2012-10-17'
      PolicyName: LambdaServiceRoleDefaultPolicy
      Roles:
        - Ref: LambdaServiceRole
  LambdaForSQS:
    Type: 'AWS::Lambda::Function'
    Properties:
      Code:
        ZipFile: |-
          def handler(event, context):
              for record in event['Records']:
                  print(record['body'])
      Handler: index.handler
      Role:
        'Fn::GetAtt':
          - LambdaServiceRole
          - Arn
      Runtime: python3.11
    DependsOn:
      - LambdaServiceRoleDefaultPolicy
      - LambdaServiceRole
  LambdaSqsEventSourceSqs:
    Type: 'AWS::Lambda::EventSourceMapping'
    Properties:
      EventSourceArn:
        'Fn::GetAtt':
          - QueueForLambda
          - Arn
      FunctionName:
        Ref: LambdaForSQS

Lambdaの実行ロールには、SQS操作用の

  • sqs:ReceiveMessage
  • sqs:DeleteMessage
  • sqs:GetQueueAttributes

といったアクションも許可します。

ブローカーとコンシューマーは明確に分離されていますが、コンシューマーのすべての処理はLambdaに寄せられています。

コンシューマーからメッセージ処理以外を引き剥がしたのが、次のEventBridge Pipes型です。

3. EventBridge Pipes型

EventBridge Pipesがブローカーとターゲットを仲介し、コンシューマーの処理に必要な

  • Source(ブローカーとのポーリング)
  • Filter(フィルタリング)
  • Enrichment(プリプロセス)
  • Target(ターゲットをメッセージとともに呼び出す)

のパイプライン処理を行います。

SourceにはSQS、TargetにはLambdaを指定します。

def lambda_handler(event, context):
    for message in event:
        print(message['body'])
CloudFormationテンプレート (ここをクリックしてください)
---
Resources:
  QueueForPipe:
    Type: AWS::SQS::Queue
  LambdaServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
        - Action: sts:AssumeRole
          Effect: Allow
          Principal:
            Service: lambda.amazonaws.com
        Version: '2012-10-17'
      ManagedPolicyArns:
      - Fn::Join:
        - ''
        - - 'arn:'
          - Ref: AWS::Partition
          - ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
  LambdaForPipe:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: |-
          def handler(event, context):
              for record in event:
                  print(record['body'])
      Handler: index.handler
      Role:
        Fn::GetAtt:
        - LambdaServiceRole
        - Arn
      Runtime: python3.11
    DependsOn:
    - LambdaServiceRole
  PipeRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
        - Action: sts:AssumeRole
          Effect: Allow
          Principal:
            Service: pipes.amazonaws.com
        Version: '2012-10-17'
  PipeRoleDefaultPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyDocument:
        Statement:
        - Action:
          - sqs:ChangeMessageVisibility
          - sqs:DeleteMessage
          - sqs:GetQueueAttributes
          - sqs:GetQueueUrl
          - sqs:ReceiveMessage
          Effect: Allow
          Resource:
            Fn::GetAtt:
            - QueueForPipe
            - Arn
        - Action: lambda:InvokeFunction
          Effect: Allow
          Resource:
          - Fn::GetAtt:
            - LambdaForPipe
            - Arn
          - Fn::Join:
            - ''
            - - Fn::GetAtt:
                - LambdaForPipe
                - Arn
              - ":*"
        Version: '2012-10-17'
      PolicyName: PipeRoleDefaultPolicy
      Roles:
      - Ref: PipeRole
  Pipe:
    Type: AWS::Pipes::Pipe
    Properties:
      RoleArn:
        Fn::GetAtt:
        - PipeRole
        - Arn
      Source:
        Fn::GetAtt:
        - QueueForPipe
        - Arn
      Target:
        Fn::GetAtt:
        - LambdaForPipe
        - Arn
      TargetParameters:
        LambdaFunctionParameters:
          InvocationType: REQUEST_RESPONSE

ブローカーとポーリングし、取得したメッセージでLambdaを呼び出すのはPipesのため、Pipesの実行ロールには、

  • ソース操作用
    • sqs:ReceiveMessage
    • sqs:DeleteMessage
    • sqs:GetQueueAttributes
  • ターゲット操作用
    • lambda:InvokeFunction

といったアクションを許可します。

Lambda関数の責務は受け取ったメッセージを処理するだけのため、ブローカーを意識する必要はなく、ブローカーを操作するポリシーも不要です。

SQSとPipesとLambdaが疎結合になりました。

最後に

SQS-Lambda構成に限定すると、LambdaのイベントソースマッピングがEventBridge Pipesに置き換わっただけです。

もう一段上のProducer/Consumer型メッセージングサービスを利用している観点から考えると、ブローカー(SQSなど)やワーカー(Lambdaなど)に関係なく

  • Source(ブローカーとポーリング)
  • Filter(フィルタリング)
  • Enrichment(プリプロセス)
  • Target(メッセージとともにTargetを呼び出す)

というパイプラインをEventBridge Pipesで抽象化できます。

※ 図は公式ドキュメントから引用

さらに、ブローカーを操作するポリシーはTargetではなくEventBridge Pipesに付与することで、各コンポーネントの責任範囲が明確になりました。

これがEventBridge Pipesの狙いです。

運用しているブローカー・コンシューマーが複雑になるにつれ、EventBridge Pipesを利用し、責任範囲を明確にして、一貫したパイプラインでメッセージを処理するメリットが増えると思います。

AWS Japan SA によるServerlessDays Tokyo 2022 での発表資料も合わせて参照ください。

それでは。