DynamoDBテーブルのデータ変更をKinesis Data Streamsで直接キャプチャしてS3バケットに出力してみた(AWS CDK)

2021.11.20

こんにちは、CX事業本部 IoT事業部の若槻です。

前回の下記エントリではDynamoDB Streamを使わずにデータ変更をS3出力する構成をマネジメントコンソールから作りました。

今回は、同じくAmazon DynamoDBテーブルのデータ変更をAmazon Kinesis Data Streamsで直接キャプチャしてS3バケットに出力する構成を、AWS CDKで作成してみました。

やってみた

下記のような構成を作ってみます。

CDKコード

lib/aws-cdk-app-stack.ts

import * as cdk from '@aws-cdk/core';
import * as dynamodb from '@aws-cdk/aws-dynamodb';
import * as s3 from '@aws-cdk/aws-s3';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as kinesisfirehose from '@aws-cdk/aws-kinesisfirehose';
import * as kinesisDestinations from '@aws-cdk/aws-kinesisfirehose-destinations';

export class AwsCdkAppStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    //S3バケット
    const deviceTableDataChengeLogsBucket = new s3.Bucket(
      this,
      'deviceTableDataChengeLogs'
    );

    //Kinesis Data Stream
    const deviceTableDataChangeDataStream = new kinesis.Stream(
      this,
      'deviceTableDataChangeStream',
      {
        shardCount: 1,
      }
    );

    //配信ストリーム
    new kinesisfirehose.DeliveryStream(
      this,
      'deviceTableDataChangeDeliveryStream',
      {
        sourceStream: deviceTableDataChangeDataStream,
        destinations: [
          new kinesisDestinations.S3Bucket(deviceTableDataChengeLogsBucket, {
            dataOutputPrefix: 'data/!{timestamp:yyyy/MM/dd/HH/}',
            errorOutputPrefix:
              'error/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd/HH/}',
          }),
        ],
      }
    );

    //DynamoDBテーブル
    new dynamodb.Table(this, 'deviceTable', {
      tableName: 'deviceTable',
      partitionKey: { name: 'deviceId', type: dynamodb.AttributeType.STRING },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      kinesisStream: deviceTableDataChangeDataStream,
    });
  }
}

cdk deployでデプロイします。

動作

DynamoDBテーブルに対してデータ変更の操作を行います。

$ aws dynamodb put-item \
  --table-name deviceTable \
  --item '{ "deviceId": { "S": "d001" } }'
$ aws dynamodb put-item \
  --table-name deviceTable \
  --item '{ "deviceId": { "S": "d002" } }'
$ aws dynamodb put-item \
  --table-name deviceTable \
  --item '{ "deviceId": { "S": "d002" }, "deviceName": { "S": "デバイス002" } }'
$ aws dynamodb delete-item \
  --table-name deviceTable \
  --key '{ "deviceId": { "S": "d002" } }'

数分待つとバケット内に指定のプレフィクスでオブジェクトが作成されました。

オブジェクトに対してS3 Selectでクエリを実行してみます。

すると結果が出力できました。アイテムの変更のキャプチャが取得できています。

{
  "awsRegion": "ap-northeast-1",
  "eventID": "a77152e9-4a99-40b6-bd44-ecf4760123e3",
  "eventName": "INSERT",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637419833903,
    "Keys": {
      "deviceId": {
        "S": "d001"
      }
    },
    "NewImage": {
      "deviceId": {
        "S": "d001"
      }
    },
    "SizeBytes": 24
  },
  "eventSource": "aws:dynamodb"
}
{
  "awsRegion": "ap-northeast-1",
  "eventID": "6bcdb5e3-8cd1-4cbe-9008-7c85e8cef954",
  "eventName": "INSERT",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637419838528,
    "Keys": {
      "deviceId": {
        "S": "d002"
      }
    },
    "NewImage": {
      "deviceId": {
        "S": "d002"
      }
    },
    "SizeBytes": 24
  },
  "eventSource": "aws:dynamodb"
}
{
  "awsRegion": "ap-northeast-1",
  "eventID": "5eeb2c47-daad-4e08-8b46-ea8da858c74b",
  "eventName": "MODIFY",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637419842822,
    "Keys": {
      "deviceId": {
        "S": "d002"
      }
    },
    "NewImage": {
      "deviceName": {
        "S": "デバイス002"
      },
      "deviceId": {
        "S": "d002"
      }
    },
    "OldImage": {
      "deviceId": {
        "S": "d002"
      }
    },
    "SizeBytes": 61
  },
  "eventSource": "aws:dynamodb"
}
{
  "awsRegion": "ap-northeast-1",
  "eventID": "527126e2-3d04-4bee-af42-8a4e2e26b3fc",
  "eventName": "REMOVE",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637419846152,
    "Keys": {
      "deviceId": {
        "S": "d002"
      }
    },
    "OldImage": {
      "deviceName": {
        "S": "デバイス002"
      },
      "deviceId": {
        "S": "d002"
      }
    },
    "SizeBytes": 49
  },
  "eventSource": "aws:dynamodb"
}

注意点

DeliveryStream()はExperimental(2021/11時点)

Amazon Kinesis Data Firehose Construct Library(@aws-cdk/aws-kinesisfirehoseモジュール)では、今回利用したDeliveryStream()は、2021/11時点でExperimentalとなっています。よって将来バージョンで大きな変更がある可能性があるためモジュールのアップグレード時に下位互換のない変更が発生する場合があります。

The APIs of higher level constructs in this module are experimental and under active development. They are subject to non-backward compatible changes or removal in any future version. These are not subject to the Semantic Versioning model and breaking changes will be announced in the release notes. This means that while you may use them, you may need to update your source code when upgrading to a newer version of this package.

そのためDeliveryStream()の利用は一時的なPoCプロジェクトなどに限って利用するようにし、その場合でもセマンティックバージョンは固定するようにした方が良さそうです。また本番システムで利用したい場合はすでにstableとなっているCfnDeliveryStream()を利用するようにしましょう。

以上