DynamoDB Streamsで起動するLambdaについて、「特定のAttributeが追加、または、削除されたとき」だけ起動させてみる

AWS SAMで、Lambdaのイベントソースマッピングを設定してみました。
2022.09.09

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Lambdaのイベントソースマッピングでフィルタ設定ができるようになっています。

今回は、特定のAttributeが追加、または、削除されたときだけ起動するように、イベントソースマッピングのフィルタ設定をしてみました。

おすすめの方

  • AWS SAMでLambdaのイベントソースマッピングのフィルタ設定をしたい方
  • DynamoDBで特定のAttributeが追加、または、削除されたときだけLambdaを起動するフィルタ設定をしたい方
  • AWS CLIでDynamoDBテーブルにデータを追加・更新したい方

DynamoDB StreamsとLambdaをデプロイする

sam init

sam init \
    --runtime python3.9 \
    --name DynamoDB-Streams-Lambda-Filter-Sample \
    --app-template hello-world \
    --no-tracing \
    --package-type Zip

AWS SAMテンプレート

今回は、特定のAttribute(memo)が追加、または、削除されたときにLambdaを起動するフィルターを設定してみます。 ただし、INSERTおよびREMOVE時は、Lambdaを起動させないようにします。 簡単にまとめると下記です。

  • INSERT
    • Lambdaを起動しない
  • MODIFY
    • memoが追加されたら、Lambdaを起動する
    • memoが削除されたら、Lambdaを起動する
  • REMOVE
    • Lambdaを起動しない

template.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: DynamoDB-Streams-Lambda-Filter-Sample

Resources:
  DeviceSampleTable:
    DeletionPolicy: Retain
    UpdateReplacePolicy: Retain
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: device-sample-table
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: deviceId
          AttributeType: S
      KeySchema:
        - AttributeName: deviceId
          KeyType: HASH
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES

  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: hello_world/
      Handler: app.lambda_handler
      Runtime: python3.9
      Timeout: 3
      Architectures:
        - x86_64
      Events:
        DeviceSampleTableStream:
          Type: DynamoDB
          Properties:
            BatchSize: 100
            BisectBatchOnFunctionError: true
            FilterCriteria:
              Filters:
                - Pattern: '{"eventName":["MODIFY"],"dynamodb":{"NewImage":{"memo":{"S":[{"exists":true}]}},"OldImage":{"memo":{"S":[{"exists":false}]}}}}'
                - Pattern: '{"eventName":["MODIFY"],"dynamodb":{"NewImage":{"memo":{"S":[{"exists":false}]}},"OldImage":{"memo":{"S":[{"exists":true}]}}}}'
            MaximumRetryAttempts: 3
            StartingPosition: TRIM_HORIZON
            Stream: !GetAtt DeviceSampleTable.StreamArn

  HelloWorldFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
        LogGroupName: !Sub /aws/lambda/${HelloWorldFunction}

フィルタ設定を見やすくすると下記です。

{
    "eventName": [
        "MODIFY"
    ],
    "dynamodb": {
        "NewImage": {
            "memo": {
                "S": [{"exists": true}]
            }
        },
        "OldImage": {
            "memo": {
                "S": [{"exists": false}]
            }
        }
    }
}
{
    "eventName": [
        "MODIFY"
    ],
    "dynamodb": {
        "NewImage": {
            "memo": {
                "S": [{"exists": false}]
            }
        },
        "OldImage": {
            "memo": {
                "S": [{"exists": true}]
            }
        }
    }
}

なお、existsを次のように設定すると駄目でした。DynamoDBの型情報(Sなどの部分)が必要です。ご注意ください。

{
    "eventName": [
        "MODIFY"
    ],
    "dynamodb": {
        "NewImage": {
            "memo": [{"exists": false}]
        },
        "OldImage": {
            "memo": [{"exists": true}]
        }
    }
}

Lambdaコード

ログ出力するだけです。

app.py

import json

def lambda_handler(event, context):
    for record in event['Records']:
        print(json.dumps(record))

デプロイ

sam deploy \
    --stack-name DynamoDB-Streams-Lambda-Filter-Sample-Stack \
    --s3-bucket cm-fujii.genki-deploy \
    --capabilities CAPABILITY_NAMED_IAM \
    --no-fail-on-empty-changeset

動作確認をする

INSERT

memoがあるデータを追加します。フィルタ条件を満たしていないので、Lambdaは起動しないはずです。

aws dynamodb put-item \
    --table-name device-sample-table \
    --item '{"deviceId":{"S":"device1"}, "name": {"S":"class"}, "memo": {"S": "public"}}'

REMOVE

さきほど追加したmemoがあるデータを削除します。フィルタ条件を満たしていないので、Lambdaは起動しないはずです。

aws dynamodb delete-item \
    --table-name device-sample-table \
    --key '{"deviceId":{"S":"device1"}}'

MODIFY

memoが無いデータを追加します。フィルタ条件を満たしていないので、Lambdaは起動しないはずです。

aws dynamodb put-item \
    --table-name device-sample-table \
    --item '{"deviceId":{"S":"device1"}, "name": {"S":"class"}}'

新しいAttribute(memo)を追加します。フィルタ条件を満たすので、Lambdaは起動するはずです。

aws dynamodb update-item \
    --table-name device-sample-table \
    --key '{"deviceId":{"S":"device1"}}' \
    --update-expression 'SET #memo=:memo' \
    --expression-attribute-names '{"#memo":"memo"}' \
    --expression-attribute-values '{":memo":{"S":"This is memo!"}}'

次に、memoを変更します。フィルタ条件を満たしていないので、Lambdaは起動しないはずです。

aws dynamodb update-item \
    --table-name device-sample-table \
    --key '{"deviceId":{"S":"device1"}}' \
    --update-expression 'SET #memo=:memo' \
    --expression-attribute-names '{"#memo":"memo"}' \
    --expression-attribute-values '{":memo":{"S":"This is memo??????"}}'

memoを削除します。フィルタ条件を満たすので、Lambdaは起動するはずです。

aws dynamodb update-item \
    --table-name device-sample-table \
    --key '{"deviceId":{"S":"device1"}}' \
    --update-expression 'REMOVE #memo' \
    --expression-attribute-names '{"#memo":"memo"}'

CloudWatch Logsを見る

Lambdaの起動は、2回です。

Lambdaが2回起動している

それぞれのログを見ると、memoが新しく追加されたデータ、および、memoが無くなったデータでした。期待通りですね。

memoが追加された

memoが削除された

さいごに

フィルタ設定をするとき、Sなどの型情報も必要です。お忘れなく……。

参考