AWS Step FunctionsからDynamoDB StreamにアクセスしてRecordを取得する
こんにちは、CX事業本部 IoT事業部の若槻です。
今回は、AWS Step FunctionsからDynamoDB StreamにアクセスしてRecordを取得する方法を確認してみました。
やりたいこと
DynamoDB Describeにより取得できるStreamDescriptionは次のような形式となります。
{ "StreamDescription": { "CreationRequestDateTime": "2022-08-15T14:13:31.627Z", "KeySchema": [ { "AttributeName": "id", "KeyType": "HASH" } ], "Shards": [ { "SequenceNumberRange": { "StartingSequenceNumber": "1600000000010903384455" }, "ShardId": "shardId-00000001660572811957-f6972ba3" } ], "StreamArn": "arn:aws:dynamodb:ap-northeast-1:XXXXXXXXXXXX:table/test/stream/2022-08-15T14:13:31.638", "StreamLabel": "2022-08-15T14:13:31.638", "StreamStatus": "ENABLED", "StreamViewType": "NEW_AND_OLD_IMAGES", "TableName": "test" } }
今回やりたいことは、このうちStreamDescription.Shards
より取得したShardId
の値をもとに指定のShardにアクセスし、ShardIteratorからRecordを取得することです。
やってみた
実装
作成したState Machineは以下となります。
{ "Comment": "A description of my state machine", "StartAt": "DescribeStream", "States": { "DescribeStream": { "Type": "Task", "Parameters": { "StreamArn.$": "$.streamArn" }, "Resource": "arn:aws:states:::aws-sdk:dynamodbstreams:describeStream", "Next": "Map", "ResultPath": "$.streamDescriptionOutput" }, "Map": { "Type": "Map", "End": true, "Iterator": { "StartAt": "GetShardIterator", "States": { "GetShardIterator": { "Type": "Task", "Parameters": { "ShardId.$": "$.shardId", "ShardIteratorType": "TRIM_HORIZON", "StreamArn.$": "$.streamArn" }, "Resource": "arn:aws:states:::aws-sdk:dynamodbstreams:getShardIterator", "Next": "GetRecords" }, "GetRecords": { "Type": "Task", "End": true, "Parameters": { "ShardIterator.$": "$.ShardIterator" }, "Resource": "arn:aws:states:::aws-sdk:dynamodbstreams:getRecords" } } }, "Parameters": { "shardId.$": "$$.Map.Item.Value.ShardId", "streamArn.$": "$.streamArn" }, "ItemsPath": "$.streamDescriptionOutput.StreamDescription.Shards" } } }
DescribeStream
-> GetShardIterator
-> GetRecords
の順でAPIを使用し、InputのStream ArnをもとにRecordにアクセスしています。方法としてはこれがセオリーのようです。
Returns a shard iterator. A shard iterator provides information about how to retrieve the stream records from within a shard. Use the shard iterator in a subsequent GetRecords request to read the stream records from the shard.
動作確認
DynamoDB Streamを設定したテーブル上のアイテムを更新します。
//before { "id": "あああ" } //after { "id": "いいい" }
DynamoDB StreamのArnを指定してState Macihneを実行します。
すると実行が成功しました。GetRecordsのOutputとして、アイテムの作成を示すRecordsが取得できています。
{ "NextShardIterator": "arn:aws:dynamodb:ap-northeast-1:XXXXXXXXXXXX:table/testTable/stream/2022-08-15T15:34:45.580|1|AAAAAAAAAAHpTfl88BISHJDIUxtK9T127PRXND0Z8WPzbDhWXCxc++IKR/GWW5HYGK6uTfyRv+CN2DAr7QzAAhhieh1asNsKGmma+Nh/33RgpZgacenM35t9KcnJpyFBG9SorYnE1onAweHM9zeaob6uWs9VOyF6a4jFrDx/wwM4f2LSb+iAUio87SaYlOtK/xKJXqgmxbApnvIeeV/vU6ZToa9F5OF09kdkUS5xBb1KCez+C19OzMSAxNGbN/VfRGcLVRhU24lBc11NIyyy9yeh3UgETFfsuzohVSzIchmPSIIPxxfxbwp5+1FDp7igHX4t87/ubZ+JTume8wohsvwdhnwE1Nb7/AjpaIT3k8ZLL5P2Ye5jMSszb81t9KSALW1Nw9Rlf5BwIadyenkDoIyZkk3UQn0rTfl5hov8ee+S5LJxbo3I3duF3FQBOe/+T289yyM9sP9snLYq8fQivpZpKD/b4UTAYJoUev4fhDUIA1cGnmGe8qW7cyx2p9oU0KFzgQQ9sxx053eKkqj75QHewmGiDuPB+oETRQFz5dbf62+wAkuqm8mTfuZ+IXbjysEQQ1QNkNp4tZ0jgnYRo+qYJ5AxJ3f6yHv3JRNcBefibf0AuHyT9v6MDhUGJxKHZSu5jMEjyr28QlRP7Iw1uWMjLAApaWy2eNVLY6bVbI2ZSaAKGHdwXfea7102URRCaTep04U9tuVMetVjMOvIDDOkQxvVH0rOs+r/Em+Upw9XGFiqMJQrRapRm9FdQE4scruYqiIpwmM=", "Records": [ { "AwsRegion": "ap-northeast-1", "Dynamodb": { "ApproximateCreationDateTime": "2022-08-15T15:43:14Z", "Keys": { "id": { "S": "あああ" } }, "NewImage": { "name": { "S": "いいいい" }, "id": { "S": "あああ" } }, "OldImage": { "id": { "S": "あああ" } }, "SequenceNumber": "131800000000014534594972", "SizeBytes": 49, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "EventID": "83ef3176b69a8e40472a0037ff0d5011", "EventName": "MODIFY", "EventSource": "aws:dynamodb", "EventVersion": "1.1" } ] }
NextShardIteratorを使用して次のRecordを取得したい場合
GetRecordsの実行結果でNextShardIterator
が取得された場合は、ShardIteratorを使用してさらに次のRecordを取得することができます。その場合はNextShardIterator
が取得されなくなるまでGetRecordsをループ実行するState Machineを実装することになります。次のエントリが参考になるので合わせて御覧ください。
- AWS Step FunctionsでDynamoDBテーブルからLastEvaluatedKeyによる繰り返し取得をしたアイテムを一つの配列に結合する(AWS CDK v2) | DevelopersIO
- How to Implement a Nested While Loop in AWS Step Functions | DevelopersIO
参考
以上