DynamoDBテーブルのデータ変更をKinesis Data Streamsで直接キャプチャしてS3バケットに出力してみた(AWS CDK)

DynamoDBテーブルのデータ変更をKinesis Data Streamsで直接キャプチャしてS3バケットに出力してみた(AWS CDK)

Clock Icon2021.11.20 14:53

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 IoT事業部の若槻です。

前回の下記エントリではDynamoDB Streamを使わずにデータ変更をS3出力する構成をマネジメントコンソールから作りました。

今回は、同じくAmazon DynamoDBテーブルのデータ変更をAmazon Kinesis Data Streamsで直接キャプチャしてS3バケットに出力する構成を、AWS CDKで作成してみました。

やってみた

下記のような構成を作ってみます。

CDKコード

import * as cdk from '@aws-cdk/core';
import * as dynamodb from '@aws-cdk/aws-dynamodb';
import * as s3 from '@aws-cdk/aws-s3';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as kinesisfirehose from '@aws-cdk/aws-kinesisfirehose';
import * as kinesisDestinations from '@aws-cdk/aws-kinesisfirehose-destinations';

export class AwsCdkAppStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    //S3バケット
    const deviceTableDataChengeLogsBucket = new s3.Bucket(
      this,
      'deviceTableDataChengeLogs'
    );

    //Kinesis Data Stream
    const deviceTableDataChangeDataStream = new kinesis.Stream(
      this,
      'deviceTableDataChangeStream',
      {
        shardCount: 1,
      }
    );

    //配信ストリーム
    new kinesisfirehose.DeliveryStream(
      this,
      'deviceTableDataChangeDeliveryStream',
      {
        sourceStream: deviceTableDataChangeDataStream,
        destinations: [
          new kinesisDestinations.S3Bucket(deviceTableDataChengeLogsBucket, {
            dataOutputPrefix: 'data/!{timestamp:yyyy/MM/dd/HH/}',
            errorOutputPrefix:
              'error/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd/HH/}',
          }),
        ],
      }
    );

    //DynamoDBテーブル
    new dynamodb.Table(this, 'deviceTable', {
      tableName: 'deviceTable',
      partitionKey: { name: 'deviceId', type: dynamodb.AttributeType.STRING },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      kinesisStream: deviceTableDataChangeDataStream,
    });
  }
}

cdk deployでデプロイします。

動作

DynamoDBテーブルに対してデータ変更の操作を行います。

$ aws dynamodb put-item \
  --table-name deviceTable \
  --item '{ "deviceId": { "S": "d001" } }'
$ aws dynamodb put-item \
  --table-name deviceTable \
  --item '{ "deviceId": { "S": "d002" } }'
$ aws dynamodb put-item \
  --table-name deviceTable \
  --item '{ "deviceId": { "S": "d002" }, "deviceName": { "S": "デバイス002" } }'
$ aws dynamodb delete-item \
  --table-name deviceTable \
  --key '{ "deviceId": { "S": "d002" } }'

数分待つとバケット内に指定のプレフィクスでオブジェクトが作成されました。

オブジェクトに対してS3 Selectでクエリを実行してみます。

すると結果が出力できました。アイテムの変更のキャプチャが取得できています。

{
  "awsRegion": "ap-northeast-1",
  "eventID": "a77152e9-4a99-40b6-bd44-ecf4760123e3",
  "eventName": "INSERT",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637419833903,
    "Keys": {
      "deviceId": {
        "S": "d001"
      }
    },
    "NewImage": {
      "deviceId": {
        "S": "d001"
      }
    },
    "SizeBytes": 24
  },
  "eventSource": "aws:dynamodb"
}
{
  "awsRegion": "ap-northeast-1",
  "eventID": "6bcdb5e3-8cd1-4cbe-9008-7c85e8cef954",
  "eventName": "INSERT",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637419838528,
    "Keys": {
      "deviceId": {
        "S": "d002"
      }
    },
    "NewImage": {
      "deviceId": {
        "S": "d002"
      }
    },
    "SizeBytes": 24
  },
  "eventSource": "aws:dynamodb"
}
{
  "awsRegion": "ap-northeast-1",
  "eventID": "5eeb2c47-daad-4e08-8b46-ea8da858c74b",
  "eventName": "MODIFY",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637419842822,
    "Keys": {
      "deviceId": {
        "S": "d002"
      }
    },
    "NewImage": {
      "deviceName": {
        "S": "デバイス002"
      },
      "deviceId": {
        "S": "d002"
      }
    },
    "OldImage": {
      "deviceId": {
        "S": "d002"
      }
    },
    "SizeBytes": 61
  },
  "eventSource": "aws:dynamodb"
}
{
  "awsRegion": "ap-northeast-1",
  "eventID": "527126e2-3d04-4bee-af42-8a4e2e26b3fc",
  "eventName": "REMOVE",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637419846152,
    "Keys": {
      "deviceId": {
        "S": "d002"
      }
    },
    "OldImage": {
      "deviceName": {
        "S": "デバイス002"
      },
      "deviceId": {
        "S": "d002"
      }
    },
    "SizeBytes": 49
  },
  "eventSource": "aws:dynamodb"
}

注意点

DeliveryStream()はExperimental(2021/11時点)

Amazon Kinesis Data Firehose Construct Library(@aws-cdk/aws-kinesisfirehoseモジュール)では、今回利用したDeliveryStream()は、2021/11時点でExperimentalとなっています。よって将来バージョンで大きな変更がある可能性があるためモジュールのアップグレード時に下位互換のない変更が発生する場合があります。

The APIs of higher level constructs in this module are experimental and under active development. They are subject to non-backward compatible changes or removal in any future version. These are not subject to the Semantic Versioning model and breaking changes will be announced in the release notes. This means that while you may use them, you may need to update your source code when upgrading to a newer version of this package.

そのためDeliveryStream()の利用は一時的なPoCプロジェクトなどに限って利用するようにし、その場合でもセマンティックバージョンは固定するようにした方が良さそうです。また本番システムで利用したい場合はすでにstableとなっているCfnDeliveryStream()を利用するようにしましょう。

(追記)JSON Lines形式で出力したい場合は別途処理の実装が必要

Delivery Streamで出力したストリームデータは、既定ではJSONが1行につながった形式となります。この形式の場合はS3 Selectでは上手くクエリできますが、Amazon Athena(Hive JSON SerDe)ではクエリできません。

クエリ可能とする場合は、Delivery Streamで別途処理の実装が必要です。実装方法など詳しくは下記で触れていますので合わせて御覧ください。

以上

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.