こんにちは、CX事業本部 IoT事業部の若槻です。
今回は、AWS Step FunctionsからDynamoDB StreamにアクセスしてRecordを取得する方法を確認してみました。
やりたいこと
DynamoDB Describeにより取得できるStreamDescriptionは次のような形式となります。
{
"StreamDescription": {
"CreationRequestDateTime": "2022-08-15T14:13:31.627Z",
"KeySchema": [
{
"AttributeName": "id",
"KeyType": "HASH"
}
],
"Shards": [
{
"SequenceNumberRange": {
"StartingSequenceNumber": "1600000000010903384455"
},
"ShardId": "shardId-00000001660572811957-f6972ba3"
}
],
"StreamArn": "arn:aws:dynamodb:ap-northeast-1:XXXXXXXXXXXX:table/test/stream/2022-08-15T14:13:31.638",
"StreamLabel": "2022-08-15T14:13:31.638",
"StreamStatus": "ENABLED",
"StreamViewType": "NEW_AND_OLD_IMAGES",
"TableName": "test"
}
}
今回やりたいことは、このうちStreamDescription.Shards
より取得したShardId
の値をもとに指定のShardにアクセスし、ShardIteratorからRecordを取得することです。
やってみた
実装
作成したState Machineは以下となります。
{
"Comment": "A description of my state machine",
"StartAt": "DescribeStream",
"States": {
"DescribeStream": {
"Type": "Task",
"Parameters": {
"StreamArn.$": "$.streamArn"
},
"Resource": "arn:aws:states:::aws-sdk:dynamodbstreams:describeStream",
"Next": "Map",
"ResultPath": "$.streamDescriptionOutput"
},
"Map": {
"Type": "Map",
"End": true,
"Iterator": {
"StartAt": "GetShardIterator",
"States": {
"GetShardIterator": {
"Type": "Task",
"Parameters": {
"ShardId.$": "$.shardId",
"ShardIteratorType": "TRIM_HORIZON",
"StreamArn.$": "$.streamArn"
},
"Resource": "arn:aws:states:::aws-sdk:dynamodbstreams:getShardIterator",
"Next": "GetRecords"
},
"GetRecords": {
"Type": "Task",
"End": true,
"Parameters": {
"ShardIterator.$": "$.ShardIterator"
},
"Resource": "arn:aws:states:::aws-sdk:dynamodbstreams:getRecords"
}
}
},
"Parameters": {
"shardId.$": "$$.Map.Item.Value.ShardId",
"streamArn.$": "$.streamArn"
},
"ItemsPath": "$.streamDescriptionOutput.StreamDescription.Shards"
}
}
}
DescribeStream
-> GetShardIterator
-> GetRecords
の順でAPIを使用し、InputのStream ArnをもとにRecordにアクセスしています。方法としてはこれがセオリーのようです。
Returns a shard iterator. A shard iterator provides information about how to retrieve the stream records from within a shard. Use the shard iterator in a subsequent GetRecords request to read the stream records from the shard.
動作確認
DynamoDB Streamを設定したテーブル上のアイテムを更新します。
//before
{
"id": "あああ"
}
//after
{
"id": "いいい"
}
DynamoDB StreamのArnを指定してState Macihneを実行します。
すると実行が成功しました。GetRecordsのOutputとして、アイテムの作成を示すRecordsが取得できています。
{
"NextShardIterator": "arn:aws:dynamodb:ap-northeast-1:XXXXXXXXXXXX:table/testTable/stream/2022-08-15T15:34:45.580|1|AAAAAAAAAAHpTfl88BISHJDIUxtK9T127PRXND0Z8WPzbDhWXCxc++IKR/GWW5HYGK6uTfyRv+CN2DAr7QzAAhhieh1asNsKGmma+Nh/33RgpZgacenM35t9KcnJpyFBG9SorYnE1onAweHM9zeaob6uWs9VOyF6a4jFrDx/wwM4f2LSb+iAUio87SaYlOtK/xKJXqgmxbApnvIeeV/vU6ZToa9F5OF09kdkUS5xBb1KCez+C19OzMSAxNGbN/VfRGcLVRhU24lBc11NIyyy9yeh3UgETFfsuzohVSzIchmPSIIPxxfxbwp5+1FDp7igHX4t87/ubZ+JTume8wohsvwdhnwE1Nb7/AjpaIT3k8ZLL5P2Ye5jMSszb81t9KSALW1Nw9Rlf5BwIadyenkDoIyZkk3UQn0rTfl5hov8ee+S5LJxbo3I3duF3FQBOe/+T289yyM9sP9snLYq8fQivpZpKD/b4UTAYJoUev4fhDUIA1cGnmGe8qW7cyx2p9oU0KFzgQQ9sxx053eKkqj75QHewmGiDuPB+oETRQFz5dbf62+wAkuqm8mTfuZ+IXbjysEQQ1QNkNp4tZ0jgnYRo+qYJ5AxJ3f6yHv3JRNcBefibf0AuHyT9v6MDhUGJxKHZSu5jMEjyr28QlRP7Iw1uWMjLAApaWy2eNVLY6bVbI2ZSaAKGHdwXfea7102URRCaTep04U9tuVMetVjMOvIDDOkQxvVH0rOs+r/Em+Upw9XGFiqMJQrRapRm9FdQE4scruYqiIpwmM=",
"Records": [
{
"AwsRegion": "ap-northeast-1",
"Dynamodb": {
"ApproximateCreationDateTime": "2022-08-15T15:43:14Z",
"Keys": {
"id": {
"S": "あああ"
}
},
"NewImage": {
"name": {
"S": "いいいい"
},
"id": {
"S": "あああ"
}
},
"OldImage": {
"id": {
"S": "あああ"
}
},
"SequenceNumber": "131800000000014534594972",
"SizeBytes": 49,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"EventID": "83ef3176b69a8e40472a0037ff0d5011",
"EventName": "MODIFY",
"EventSource": "aws:dynamodb",
"EventVersion": "1.1"
}
]
}
NextShardIteratorを使用して次のRecordを取得したい場合
GetRecordsの実行結果でNextShardIterator
が取得された場合は、ShardIteratorを使用してさらに次のRecordを取得することができます。その場合はNextShardIterator
が取得されなくなるまでGetRecordsをループ実行するState Machineを実装することになります。次のエントリが参考になるので合わせて御覧ください。
- AWS Step FunctionsでDynamoDBテーブルからLastEvaluatedKeyによる繰り返し取得をしたアイテムを一つの配列に結合する(AWS CDK v2) | DevelopersIO
- How to Implement a Nested While Loop in AWS Step Functions | DevelopersIO
参考
以上