AWS Step FunctionsからDynamoDB StreamにアクセスしてRecordを取得する

2022.08.15

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 IoT事業部の若槻です。

今回は、AWS Step FunctionsからDynamoDB StreamにアクセスしてRecordを取得する方法を確認してみました。

やりたいこと

DynamoDB Describeにより取得できるStreamDescriptionは次のような形式となります。

{
  "StreamDescription": {
    "CreationRequestDateTime": "2022-08-15T14:13:31.627Z",
    "KeySchema": [
      {
        "AttributeName": "id",
        "KeyType": "HASH"
      }
    ],
    "Shards": [
      {
        "SequenceNumberRange": {
          "StartingSequenceNumber": "1600000000010903384455"
        },
        "ShardId": "shardId-00000001660572811957-f6972ba3"
      }
    ],
    "StreamArn": "arn:aws:dynamodb:ap-northeast-1:XXXXXXXXXXXX:table/test/stream/2022-08-15T14:13:31.638",
    "StreamLabel": "2022-08-15T14:13:31.638",
    "StreamStatus": "ENABLED",
    "StreamViewType": "NEW_AND_OLD_IMAGES",
    "TableName": "test"
  }
}

今回やりたいことは、このうちStreamDescription.Shardsより取得したShardIdの値をもとに指定のShardにアクセスし、ShardIteratorからRecordを取得することです。

やってみた

実装

作成したState Machineは以下となります。

{
  "Comment": "A description of my state machine",
  "StartAt": "DescribeStream",
  "States": {
    "DescribeStream": {
      "Type": "Task",
      "Parameters": {
        "StreamArn.$": "$.streamArn"
      },
      "Resource": "arn:aws:states:::aws-sdk:dynamodbstreams:describeStream",
      "Next": "Map",
      "ResultPath": "$.streamDescriptionOutput"
    },
    "Map": {
      "Type": "Map",
      "End": true,
      "Iterator": {
        "StartAt": "GetShardIterator",
        "States": {
          "GetShardIterator": {
            "Type": "Task",
            "Parameters": {
              "ShardId.$": "$.shardId",
              "ShardIteratorType": "TRIM_HORIZON",
              "StreamArn.$": "$.streamArn"
            },
            "Resource": "arn:aws:states:::aws-sdk:dynamodbstreams:getShardIterator",
            "Next": "GetRecords"
          },
          "GetRecords": {
            "Type": "Task",
            "End": true,
            "Parameters": {
              "ShardIterator.$": "$.ShardIterator"
            },
            "Resource": "arn:aws:states:::aws-sdk:dynamodbstreams:getRecords"
          }
        }
      },
      "Parameters": {
        "shardId.$": "$$.Map.Item.Value.ShardId",
        "streamArn.$": "$.streamArn"
      },
      "ItemsPath": "$.streamDescriptionOutput.StreamDescription.Shards"
    }
  }
}

DescribeStream -> GetShardIterator -> GetRecordsの順でAPIを使用し、InputのStream ArnをもとにRecordにアクセスしています。方法としてはこれがセオリーのようです。

Returns a shard iterator. A shard iterator provides information about how to retrieve the stream records from within a shard. Use the shard iterator in a subsequent GetRecords request to read the stream records from the shard.

動作確認

DynamoDB Streamを設定したテーブル上のアイテムを更新します。

//before
{
 "id": "あああ"
}

//after
{
 "id": "いいい"
}

DynamoDB StreamのArnを指定してState Macihneを実行します。

すると実行が成功しました。GetRecordsのOutputとして、アイテムの作成を示すRecordsが取得できています。

{
  "NextShardIterator": "arn:aws:dynamodb:ap-northeast-1:XXXXXXXXXXXX:table/testTable/stream/2022-08-15T15:34:45.580|1|AAAAAAAAAAHpTfl88BISHJDIUxtK9T127PRXND0Z8WPzbDhWXCxc++IKR/GWW5HYGK6uTfyRv+CN2DAr7QzAAhhieh1asNsKGmma+Nh/33RgpZgacenM35t9KcnJpyFBG9SorYnE1onAweHM9zeaob6uWs9VOyF6a4jFrDx/wwM4f2LSb+iAUio87SaYlOtK/xKJXqgmxbApnvIeeV/vU6ZToa9F5OF09kdkUS5xBb1KCez+C19OzMSAxNGbN/VfRGcLVRhU24lBc11NIyyy9yeh3UgETFfsuzohVSzIchmPSIIPxxfxbwp5+1FDp7igHX4t87/ubZ+JTume8wohsvwdhnwE1Nb7/AjpaIT3k8ZLL5P2Ye5jMSszb81t9KSALW1Nw9Rlf5BwIadyenkDoIyZkk3UQn0rTfl5hov8ee+S5LJxbo3I3duF3FQBOe/+T289yyM9sP9snLYq8fQivpZpKD/b4UTAYJoUev4fhDUIA1cGnmGe8qW7cyx2p9oU0KFzgQQ9sxx053eKkqj75QHewmGiDuPB+oETRQFz5dbf62+wAkuqm8mTfuZ+IXbjysEQQ1QNkNp4tZ0jgnYRo+qYJ5AxJ3f6yHv3JRNcBefibf0AuHyT9v6MDhUGJxKHZSu5jMEjyr28QlRP7Iw1uWMjLAApaWy2eNVLY6bVbI2ZSaAKGHdwXfea7102URRCaTep04U9tuVMetVjMOvIDDOkQxvVH0rOs+r/Em+Upw9XGFiqMJQrRapRm9FdQE4scruYqiIpwmM=",
  "Records": [
    {
      "AwsRegion": "ap-northeast-1",
      "Dynamodb": {
        "ApproximateCreationDateTime": "2022-08-15T15:43:14Z",
        "Keys": {
          "id": {
            "S": "あああ"
          }
        },
        "NewImage": {
          "name": {
            "S": "いいいい"
          },
          "id": {
            "S": "あああ"
          }
        },
        "OldImage": {
          "id": {
            "S": "あああ"
          }
        },
        "SequenceNumber": "131800000000014534594972",
        "SizeBytes": 49,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "EventID": "83ef3176b69a8e40472a0037ff0d5011",
      "EventName": "MODIFY",
      "EventSource": "aws:dynamodb",
      "EventVersion": "1.1"
    }
  ]
}

NextShardIteratorを使用して次のRecordを取得したい場合

GetRecordsの実行結果でNextShardIteratorが取得された場合は、ShardIteratorを使用してさらに次のRecordを取得することができます。その場合はNextShardIteratorが取得されなくなるまでGetRecordsをループ実行するState Machineを実装することになります。次のエントリが参考になるので合わせて御覧ください。

参考

以上