DynamoDBからStream経由でのLambda関数起動を作成から検証を通してAWSCLIのみで行ってみた

DynamoDBのStreamを有効にして、作成したLambda関数のイベントリソースとして設定し、DynamoDBのレコード追加を行って連動を確認できるまでの操作をAWSCLIのみで行ってみました。
2019.06.07

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

Lambda関数の呼び出し方法については多種多様に存在します。

これまで自身が知っていて且つ行ったことのある方法はAPI Gatewayからの実行のみでしたが、業務にて

  • 定期的に実行されるバッチがある
  • バッチの実行完了を元にLambdaを呼び出す

という要件を低コストで成立可能かどうか調査する機会があり、結果、やり方の一つとしてDynamoDBからStream経由でTriggerによりLambdaが実行可能であることを知りました。

設定については多分そんなに苦労はしないだろうと思っていたものの、実際に動作検証してみたところ意外とハマりがあったため、備忘録兼ねて記録することにしました。

用いるサービスについて

以下の通りです。

  • CloudWatch
  • IAM
  • DynamoDB
  • AWS Lambda

設定手順

管理コンソールは更新によるレイアウト変更が発生するため、安定していることが想定できるaws-cliにて実施します。

  • DynamoDBにテーブルを作成する
  • IAMでRoleを作成する
  • 作成したRoleを使ってLambdaを作成する
  • LambdaのイベントリソースにDynamoDBを適用する

DynamoDBにテーブルを作成する

必要なパラメータについては以下の通り。

項目 詳細
atrribute-definitions 要素定義
table-name 作成するテーブル名
key-schema キーとなるスキーマ設計
stream-specification Stream指定
provisioned-throughout スループット

Stream指定はオプションですが、Triggerの前提となるので有効にしています。

% aws dynamodb create-table \
    --attribute-definitions AttributeName=name,AttributeType=S \
    --table-name test \
    --key-schema AttributeName=name,KeyType=HASH \
    --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES \
    --provisioned-throughput ReadCapacityUnits=1,WriteCapacityUnits=1
{
    "TableDescription": {
    "TableArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/test",
    "AttributeDefinitions": [
        {
            "AttributeName": "name",
            "AttributeType": "S"
        }
    ],
    "ProvisionedThroughput": {
        "NumberOfDecreasesToday": 0,
        "WriteCapacityUnits": 1,
        "ReadCapacityUnits": 1
    },
    "TableSizeBytes": 0,
    "TableName": "test",
    "TableStatus": "CREATING",
    "StreamSpecification": {
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "StreamEnabled": true
    },
    "TableId": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "LatestStreamLabel": "2019-06-07T01:45:34.828",
    "KeySchema": [
        {
            "KeyType": "HASH",
            "AttributeName": "name"
        }
    ],
    "ItemCount": 0,
    "CreationDateTime": 1559871934.828,
    "LatestStreamArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/test/stream/2019-06-07T01:45:34.828"
  }
}

IAMのRole作成とポリシーの付与

Lambda実行用Roleを作成して、DynamoDB操作用のポリシーを適用します。

Role作成

% aws iam create-role \
    --role-name lambda_invoke_by_dynamodb \
    --assume-role-policy-document '{"Version": "2012-10-17", "Statement":[{"Action": "sts:AssumeRole","Effect": "Allow","Principal": {"Service": "lambda.amazonaws.com"}}]}'
{
    "Role": {
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "sts:AssumeRole",
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "lambda.amazonaws.com"
                    }
                 }
            ]
        },
        "RoleId": "XXXXXXXXXXXXXXXXXXXX",
        "CreateDate": "2019-06-07T08:32:46Z",
        "RoleName": "lambda_invoke_by_dynamodb",
        "Path": "/",
        "Arn": "arn:aws:iam::000000000000:role/lambda_invoke_by_dynamodb"
  }
}

RoleへのPolicy適用

CLIからの確認しやすさを優先して、インラインポリシーとして適用します。

% aws iam put-role-policy \
    --role-name lambda_invoke_by_dynamodb \
    --policy-name dynamodb-access-by-lambda \
    --policy-document '{"Version": "2012-10-17","Statement": [{"Effect": "Allow","Action": ["dynamodb:GetRecords","dynamodb:GetShardIterator","dynamodb:DescribeStream","dynamodb:ListStreams","logs:CreateLogGroup","logs:CreateLogStream","logs:PutLogEvents"],"Resource": "*"}]}'
# 確認
% aws iam get-role-policy --role-name lambda_invoke_by_dynamodb --policy-name dynamodb-access-by-lambda
{
    "RoleName": "lambda_invoke_by_dynamodb",
    "PolicyDocument": {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": [
                    "dynamodb:GetRecords",
                    "dynamodb:GetShardIterator",
                    "dynamodb:DescribeStream",
                    "dynamodb:ListStreams",
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents"
                ],
                "Resource": "*",
                "Effect": "Allow"
            }
        ]
    },
    "PolicyName": "dynamodb-access-by-lambda"
}

Lambda関数の作成

関数コードを作成・圧縮し、create-functionに渡します。

関数用コードの作成

DynamoDBから渡ってくるレコードを確認できるようにします。

% echo "import json
print('Loading function')
def lambda_handler(event, context):
    for key in event.keys():
        print(event[key])
    return 'Successfully processed.'
" > example.py
% zip -r HelloWorld.zip example.py

関数の作成

--zip-fileでのスキーマがfilebとなっていますが、バイナリ転送のためです。

% aws lambda create-function \
    --function-name test \
    --runtime python3.7 \
    --role $(aws iam get-role --role-name lambda_invoke_by_dynamodb | jq -r '.Role.Arn') \
    --handler example.lambda_handler \
    --zip-file fileb://HelloWorld.zip \
    --region ap-northeast-1

DynamoDBのStream参照

イベントリソースとしてDynamoDBのStreamを指定します。

% aws lambda create-event-source-mapping \
    --event-source-arn $(aws dynamodb describe-table --table-name test | jq -r '.Table.LatestStreamArn') \
    --function-name test \
    --enabled \
    --starting-position LATEST
{
    "UUID": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "StateTransitionReason": "User action",
    "LastModified": 1559906220.72,
    "BatchSize": 100,
    "EventSourceArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/test/stream/2019-06-07T10:47:41.099",
    "FunctionArn": "arn:aws:lambda:ap-northeast-1:000000000000:function:test",
    "State": "Creating",
    "LastProcessingResult": "No records processed"
}

連携動作確認

DynamoDBに新規レコードを追加することでLambdaを連動させて、CloudWatchLogsに出力されていることを確認します。

% aws dynamodb put-item --table-name test --item '{"name":{"S":"test3"}}'
% aws logs get-log-events 
    --log-group-name /aws/lambda/test \
    --log-stream-name $(aws logs describe-log-streams --log-group-name  /aws/lambda/test | jq -r '.[] | sort_by(.creationTime) | reverse | .[0].logStreamName')
{
    "nextForwardToken": "f/000000000000000000000000000000000000000000000000000000000",
    "events": [
        {
            "ingestionTime": 1559906927111,
            "timestamp": 1559906927098,
            "message": "START RequestId: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX Version: $LATEST\n"
        },
        {
            "ingestionTime": 1559906942179,
            "timestamp": 1559906927099,
            "message": "[{'eventID': 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'ap-northeast-1', 'dynamodb': {'ApproximateCreationDateTime': 1559906926.0, 'Keys': {'name': {'S': 'test3'}}, 'NewImage': {'name': {'S': 'test3'}}, 'SequenceNumber': '000000000000000000000', 'SizeBytes': 18, 'StreamViewType': 'NEW_AND_OLD_IMAGES'}, 'eventSourceARN': 'arn:aws:dynamodb:ap-northeast-1:000000000000:table/test/stream/2019-06-07T10:47:41.099'}]\n"
        },
        {
            "ingestionTime": 1559906942179,
            "timestamp": 1559906927109,
            "message": "END RequestId: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX\n"
        },
        {
            "ingestionTime": 1559906942179,
            "timestamp": 1559906927109,
            "message": "REPORT RequestId: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX\tDuration: 10.99 ms\tBilled Duration: 100 ms \tMemory Size: 128 MB\tMax Memory Used: 53 MB\t\n"
        }
    ],
    "nextBackwardToken": "b/000000000000000000000000000000000000000000000000000000000"
}

まとめ

管理コンソールでの操作が快適であることを重々理解できた検証でした。

要素を取得する際にjqがとても効率的で、特に並び替え関数は最新ログの取得等に欠かせませんでした。

部分的にでも役立つことがあれば幸いです。