複数のDynamoDB Streamsをトリガーにして、ひとつのLambdaを起動する

できました。
2022.05.19

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

AWS LambdaとDynamoDB Streamsを組み合わせて使おうとしたとき、ふと疑問に思いました。 「複数のDynamoDB Streamsをトリガーにして、ひとつのLambdaを使えるのだろうか?」と。

概要図

というわけで、試してみました。

おすすめの方

  • AWS SAMでDynamoDB Streamsで起動するLambdaを作りたい方

DynamoDBとLambdaをデプロイする

sam init

sam init \
    --runtime python3.9 \
    --name DynamoDB-Streams-Multi-Subscribe-Sample \
    --app-template hello-world \
    --package-type Zip

AWS SAMテンプレート

2つのDynamoDBテーブルと1つのLambdaを作成しています。 Lambdaの起動トリガーには、DynamoDB Streamsを2つ設定しています。

template.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: DynamoDB-Streams-Multi-Subscribe-Sample

Resources:
  TestUserTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: test-user-table
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: userId
          AttributeType: S
      KeySchema:
        - AttributeName: userId
          KeyType: HASH
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES

  TestDeviceTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: test-device-table
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: deviceId
          AttributeType: S
      KeySchema:
        - AttributeName: deviceId
          KeyType: HASH
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES

  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: hello_world/
      Handler: app.lambda_handler
      Runtime: python3.9
      Timeout: 5
      Architectures:
        - x86_64
      Events:
        TestUserTable:
          Type: DynamoDB
          Properties:
            BatchSize: 100
            BisectBatchOnFunctionError: true
            MaximumRetryAttempts: 3
            StartingPosition: TRIM_HORIZON
            Stream: !GetAtt TestUserTable.StreamArn
        TestDeviceTable:
          Type: DynamoDB
          Properties:
            BatchSize: 100
            BisectBatchOnFunctionError: true
            MaximumRetryAttempts: 3
            StartingPosition: TRIM_HORIZON
            Stream: !GetAtt TestDeviceTable.StreamArn

  HelloWorldFunctionLogGroup:
      Type: AWS::Logs::LogGroup
      Properties:
        LogGroupName: !Sub /aws/lambda/${HelloWorldFunction}

Lambdaコード

ログを表示するだけです。

app.py

def lambda_handler(event, context):
    for record in event['Records']:

        if 'test-user-table' in record['eventSourceARN']:
            print('this is test-user-table')
        elif 'test-device-table' in record['eventSourceARN']:
            print('this is test-device-table')
        else:
            print('???')

デプロイ

sam build

sam deploy \
    --stack-name DynamoDB-Streams-Multi-Subscribe-Sample-Stack \
    --s3-bucket cm-fujii.genki-deploy \
    --capabilities CAPABILITY_NAMED_IAM \
    --no-fail-on-empty-changeset

動作を確認する

DynamoDBテーブルのデータを更新する

DynamoDBテーブルにデータを追加する

DynamoDBテーブルにデータを追加する

CloudWatch Logsを確認する

バッチリでした。

CloudWatch Logsにそれぞれのテーブル名が表示されている

さいごに

複数のDynamoDB Streamsをトリガーにして、ひとつのLambdaを使ってみました。 更新履歴を別途残したい場合や読み込み専用のDynamoDBテーブルを作りたい場合などで活用できそうです。