DynamoDBテーブルのデータ変更をKinesis Data Streamsで直接キャプチャしてS3バケットに出力してみた

2021.11.19

こんにちは、CX事業本部 IoT事業部の若槻です。

今回は、Amazon DynamoDBテーブルのデータ変更をAmazon Kinesis Data Streamsで直接キャプチャしてS3バケットに出力してみました。

DynamoDB Streamを使わなくてもデータ変更をS3バケットに出力できるようになった

今まではDynamoDBテーブル上のデータの変更をS3バケットに保存したい場合は、DynamoDB Streamを使用して下記のような構成を作る必要がありました。

DynamoDB Stream -> Lambda関数 -> 配信ストリーム -> S3バケット

しかし昨年11月のアップデートで、Kinesis Data Streamsで直接キャプチャできるようになりました。

これによりLambda関数を実装することなくS3バケットへの出力ができるようになりました。

やってみた

下記のような構成を作ってみます。

S3バケットの作成

データストリームの作成

Kinesis Data Streamsでデータストリームを作成します。

配信ストリームの作成

作成したKinesis Data Streamsの[アプリケーション]-[コンシューマー]-[Amazon Kinesis Data Firehose]で[配信ストリームを使用した処理を]クリックします。

Kinesis Data Firehoseの配信ストリームの作成画面が開きます。[Choose source and destination]-[Destination]でAmazon S3を選択します。

[Destination settings]-[S3 bucket]で先程作成したバケットを選択します。[S3 bucket prefix]でdata/!{timestamp:yyyy/MM/dd/HH/}を指定します。これにより出力データが日時プレフィクスによりパーティショニングされるようにします。(このプレフィクスの日時はUTC時間となります。)[Create delivery stream]をクリックして作成を完了します。

DynamoDBのデータストリームの設定

データ変更を出力したいテーブルで[エクスポートおよびストリーム]-[Amazon Kinesisデータストリームの詳細]で[有効化]をクリックします。

[ストリームの詳細]-[送信先 Kinesis データストリーム]で先程作成したKinesis Data Streamsを選択します。[ストリームの有効化]をクリックします。

データストリームが有効化中となります。

有効化できました。

動作

DynamoDBテーブルに対してデータ変更の操作を行います。

$ aws dynamodb put-item \
  --table-name deviceTable \
  --item '{ "deviceId": { "S": "d001" } }'

$ aws dynamodb put-item \
  --table-name deviceTable \
  --item '{ "deviceId": { "S": "d002" } }'

$ aws dynamodb put-item \
  --table-name deviceTable \
  --item '{ "deviceId": { "S": "d002" }, "deviceName": { "S": "デバイス002" } }'

$ aws dynamodb delete-item \
  --table-name deviceTable \
  --key '{ "deviceId": { "S": "d002" } }'

数分待つとバケット内に指定のプレフィクスでオブジェクトが作成されました。

オブジェクトに対してS3 Selectでクエリを実行してみます。

すると結果が出力できました。アイテムの変更のキャプチャが取得できています。

{
  "awsRegion": "ap-northeast-1",
  "eventID": "3341ef17-b0eb-4048-8d7e-a1485e1e823b",
  "eventName": "INSERT",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637336077404,
    "Keys": {
      "deviceId": {
        "S": "d001"
      }
    },
    "NewImage": {
      "deviceId": {
        "S": "d001"
      }
    },
    "SizeBytes": 24
  },
  "eventSource": "aws:dynamodb"
}
{
  "awsRegion": "ap-northeast-1",
  "eventID": "89d642f8-f7a6-4f47-935f-304081f5f47d",
  "eventName": "INSERT",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637336083175,
    "Keys": {
      "deviceId": {
        "S": "d002"
      }
    },
    "NewImage": {
      "deviceId": {
        "S": "d002"
      }
    },
    "SizeBytes": 24
  },
  "eventSource": "aws:dynamodb"
}
{
  "awsRegion": "ap-northeast-1",
  "eventID": "7f94e5f0-3041-4952-a7e0-079fe4149dd2",
  "eventName": "MODIFY",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637336088220,
    "Keys": {
      "deviceId": {
        "S": "d002"
      }
    },
    "NewImage": {
      "deviceName": {
        "S": "デバイス002"
      },
      "deviceId": {
        "S": "d002"
      }
    },
    "OldImage": {
      "deviceId": {
        "S": "d002"
      }
    },
    "SizeBytes": 61
  },
  "eventSource": "aws:dynamodb"
}
{
  "awsRegion": "ap-northeast-1",
  "eventID": "886fce5c-1048-4c42-8cb4-eeeb08a8e6a7",
  "eventName": "REMOVE",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637336093198,
    "Keys": {
      "deviceId": {
        "S": "d002"
      }
    },
    "OldImage": {
      "deviceName": {
        "S": "デバイス002"
      },
      "deviceId": {
        "S": "d002"
      }
    },
    "SizeBytes": 49
  },
  "eventSource": "aws:dynamodb"
}

参考

以上