DynamoDBテーブルのデータ変更をKinesis Data Streamsで直接キャプチャしてS3バケットに出力してみた

2021.11.19

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 IoT事業部の若槻です。

今回は、Amazon DynamoDBテーブルのデータ変更をAmazon Kinesis Data Streamsで直接キャプチャしてS3バケットに出力してみました。

DynamoDB Streamを使わなくてもデータ変更をS3バケットに出力できるようになった

今まではDynamoDBテーブル上のデータの変更をS3バケットに保存したい場合は、DynamoDB Streamを使用して下記のような構成を作る必要がありました。

今まで

DynamoDB Stream -> Lambda関数 -> 配信ストリーム -> S3バケット

しかし昨年11月のアップデートで、Kinesis Data Streamsで直接キャプチャできるようになりました。

これによりLambda関数を実装することなくS3バケットへの出力ができるようになりました。

やってみた

下記のような構成を作ってみます。

S3バケットの作成

データストリームの作成

Kinesis Data Streamsでデータストリームを作成します。

配信ストリームの作成

作成したKinesis Data Streamsの[アプリケーション]-[コンシューマー]-[Amazon Kinesis Data Firehose]で[配信ストリームを使用した処理を]クリックします。

Kinesis Data Firehoseの配信ストリームの作成画面が開きます。[Choose source and destination]-[Destination]でAmazon S3を選択します。

[Destination settings]-[S3 bucket]で先程作成したバケットを選択します。[S3 bucket prefix]でdata/!{timestamp:yyyy/MM/dd/HH/}を指定します。これにより出力データが日時プレフィクスによりパーティショニングされるようにします。(このプレフィクスの日時はUTC時間となります。)[Create delivery stream]をクリックして作成を完了します。

DynamoDBのデータストリームの設定

データ変更を出力したいテーブルで[エクスポートおよびストリーム]-[Amazon Kinesisデータストリームの詳細]で[有効化]をクリックします。

[ストリームの詳細]-[送信先 Kinesis データストリーム]で先程作成したKinesis Data Streamsを選択します。[ストリームの有効化]をクリックします。

データストリームが有効化中となります。

有効化できました。

動作

DynamoDBテーブルに対してデータ変更の操作を行います。

$ aws dynamodb put-item \
  --table-name deviceTable \
  --item '{ "deviceId": { "S": "d001" } }'

$ aws dynamodb put-item \
  --table-name deviceTable \
  --item '{ "deviceId": { "S": "d002" } }'

$ aws dynamodb put-item \
  --table-name deviceTable \
  --item '{ "deviceId": { "S": "d002" }, "deviceName": { "S": "デバイス002" } }'

$ aws dynamodb delete-item \
  --table-name deviceTable \
  --key '{ "deviceId": { "S": "d002" } }'

数分待つとバケット内に指定のプレフィクスでオブジェクトが作成されました。

オブジェクトに対してS3 Selectでクエリを実行してみます。

すると結果が出力できました。アイテムの変更のキャプチャが取得できています。

{
  "awsRegion": "ap-northeast-1",
  "eventID": "3341ef17-b0eb-4048-8d7e-a1485e1e823b",
  "eventName": "INSERT",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637336077404,
    "Keys": {
      "deviceId": {
        "S": "d001"
      }
    },
    "NewImage": {
      "deviceId": {
        "S": "d001"
      }
    },
    "SizeBytes": 24
  },
  "eventSource": "aws:dynamodb"
}
{
  "awsRegion": "ap-northeast-1",
  "eventID": "89d642f8-f7a6-4f47-935f-304081f5f47d",
  "eventName": "INSERT",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637336083175,
    "Keys": {
      "deviceId": {
        "S": "d002"
      }
    },
    "NewImage": {
      "deviceId": {
        "S": "d002"
      }
    },
    "SizeBytes": 24
  },
  "eventSource": "aws:dynamodb"
}
{
  "awsRegion": "ap-northeast-1",
  "eventID": "7f94e5f0-3041-4952-a7e0-079fe4149dd2",
  "eventName": "MODIFY",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637336088220,
    "Keys": {
      "deviceId": {
        "S": "d002"
      }
    },
    "NewImage": {
      "deviceName": {
        "S": "デバイス002"
      },
      "deviceId": {
        "S": "d002"
      }
    },
    "OldImage": {
      "deviceId": {
        "S": "d002"
      }
    },
    "SizeBytes": 61
  },
  "eventSource": "aws:dynamodb"
}
{
  "awsRegion": "ap-northeast-1",
  "eventID": "886fce5c-1048-4c42-8cb4-eeeb08a8e6a7",
  "eventName": "REMOVE",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637336093198,
    "Keys": {
      "deviceId": {
        "S": "d002"
      }
    },
    "OldImage": {
      "deviceName": {
        "S": "デバイス002"
      },
      "deviceId": {
        "S": "d002"
      }
    },
    "SizeBytes": 49
  },
  "eventSource": "aws:dynamodb"
}

(追記)JSON Lines形式で出力したい場合は別途処理の実装が必要

配信ストリームで出力したストリームデータは、既定ではJSONが1行につながった形式となります。この形式の場合はS3 Selectでは上手くクエリできますが、Amazon Athena(Hive JSON SerDe)ではクエリできません。

クエリ可能とする場合は、配信ストリームで別途処理の実装が必要です。実装方法など詳しくは下記で触れていますので合わせて御覧ください。

参考

以上