DynamoDBの変更履歴をS3に保存する仕組みをAWS CDKを使って実装してみた

2023.10.05

はじめに

DynamoDBを使っていると、何かあった時のために変更履歴を後から確認できるようにしておきたいですよね。

以下のブログのような仕組みを作ることで、Athenaから調査できる状態にしておけることを知りました。

今回はこの仕組みをAWS CDKを使って実装してみます。アーキテクチャとして新しいことはないので、CDK(Typescript)実装の参考としてください。

事前準備

cdk環境を用意するため、以下のコマンドをターミナルで実行してください。

cdk init app --language typescript

Stack

スタックのリソースは以下の通りです。

  • DynamoDBテーブル
  • S3バケット
  • Kinesis Firehose
    • IAMロール
  • Lambda
    • IAMロール
    • トリガー

/lib/cdk-dynamo-streams-stack.ts

import * as cdk from 'aws-cdk-lib';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as firehose from 'aws-cdk-lib/aws-kinesisfirehose';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';
import { DynamoEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

export class CdkDynamoStreamsStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // ===================================================================
    // DynamoDBのテーブル
    const sampleTable = new dynamodb.Table(this, 'SampleTable', {
      tableName: 'sample-table',
      partitionKey: {
        name: 'Id',
        type: dynamodb.AttributeType.STRING,
      },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      pointInTimeRecovery: true,
      stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
    });

    // ===================================================================
    // DynamoDBの変更履歴を保存するS3バケット
    const dynamodbStreamsBucket = new s3.Bucket(this, 'DynamodbStreamsBucket', {
      bucketName: 'dynamodb-streams-bucket-'+this.account,
      versioned: true,
    });

    // ===================================================================
    // S3バケットへ出力するためのFirehose
    const dynamodbStreamsDeliveryStream = new firehose.CfnDeliveryStream(this, 'DeliveryStream', {
      deliveryStreamType: 'DirectPut',
      deliveryStreamName: 'dynamodb-streams-delivery-stream',
      extendedS3DestinationConfiguration: {
        bucketArn: dynamodbStreamsBucket.bucketArn,
        bufferingHints: {
          intervalInSeconds: 60,
          sizeInMBs: 50,
        },
        prefix: 'sample-table/',
        compressionFormat: 'UNCOMPRESSED',
        roleArn: new iam.Role(this, 'FirehoseRole', {
          assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
          managedPolicies: [iam.ManagedPolicy.fromAwsManagedPolicyName('AmazonS3FullAccess')],
        }).roleArn,
      },
    });

    // ===================================================================
    // Lambda関連リソース

    const dynamodbStreamsFunctionRole = new iam.Role(this, 'DynamodbStreamsFunctionRole', {
      roleName: 'dynamodb-streams-function-role',
      assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),
      managedPolicies: [
        new iam.ManagedPolicy(this, 'dynamodb-streams-function-policy', {
          managedPolicyName: 'dynamodb-streams-function-policy',
          statements: [
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: ['logs:CreateLogGroup', 'logs:CreateLogStream', 'logs:PutLogEvents'],
              resources: ['arn:aws:logs:*:*:*'],
            }),
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: ['firehose:PutRecord'],
              resources: ['*'],
            }),
          ],
        })
      ],
    });

    const dynamodbStreamsFunction = new lambda.Function(this, 'DynamodbStreamsFunction', {
      code: new lambda.AssetCode('src'),
      handler: 'streams_to_kinesis.handler',
      runtime: lambda.Runtime.PYTHON_3_10,
      role: dynamodbStreamsFunctionRole,
      functionName: 'dynamodb-streams-function',
      environment: {
        DELIVERY_STREAM_NAME: String(dynamodbStreamsDeliveryStream.deliveryStreamName),
      },
      timeout: cdk.Duration.seconds(180),
    });
    // DynamoDBテーブルのストリームをトリガーとしてLambda関数を設定
    dynamodbStreamsFunction.addEventSource(
      new DynamoEventSource(sampleTable, {
        startingPosition: lambda.StartingPosition.TRIM_HORIZON,
      })
    );
  }
}

Lambda

LambdaのコードはPython3.10を使っています。 ルートディレクトリにsrcフォルダーを作成して、その中にstreams_to_kinesis.pyを作成します。

こちらのコードは前述したこちらのブログで記載しているものをそのまま使わせて頂きました。

/src/streams_to_kinesis.py

# 参考: https://dev.classmethod.jp/articles/dynamodb-streams-kinesis-firehose-athena/
import json
import os
from datetime import datetime

import boto3
from boto3.dynamodb.types import TypeDeserializer

firehose = boto3.client("firehose")
dynamodb_deserializer = TypeDeserializer()


def handler(event, context):
    for item in event["Records"]:
        timestamp = int(item["dynamodb"]["ApproximateCreationDateTime"])
        to_firehose_item = {
            "action": item["eventName"],
            "keys": deserialize(item["dynamodb"]["Keys"]),
            "new_image": None,
            "old_image": None,
            "timestamp": timestamp,
            "timestamp_utc": datetime.utcfromtimestamp(timestamp),
        }

        if "NewImage" in item["dynamodb"]:
            to_firehose_item["new_image"] = deserialize(item["dynamodb"]["NewImage"])
        if "OldImage" in item["dynamodb"]:
            to_firehose_item["old_image"] = deserialize(item["dynamodb"]["OldImage"])

        firehose.put_record(
            DeliveryStreamName=os.environ["DELIVERY_STREAM_NAME"],
            Record={"Data": json.dumps(to_firehose_item, default=json_default) + "\n"},
        )


def deserialize(item):
    """
    {'fooId': {'S': 'bar1234'}} を {'fooId': 'bar1234'} に変換する
    """
    d = {}
    for key in item:
        d[key] = dynamodb_deserializer.deserialize(item[key])
    return d


def json_default(obj):
    try:
        return str(obj)
    except Exception as e:
        print(e)
        raise TypeError("Failed convert to str.")

デプロイ・テスト

(npx) cdk deployを実行して、スタックをデプロイします。

デプロイが成功したらDynamoDBのテーブル(sample-table)が作成されているので、テスト用の項目を追加していきます。

少し待つと、S3バケットにログが出力されていることが確認できました。

中身を開くとJSON Linesの形式で保存されています。actionや対象のkey、変更前後が記録されていていい感じです。サンプルとして載せておきます。

追加

{"action": "INSERT", "keys": {"Id": "1"}, "new_image": {"Id": "1", "Age": "20", "Name": "Suzuki"}, "old_image": null, "timestamp": 1696398445, "timestamp_utc": "2023-10-04 05:47:25"}
{"action": "INSERT", "keys": {"Id": "2"}, "new_image": {"Id": "2", "Age": "20", "Name": "Tanaka"}, "old_image": null, "timestamp": 1696398463, "timestamp_utc": "2023-10-04 05:47:43"}
{"action": "INSERT", "keys": {"Id": "3"}, "new_image": {"Id": "3", "Age": "30", "Name": "Sato"}, "old_image": null, "timestamp": 1696398479, "timestamp_utc": "2023-10-04 05:47:59"}

変更

{"action": "MODIFY", "keys": {"Id": "1"}, "new_image": {"Id": "1", "Age": "40", "Name": "Suzuki"}, "old_image": {"Id": "1", "Age": "20", "Name": "Suzuki"}, "timestamp": 1696472278, "timestamp_utc": "2023-10-05 02:17:58"}

削除

{"action": "REMOVE", "keys": {"Id": "3"}, "new_image": null, "old_image": {"Id": "3", "Age": "30", "Name": "Sato"}, "timestamp": 1696472283, "timestamp_utc": "2023-10-05 02:18:03"}

このデータがあれば何かあった時Athenaで確認して、変更履歴を追うことができそうですね。

まとめ

DynamoDBテーブルへの変更履歴を保存する仕組みをAWS CDKを使って実装してみました。CDKを使う方の参考になれば幸いです。

参考