DynamoDBからStream経由でのLambda関数起動を作成から検証を通してAWSCLIのみで行ってみた
はじめに
Lambda関数の呼び出し方法については多種多様に存在します。
これまで自身が知っていて且つ行ったことのある方法はAPI Gatewayからの実行のみでしたが、業務にて
- 定期的に実行されるバッチがある
- バッチの実行完了を元にLambdaを呼び出す
という要件を低コストで成立可能かどうか調査する機会があり、結果、やり方の一つとしてDynamoDBからStream経由でTriggerによりLambdaが実行可能であることを知りました。
設定については多分そんなに苦労はしないだろうと思っていたものの、実際に動作検証してみたところ意外とハマりがあったため、備忘録兼ねて記録することにしました。
用いるサービスについて
以下の通りです。
- CloudWatch
- IAM
- DynamoDB
- AWS Lambda
設定手順
管理コンソールは更新によるレイアウト変更が発生するため、安定していることが想定できるaws-cliにて実施します。
- DynamoDBにテーブルを作成する
- IAMでRoleを作成する
- 作成したRoleを使ってLambdaを作成する
- LambdaのイベントリソースにDynamoDBを適用する
DynamoDBにテーブルを作成する
必要なパラメータについては以下の通り。
項目 | 詳細 |
---|---|
atrribute-definitions | 要素定義 |
table-name | 作成するテーブル名 |
key-schema | キーとなるスキーマ設計 |
stream-specification | Stream指定 |
provisioned-throughout | スループット |
Stream指定はオプションですが、Triggerの前提となるので有効にしています。
% aws dynamodb create-table \ --attribute-definitions AttributeName=name,AttributeType=S \ --table-name test \ --key-schema AttributeName=name,KeyType=HASH \ --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES \ --provisioned-throughput ReadCapacityUnits=1,WriteCapacityUnits=1 { "TableDescription": { "TableArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/test", "AttributeDefinitions": [ { "AttributeName": "name", "AttributeType": "S" } ], "ProvisionedThroughput": { "NumberOfDecreasesToday": 0, "WriteCapacityUnits": 1, "ReadCapacityUnits": 1 }, "TableSizeBytes": 0, "TableName": "test", "TableStatus": "CREATING", "StreamSpecification": { "StreamViewType": "NEW_AND_OLD_IMAGES", "StreamEnabled": true }, "TableId": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", "LatestStreamLabel": "2019-06-07T01:45:34.828", "KeySchema": [ { "KeyType": "HASH", "AttributeName": "name" } ], "ItemCount": 0, "CreationDateTime": 1559871934.828, "LatestStreamArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/test/stream/2019-06-07T01:45:34.828" } }
IAMのRole作成とポリシーの付与
Lambda実行用Roleを作成して、DynamoDB操作用のポリシーを適用します。
Role作成
% aws iam create-role \ --role-name lambda_invoke_by_dynamodb \ --assume-role-policy-document '{"Version": "2012-10-17", "Statement":[{"Action": "sts:AssumeRole","Effect": "Allow","Principal": {"Service": "lambda.amazonaws.com"}}]}' { "Role": { "AssumeRolePolicyDocument": { "Version": "2012-10-17", "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" } } ] }, "RoleId": "XXXXXXXXXXXXXXXXXXXX", "CreateDate": "2019-06-07T08:32:46Z", "RoleName": "lambda_invoke_by_dynamodb", "Path": "/", "Arn": "arn:aws:iam::000000000000:role/lambda_invoke_by_dynamodb" } }
RoleへのPolicy適用
CLIからの確認しやすさを優先して、インラインポリシーとして適用します。
% aws iam put-role-policy \ --role-name lambda_invoke_by_dynamodb \ --policy-name dynamodb-access-by-lambda \ --policy-document '{"Version": "2012-10-17","Statement": [{"Effect": "Allow","Action": ["dynamodb:GetRecords","dynamodb:GetShardIterator","dynamodb:DescribeStream","dynamodb:ListStreams","logs:CreateLogGroup","logs:CreateLogStream","logs:PutLogEvents"],"Resource": "*"}]}' # 確認 % aws iam get-role-policy --role-name lambda_invoke_by_dynamodb --policy-name dynamodb-access-by-lambda { "RoleName": "lambda_invoke_by_dynamodb", "PolicyDocument": { "Version": "2012-10-17", "Statement": [ { "Action": [ "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:DescribeStream", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*", "Effect": "Allow" } ] }, "PolicyName": "dynamodb-access-by-lambda" }
Lambda関数の作成
関数コードを作成・圧縮し、create-function
に渡します。
関数用コードの作成
DynamoDBから渡ってくるレコードを確認できるようにします。
% echo "import json print('Loading function') def lambda_handler(event, context): for key in event.keys(): print(event[key]) return 'Successfully processed.' " > example.py % zip -r HelloWorld.zip example.py
関数の作成
--zip-file
でのスキーマがfilebとなっていますが、バイナリ転送のためです。
% aws lambda create-function \ --function-name test \ --runtime python3.7 \ --role $(aws iam get-role --role-name lambda_invoke_by_dynamodb | jq -r '.Role.Arn') \ --handler example.lambda_handler \ --zip-file fileb://HelloWorld.zip \ --region ap-northeast-1
DynamoDBのStream参照
イベントリソースとしてDynamoDBのStreamを指定します。
% aws lambda create-event-source-mapping \ --event-source-arn $(aws dynamodb describe-table --table-name test | jq -r '.Table.LatestStreamArn') \ --function-name test \ --enabled \ --starting-position LATEST { "UUID": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", "StateTransitionReason": "User action", "LastModified": 1559906220.72, "BatchSize": 100, "EventSourceArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/test/stream/2019-06-07T10:47:41.099", "FunctionArn": "arn:aws:lambda:ap-northeast-1:000000000000:function:test", "State": "Creating", "LastProcessingResult": "No records processed" }
連携動作確認
DynamoDBに新規レコードを追加することでLambdaを連動させて、CloudWatchLogsに出力されていることを確認します。
% aws dynamodb put-item --table-name test --item '{"name":{"S":"test3"}}' % aws logs get-log-events --log-group-name /aws/lambda/test \ --log-stream-name $(aws logs describe-log-streams --log-group-name /aws/lambda/test | jq -r '.[] | sort_by(.creationTime) | reverse | .[0].logStreamName') { "nextForwardToken": "f/000000000000000000000000000000000000000000000000000000000", "events": [ { "ingestionTime": 1559906927111, "timestamp": 1559906927098, "message": "START RequestId: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX Version: $LATEST\n" }, { "ingestionTime": 1559906942179, "timestamp": 1559906927099, "message": "[{'eventID': 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'ap-northeast-1', 'dynamodb': {'ApproximateCreationDateTime': 1559906926.0, 'Keys': {'name': {'S': 'test3'}}, 'NewImage': {'name': {'S': 'test3'}}, 'SequenceNumber': '000000000000000000000', 'SizeBytes': 18, 'StreamViewType': 'NEW_AND_OLD_IMAGES'}, 'eventSourceARN': 'arn:aws:dynamodb:ap-northeast-1:000000000000:table/test/stream/2019-06-07T10:47:41.099'}]\n" }, { "ingestionTime": 1559906942179, "timestamp": 1559906927109, "message": "END RequestId: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX\n" }, { "ingestionTime": 1559906942179, "timestamp": 1559906927109, "message": "REPORT RequestId: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX\tDuration: 10.99 ms\tBilled Duration: 100 ms \tMemory Size: 128 MB\tMax Memory Used: 53 MB\t\n" } ], "nextBackwardToken": "b/000000000000000000000000000000000000000000000000000000000" }
まとめ
管理コンソールでの操作が快適であることを重々理解できた検証でした。
要素を取得する際にjq
がとても効率的で、特に並び替え関数は最新ログの取得等に欠かせませんでした。
部分的にでも役立つことがあれば幸いです。