DynamoDBの変更履歴をKinesis Firehose経由でS3に保存してAthenaで見る

DynamoDBのデータを更新したとき、DynamoDb Streams・Lambda・Kinesis Firehoseを経由してS3にファイルを格納し、Amazon Athenaで見る仕組みを作ってみました。
2020.10.27

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

不具合の調査などをしているとき、「あのタイミングのDynamoDBテーブルのデータが分かれば助かるのに……」と思ったことは無いでしょうか? 私はあります。 Lambdaで参照したDynamoDBテーブルのログデータを見ても良いのですが、下記のように影響範囲を調査する際は有効ではありません。(ログを調べれば分かるけど大変)

  • この値が原因なのは分かった。いつからこの値になっていた……?

というわけで本記事では、DynamoDBテーブルの変更履歴をLambdaとKinesis Firehoseを経由してS3に格納し、Athenaで参照する仕組みを構築してみました。Partition Projectionを使って、Athenaのパーティション更新を自動化しています。

おすすめの方

  • AWS SAMを使ってみたい方
  • DynamoDB Streamの情報をS3保存したい方
  • Kinesis FirehoseがS3に保存したデータをAthenaで見たい方
  • DynamoDBの変更履歴をAthenaで見たい方
  • Athenaのパーティション更新を自動化したい方

DynamoDBとLambdaとS3を作る

AWS SAMを使って作ります。

SAM Init

sam init \
    --runtime python3.7 \
    --name DynamoDB-Update-Item-Athena \
    --app-template hello-world

SAMテンプレート

DynamoDB・Lambda・Kinesis Firehose・S3バケットを定義しています。

template.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: DynamoDB-Update-Item-Athena

Resources:
  # 変更履歴を残したいDynamoDBテーブル
  TodoTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: todo-sample-table
      AttributeDefinitions:
        - AttributeName: todoId
          AttributeType: S
      KeySchema:
        - AttributeName: todoId
          KeyType: HASH
      BillingMode: PAY_PER_REQUEST
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES

  # DynamoDB Streamsで起動してKinesis FirehoseにPutするLambda
  ToFirehoseFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: hello_world/
      Handler: app.lambda_handler
      Runtime: python3.7
      Timeout: 30
      Environment:
        Variables:
          DELIVERY_STREAM_NAME: !Ref TodoTableUpdateDeliveryStream
      Policies:
        - arn:aws:iam::aws:policy/AmazonKinesisFirehoseFullAccess
      Events:
        DynamoDBStream:
          Type: DynamoDB
          Properties:
            Stream: !GetAtt TodoTable.StreamArn
            BatchSize: 100
            StartingPosition: TRIM_HORIZON
            BisectBatchOnFunctionError: true

  ToFirehoseFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/lambda/${ToFirehoseFunction}

  # DynamoDBの変更履歴を保存するS3バケット(Athena検索対象)
  TodoTableUpdateBucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    Properties:
      AccessControl: Private
      BucketName: cm-fujii.genki-dynamodb-update-bucket

  # S3バケットに格納するKinesis Firehose
  TodoTableUpdateDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties: 
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt TodoTableUpdateBucket.Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 50
        CompressionFormat: UNCOMPRESSED
        Prefix: dynamodb_item/
        ErrorOutputPrefix: error/
        RoleARN: !GetAtt TodoTableUpdateDeliveryStreamRole.Arn

  TodoTableUpdateDeliveryStreamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: "todo-table-update-firehose-delivery-policy"
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource: !Sub "${TodoTableUpdateBucket.Arn}/*"

なお、Kinesis FirehoseのIntervalInSecondsは実験用として短い60秒にしています。 実際に運用する場合はDynamoDBの変更頻度を考慮しつつ、下記を参考にデータ量・データ数・圧縮の有無を調整してみてください。

Lambdaコード

DynamoDB Streams経由で起動し、変更前と変更後のデータを保存しています。 このLambda関数では、DynamoDBにどのようなデータがあるのか?を意識していません。そのため、DynamoDBの項目を変更しても、このLambda自体の修正は不要です。

app.py

import json
import os
import boto3
from datetime import datetime
from boto3.dynamodb.types import TypeDeserializer

firehose = boto3.client('firehose')
dynamodb_deserializer = TypeDeserializer()

def lambda_handler(event, context):

    for item in event['Records']:
        timestamp = int(item['dynamodb']['ApproximateCreationDateTime'])
        to_firehose_item = {
            'action': item['eventName'],
            'keys': deserialize(item['dynamodb']['Keys']),
            'new_image': None,
            'old_image': None,
            'timestamp': timestamp,
            'timestamp_utc': datetime.utcfromtimestamp(timestamp)
        }

        if 'NewImage' in item['dynamodb']:
            to_firehose_item['new_image'] = deserialize(item['dynamodb']['NewImage'])
        if 'OldImage' in item['dynamodb']:
            to_firehose_item['old_image'] = deserialize(item['dynamodb']['OldImage'])

        firehose.put_record(
            DeliveryStreamName=os.environ['DELIVERY_STREAM_NAME'],
            Record={
                'Data': json.dumps(to_firehose_item, default=json_default) + '\n'
            }
        )

def deserialize(item):
    """
    {'fooId': {'S': 'bar1234'}} を {'fooId': 'bar1234'} に変換する
    """
    d = {}
    for key in item:
        d[key] = dynamodb_deserializer.deserialize(item[key])
    return d

def json_default(obj):
    try:
        return str(obj)
    except Exception as e:
        print(e)
        raise TypeError('Failed convert to str.')

デプロイする

sam build

sam package \
    --output-template-file packaged.yaml \
    --s3-bucket cm-fujii.genki-deploy

sam deploy \
    --template-file packaged.yaml \
    --stack-name DynamoDB-Update-Item-Athena-Stack \
    --capabilities CAPABILITY_NAMED_IAM \
    --no-fail-on-empty-changeset

DynamoDBテーブルを変更してみる

データ追加

ためしに2件のデータを追加します。

{
  "todoId": "t0001"
}
{
  "todoId": "t2222",
  "title": "iPhone 12 miniを買う"
}

DynamoDBにデータを追加する

データ変更

さきほどのデータを変更します。

{
  "todoId": "t0001",
  "title": "いますぐ寝る!!!",
  "done": false
}
{
  "todoId": "t2222",
  "title": "iPhone 12 miniを予約する",
  "done": false
}

DynamoDBのデータを変更する

データ削除

todoId:t0001のデータを削除します。

DynamoDBのデータを削除

S3に格納された様子

2020年10月26日分として、バッチリ保存されています。

S3バケットの様子

以下はデータ追加した際のファイルの内容です(一部抜粋)。

{"action": "INSERT", "keys": {"todoId": "t0001"}, "new_image": {"todoId": "t0001"}, "old_image": null, "timestamp": 1603679304, "timestamp_utc": "2020-10-26 02:28:24"}

Amazon Athenaでデータベースとテーブルを作成する

Kinesis FirehoseがS3バケットに格納したJSONファイルを見るためのAthenaを作成していきます。 せっかくなのでPartition Projectionを使って、パーティション更新を自動化します。

データベースの作成

下記でデータベースを作成します。

CREATE DATABASE dynamodb_update_database

テーブルの作成

下記でテーブルを作成します。

CREATE EXTERNAL TABLE IF NOT EXISTS dynamodb_update_database.todo_item_update_table (
  `action` string,
  `keys` struct<todoid:string>,
  `timestamp` bigint,
  `timestamp_utc` timestamp,
  `new_image` struct<todoid:string, title:string, done:boolean>,
  `old_image` struct<todoid:string, title:string, done:boolean> 
)
PARTITIONED BY (
  `dateday` string 
)
ROW FORMAT SERDE
  'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
)
LOCATION
  's3://cm-fujii.genki-dynamodb-update-bucket/dynamodb_item/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'projection.enabled' = 'true',
  'projection.dateday.type' = 'date',
  'projection.dateday.range' = '2020/10/01,NOW',
  'projection.dateday.format' = 'yyyy/MM/dd',
  'projection.dateday.interval' = '1',
  'projection.dateday.interval.unit' = 'DAYS',
  'storage.location.template' = 's3://cm-fujii.genki-dynamodb-update-bucket/dynamodb_item/${dateday}'
);

できました!

Athenaでテーブルを作成した

試しにデータを確認する

この時点でお試し確認してみます。

10件取得

試しに下記クエリを実行します。

SELECT * FROM "dynamodb_update_database"."todo_item_update_table"
WHERE dateday = '2020/10/26'
ORDER BY timestamp
LIMIT 10;

しっかり確認できました!

クエリ実行結果

特定のデータを確認

todoid:t0001のデータのみを表示します。

SELECT * FROM "dynamodb_update_database"."todo_item_update_table"
WHERE dateday = '2020/10/26' AND keys.todoid = 't0001'
ORDER BY timestamp
LIMIT 10;

クエリ実行結果

最初(INSERT)のデータを確認

action:INSERTのデータのみを表示します。

SELECT * FROM "dynamodb_update_database"."todo_item_update_table"
WHERE dateday = '2020/10/26' AND action = 'INSERT'
ORDER BY timestamp
LIMIT 10;

クエリ実行結果

タイトルに特定文字を含む

new_imagetitleiPhoneを含むデータのみを表示します。

SELECT * FROM "dynamodb_update_database"."todo_item_update_table"
WHERE dateday = '2020/10/26' AND new_image.title LIKE '%iPhone%'
ORDER BY timestamp
LIMIT 10;

クエリ実行結果

Amazon Athenaのパーティション自動更新を確認する

テーブル作成時、Partition Projectionを使って、datedayのパーティションを作成しました。本来であれば、GlueやLambda等でパーティションを更新する必要がありますが、Partition Projectionを使うことで自動化しています。本当に自動化されているのかを確認します。

1日待ってDynamoDBテーブルを変更する

1日待ってからDynamoDBテーブルに1件のデータを追加し、1件のデータを変更します。

データ追加

{
  "todoId": "t8888",
  "title": "お米を買う",
  "done": false
}

データ変更

{
  "todoId": "t2222",
  "title": "iPhone 12 Pro Maxを予約する",
  "done": false
}

S3バケットの様子

2020年10月27日分として格納されました。

S3バケットの様子

内容もバッチリです。

{"action": "INSERT", "keys": {"todoId": "t8888"}, "new_image": {"title": "\u304a\u7c73\u3092\u8cb7\u3046", "todoId": "t8888", "done": false}, "old_image": null, "timestamp": 1603763256, "timestamp_utc": "2020-10-27 01:47:36"}
{"action": "MODIFY", "keys": {"todoId": "t2222"}, "new_image": {"title": "iPhone 12 Pro Max\u3092\u4e88\u7d04\u3059\u308b", "todoId": "t2222", "done": false}, "old_image": {"title": "iPhone 12 mini\u3092\u4e88\u7d04\u3059\u308b", "todoId": "t2222", "done": false}, "timestamp": 1603763317, "timestamp_utc": "2020-10-27 01:48:37"}

Amazon Athenaで確認する

試しに下記クエリを実行します。

SELECT * FROM "dynamodb_update_database"."todo_item_update_table"
WHERE dateday = '2020/10/27'
ORDER BY timestamp
LIMIT 10;

GlueやLambdaでパーティション更新はしていませんが、、Partition Projectionのおかげでしっかり確認できました!

クエリ実行結果

DynamoDBテーブルの項目を増やしてみる

項目にmemoを追加

DynamoDBテーブルの新しい項目としてmemoを追加します。

{
  "todoId": "t2222",
  "title": "iPhone 12 Pro Maxを予約する",
  "done": false,
  "memo": "Apple Storeオンラインで予約する"
}
{
  "todoId": "t8888",
  "title": "お米を買う",
  "done": false,
  "memo": "あきたこまち!"
}

DynamoDBのデータを変更

このままだとAmazon Athenaで確認できない

S3バケットのデータにはmemoが含まれていますが、Amazon Athenaでmemoは確認できません。 なぜなら、テーブル作成時の定義にmemoが含まれていないからです。

{"action": "MODIFY", "keys": {"todoId": "t2222"}, "new_image": {"memo": "Apple Store\u30aa\u30f3\u30e9\u30a4\u30f3\u3067\u4e88\u7d04\u3059\u308b", "title": "iPhone 12 Pro Max\u3092\u4e88\u7d04\u3059\u308b", "todoId": "t2222", "done": false}, "old_image": {"title": "iPhone 12 Pro Max\u3092\u4e88\u7d04\u3059\u308b", "todoId": "t2222", "done": false}, "timestamp": 1603763890, "timestamp_utc": "2020-10-27 01:58:10"}
{"action": "MODIFY", "keys": {"todoId": "t8888"}, "new_image": {"memo": "\u3042\u304d\u305f\u3053\u307e\u3061\uff01", "title": "\u304a\u7c73\u3092\u8cb7\u3046", "todoId": "t8888", "done": false}, "old_image": {"title": "\u304a\u7c73\u3092\u8cb7\u3046", "todoId": "t8888", "done": false}, "timestamp": 1603764087, "timestamp_utc": "2020-10-27 02:01:27"}
SELECT * FROM "dynamodb_update_database"."todo_item_update_table"
WHERE dateday = '2020/10/27'
ORDER BY timestamp
LIMIT 10;

クエリ実行結果

Amazon Athenaのテーブルを再作成する

Amazon Atenaのテーブルを削除し、memoを含めたテーブル定義を作成します。

テーブルの削除

DROP TABLE `todo_item_update_table`;

memoを含めたテーブルを作成

new_imageold_imagestructmemo:stringを追加しています。

CREATE EXTERNAL TABLE IF NOT EXISTS dynamodb_update_database.todo_item_update_table (
  `action` string,
  `keys` struct<todoid:string>,
  `timestamp` bigint,
  `timestamp_utc` timestamp,
  `new_image` struct<todoid:string, title:string, done:boolean, memo:string>,
  `old_image` struct<todoid:string, title:string, done:boolean, memo:string> 
)
PARTITIONED BY (
  `dateday` string 
)
ROW FORMAT SERDE
  'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
)
LOCATION
  's3://cm-fujii.genki-dynamodb-update-bucket/dynamodb_item/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'projection.enabled' = 'true',
  'projection.dateday.type' = 'date',
  'projection.dateday.range' = '2020/10/01,NOW',
  'projection.dateday.format' = 'yyyy/MM/dd',
  'projection.dateday.interval' = '1',
  'projection.dateday.interval.unit' = 'DAYS',
  'storage.location.template' = 's3://cm-fujii.genki-dynamodb-update-bucket/dynamodb_item/${dateday}'
);

Amazon Athenaでデータを確認する

SELECT * FROM "dynamodb_update_database"."todo_item_update_table"
WHERE dateday = '2020/10/27'
ORDER BY timestamp
LIMIT 10;

memoが確認できました!

クエリ実行結果

さいごに

DynamoDBのデータを更新したとき、DynamoDb Streams・Lambda・Kinesis Firehoseを経由してS3にファイルを格納し、Amazon Athenaで見る仕組みを作ってみました。 Kinesis FirehoseがS3に出力したデータをAmazon Athenaで見る際の参考にもなると思います。

参考