
DynamoDBの変更履歴をKinesis Firehose経由でS3に保存してAthenaで見る
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
不具合の調査などをしているとき、「あのタイミングのDynamoDBテーブルのデータが分かれば助かるのに……」と思ったことは無いでしょうか? 私はあります。 Lambdaで参照したDynamoDBテーブルのログデータを見ても良いのですが、下記のように影響範囲を調査する際は有効ではありません。(ログを調べれば分かるけど大変)
- この値が原因なのは分かった。いつからこの値になっていた……?
 
というわけで本記事では、DynamoDBテーブルの変更履歴をLambdaとKinesis Firehoseを経由してS3に格納し、Athenaで参照する仕組みを構築してみました。Partition Projectionを使って、Athenaのパーティション更新を自動化しています。
おすすめの方
- AWS SAMを使ってみたい方
 - DynamoDB Streamの情報をS3保存したい方
 - Kinesis FirehoseがS3に保存したデータをAthenaで見たい方
 - DynamoDBの変更履歴をAthenaで見たい方
 - Athenaのパーティション更新を自動化したい方
 
DynamoDBとLambdaとS3を作る
AWS SAMを使って作ります。
SAM Init
sam init \
    --runtime python3.7 \
    --name DynamoDB-Update-Item-Athena \
    --app-template hello-world
SAMテンプレート
DynamoDB・Lambda・Kinesis Firehose・S3バケットを定義しています。
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: DynamoDB-Update-Item-Athena
Resources:
  # 変更履歴を残したいDynamoDBテーブル
  TodoTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: todo-sample-table
      AttributeDefinitions:
        - AttributeName: todoId
          AttributeType: S
      KeySchema:
        - AttributeName: todoId
          KeyType: HASH
      BillingMode: PAY_PER_REQUEST
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
  # DynamoDB Streamsで起動してKinesis FirehoseにPutするLambda
  ToFirehoseFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: hello_world/
      Handler: app.lambda_handler
      Runtime: python3.7
      Timeout: 30
      Environment:
        Variables:
          DELIVERY_STREAM_NAME: !Ref TodoTableUpdateDeliveryStream
      Policies:
        - arn:aws:iam::aws:policy/AmazonKinesisFirehoseFullAccess
      Events:
        DynamoDBStream:
          Type: DynamoDB
          Properties:
            Stream: !GetAtt TodoTable.StreamArn
            BatchSize: 100
            StartingPosition: TRIM_HORIZON
            BisectBatchOnFunctionError: true
  ToFirehoseFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/lambda/${ToFirehoseFunction}
  # DynamoDBの変更履歴を保存するS3バケット(Athena検索対象)
  TodoTableUpdateBucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    Properties:
      AccessControl: Private
      BucketName: cm-fujii.genki-dynamodb-update-bucket
  # S3バケットに格納するKinesis Firehose
  TodoTableUpdateDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties: 
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt TodoTableUpdateBucket.Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 50
        CompressionFormat: UNCOMPRESSED
        Prefix: dynamodb_item/
        ErrorOutputPrefix: error/
        RoleARN: !GetAtt TodoTableUpdateDeliveryStreamRole.Arn
  TodoTableUpdateDeliveryStreamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: "todo-table-update-firehose-delivery-policy"
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource: !Sub "${TodoTableUpdateBucket.Arn}/*"
なお、Kinesis FirehoseのIntervalInSecondsは実験用として短い60秒にしています。
実際に運用する場合はDynamoDBの変更頻度を考慮しつつ、下記を参考にデータ量・データ数・圧縮の有無を調整してみてください。
Lambdaコード
DynamoDB Streams経由で起動し、変更前と変更後のデータを保存しています。 このLambda関数では、DynamoDBにどのようなデータがあるのか?を意識していません。そのため、DynamoDBの項目を変更しても、このLambda自体の修正は不要です。
import json
import os
import boto3
from datetime import datetime
from boto3.dynamodb.types import TypeDeserializer
firehose = boto3.client('firehose')
dynamodb_deserializer = TypeDeserializer()
def lambda_handler(event, context):
    for item in event['Records']:
        timestamp = int(item['dynamodb']['ApproximateCreationDateTime'])
        to_firehose_item = {
            'action': item['eventName'],
            'keys': deserialize(item['dynamodb']['Keys']),
            'new_image': None,
            'old_image': None,
            'timestamp': timestamp,
            'timestamp_utc': datetime.utcfromtimestamp(timestamp)
        }
        if 'NewImage' in item['dynamodb']:
            to_firehose_item['new_image'] = deserialize(item['dynamodb']['NewImage'])
        if 'OldImage' in item['dynamodb']:
            to_firehose_item['old_image'] = deserialize(item['dynamodb']['OldImage'])
        firehose.put_record(
            DeliveryStreamName=os.environ['DELIVERY_STREAM_NAME'],
            Record={
                'Data': json.dumps(to_firehose_item, default=json_default) + '\n'
            }
        )
def deserialize(item):
    """
    {'fooId': {'S': 'bar1234'}} を {'fooId': 'bar1234'} に変換する
    """
    d = {}
    for key in item:
        d[key] = dynamodb_deserializer.deserialize(item[key])
    return d
def json_default(obj):
    try:
        return str(obj)
    except Exception as e:
        print(e)
        raise TypeError('Failed convert to str.')
デプロイする
sam build
sam package \
    --output-template-file packaged.yaml \
    --s3-bucket cm-fujii.genki-deploy
sam deploy \
    --template-file packaged.yaml \
    --stack-name DynamoDB-Update-Item-Athena-Stack \
    --capabilities CAPABILITY_NAMED_IAM \
    --no-fail-on-empty-changeset
DynamoDBテーブルを変更してみる
データ追加
ためしに2件のデータを追加します。
{
  "todoId": "t0001"
}
{
  "todoId": "t2222",
  "title": "iPhone 12 miniを買う"
}
データ変更
さきほどのデータを変更します。
{
  "todoId": "t0001",
  "title": "いますぐ寝る!!!",
  "done": false
}
{
  "todoId": "t2222",
  "title": "iPhone 12 miniを予約する",
  "done": false
}
データ削除
todoId:t0001のデータを削除します。
S3に格納された様子
2020年10月26日分として、バッチリ保存されています。
以下はデータ追加した際のファイルの内容です(一部抜粋)。
{"action": "INSERT", "keys": {"todoId": "t0001"}, "new_image": {"todoId": "t0001"}, "old_image": null, "timestamp": 1603679304, "timestamp_utc": "2020-10-26 02:28:24"}
Amazon Athenaでデータベースとテーブルを作成する
Kinesis FirehoseがS3バケットに格納したJSONファイルを見るためのAthenaを作成していきます。 せっかくなのでPartition Projectionを使って、パーティション更新を自動化します。
データベースの作成
下記でデータベースを作成します。
CREATE DATABASE dynamodb_update_database
テーブルの作成
下記でテーブルを作成します。
CREATE EXTERNAL TABLE IF NOT EXISTS dynamodb_update_database.todo_item_update_table (
  `action` string,
  `keys` struct<todoid:string>,
  `timestamp` bigint,
  `timestamp_utc` timestamp,
  `new_image` struct<todoid:string, title:string, done:boolean>,
  `old_image` struct<todoid:string, title:string, done:boolean> 
)
PARTITIONED BY (
  `dateday` string 
)
ROW FORMAT SERDE
  'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
)
LOCATION
  's3://cm-fujii.genki-dynamodb-update-bucket/dynamodb_item/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'projection.enabled' = 'true',
  'projection.dateday.type' = 'date',
  'projection.dateday.range' = '2020/10/01,NOW',
  'projection.dateday.format' = 'yyyy/MM/dd',
  'projection.dateday.interval' = '1',
  'projection.dateday.interval.unit' = 'DAYS',
  'storage.location.template' = 's3://cm-fujii.genki-dynamodb-update-bucket/dynamodb_item/${dateday}'
);
できました!
試しにデータを確認する
この時点でお試し確認してみます。
10件取得
試しに下記クエリを実行します。
SELECT * FROM "dynamodb_update_database"."todo_item_update_table" WHERE dateday = '2020/10/26' ORDER BY timestamp LIMIT 10;
しっかり確認できました!
特定のデータを確認
todoid:t0001のデータのみを表示します。
SELECT * FROM "dynamodb_update_database"."todo_item_update_table" WHERE dateday = '2020/10/26' AND keys.todoid = 't0001' ORDER BY timestamp LIMIT 10;
最初(INSERT)のデータを確認
action:INSERTのデータのみを表示します。
SELECT * FROM "dynamodb_update_database"."todo_item_update_table" WHERE dateday = '2020/10/26' AND action = 'INSERT' ORDER BY timestamp LIMIT 10;
タイトルに特定文字を含む
new_imageのtitleにiPhoneを含むデータのみを表示します。
SELECT * FROM "dynamodb_update_database"."todo_item_update_table" WHERE dateday = '2020/10/26' AND new_image.title LIKE '%iPhone%' ORDER BY timestamp LIMIT 10;
Amazon Athenaのパーティション自動更新を確認する
テーブル作成時、Partition Projectionを使って、datedayのパーティションを作成しました。本来であれば、GlueやLambda等でパーティションを更新する必要がありますが、Partition Projectionを使うことで自動化しています。本当に自動化されているのかを確認します。
1日待ってDynamoDBテーブルを変更する
1日待ってからDynamoDBテーブルに1件のデータを追加し、1件のデータを変更します。
データ追加
{
  "todoId": "t8888",
  "title": "お米を買う",
  "done": false
}
データ変更
{
  "todoId": "t2222",
  "title": "iPhone 12 Pro Maxを予約する",
  "done": false
}
S3バケットの様子
2020年10月27日分として格納されました。
内容もバッチリです。
{"action": "INSERT", "keys": {"todoId": "t8888"}, "new_image": {"title": "\u304a\u7c73\u3092\u8cb7\u3046", "todoId": "t8888", "done": false}, "old_image": null, "timestamp": 1603763256, "timestamp_utc": "2020-10-27 01:47:36"}
{"action": "MODIFY", "keys": {"todoId": "t2222"}, "new_image": {"title": "iPhone 12 Pro Max\u3092\u4e88\u7d04\u3059\u308b", "todoId": "t2222", "done": false}, "old_image": {"title": "iPhone 12 mini\u3092\u4e88\u7d04\u3059\u308b", "todoId": "t2222", "done": false}, "timestamp": 1603763317, "timestamp_utc": "2020-10-27 01:48:37"}
Amazon Athenaで確認する
試しに下記クエリを実行します。
SELECT * FROM "dynamodb_update_database"."todo_item_update_table" WHERE dateday = '2020/10/27' ORDER BY timestamp LIMIT 10;
GlueやLambdaでパーティション更新はしていませんが、、Partition Projectionのおかげでしっかり確認できました!
DynamoDBテーブルの項目を増やしてみる
項目にmemoを追加
DynamoDBテーブルの新しい項目としてmemoを追加します。
{
  "todoId": "t2222",
  "title": "iPhone 12 Pro Maxを予約する",
  "done": false,
  "memo": "Apple Storeオンラインで予約する"
}
{
  "todoId": "t8888",
  "title": "お米を買う",
  "done": false,
  "memo": "あきたこまち!"
}
このままだとAmazon Athenaで確認できない
S3バケットのデータにはmemoが含まれていますが、Amazon Athenaでmemoは確認できません。
なぜなら、テーブル作成時の定義にmemoが含まれていないからです。
{"action": "MODIFY", "keys": {"todoId": "t2222"}, "new_image": {"memo": "Apple Store\u30aa\u30f3\u30e9\u30a4\u30f3\u3067\u4e88\u7d04\u3059\u308b", "title": "iPhone 12 Pro Max\u3092\u4e88\u7d04\u3059\u308b", "todoId": "t2222", "done": false}, "old_image": {"title": "iPhone 12 Pro Max\u3092\u4e88\u7d04\u3059\u308b", "todoId": "t2222", "done": false}, "timestamp": 1603763890, "timestamp_utc": "2020-10-27 01:58:10"}
{"action": "MODIFY", "keys": {"todoId": "t8888"}, "new_image": {"memo": "\u3042\u304d\u305f\u3053\u307e\u3061\uff01", "title": "\u304a\u7c73\u3092\u8cb7\u3046", "todoId": "t8888", "done": false}, "old_image": {"title": "\u304a\u7c73\u3092\u8cb7\u3046", "todoId": "t8888", "done": false}, "timestamp": 1603764087, "timestamp_utc": "2020-10-27 02:01:27"}
SELECT * FROM "dynamodb_update_database"."todo_item_update_table" WHERE dateday = '2020/10/27' ORDER BY timestamp LIMIT 10;
Amazon Athenaのテーブルを再作成する
Amazon Atenaのテーブルを削除し、memoを含めたテーブル定義を作成します。
テーブルの削除
DROP TABLE `todo_item_update_table`;
memoを含めたテーブルを作成
new_imageとold_imageのstructにmemo:stringを追加しています。
CREATE EXTERNAL TABLE IF NOT EXISTS dynamodb_update_database.todo_item_update_table (
  `action` string,
  `keys` struct<todoid:string>,
  `timestamp` bigint,
  `timestamp_utc` timestamp,
  `new_image` struct<todoid:string, title:string, done:boolean, memo:string>,
  `old_image` struct<todoid:string, title:string, done:boolean, memo:string> 
)
PARTITIONED BY (
  `dateday` string 
)
ROW FORMAT SERDE
  'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
)
LOCATION
  's3://cm-fujii.genki-dynamodb-update-bucket/dynamodb_item/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'projection.enabled' = 'true',
  'projection.dateday.type' = 'date',
  'projection.dateday.range' = '2020/10/01,NOW',
  'projection.dateday.format' = 'yyyy/MM/dd',
  'projection.dateday.interval' = '1',
  'projection.dateday.interval.unit' = 'DAYS',
  'storage.location.template' = 's3://cm-fujii.genki-dynamodb-update-bucket/dynamodb_item/${dateday}'
);
Amazon Athenaでデータを確認する
SELECT * FROM "dynamodb_update_database"."todo_item_update_table" WHERE dateday = '2020/10/27' ORDER BY timestamp LIMIT 10;
memoが確認できました!
さいごに
DynamoDBのデータを更新したとき、DynamoDb Streams・Lambda・Kinesis Firehoseを経由してS3にファイルを格納し、Amazon Athenaで見る仕組みを作ってみました。 Kinesis FirehoseがS3に出力したデータをAmazon Athenaで見る際の参考にもなると思います。
参考
- DynamoDB ストリームから Lambda 関数を呼び出して Kinesis Firehose を経由して S3 バケットに出力してみた | Developers.IO
 - [新機能]Amazon Athena ルールベースでパーティションプルーニングを自動化する Partition Projection の徹底解説 | Developers.IO
 - Amazon Athena を使用したパーティション射影 - Amazon Athena
 - Amazon Kinesis Data Firehose 例 - Amazon Athena
 - DynamoDB - AWS Serverless Application Model
 - AWS::KinesisFirehose::DeliveryStream - AWS CloudFormation
 - Firehose — Boto3 Docs 1.16.3 documentation
 - boto3.dynamodb.types — Boto3 Docs 1.16.3 documentation
 - パーティション射影のサポートされている型 - Amazon Athena
 - Amazon Kinesis Data Firehose 例 - Amazon Athena
 - [新機能]Amazon Athena ルールベースでパーティションプルーニングを自動化する Partition Projection の徹底解説 | Developers.IO
 - Amazon Athenaを使ってJSONファイルを検索してみる | Developers.IO
 - Amazon Athena Nested-JSONのSESログファイルを検索する | Developers.IO
 - Amazon Athena のパフォーマンスチューニング Tips トップ 10 | Amazon Web Services ブログ
 - DDL ステートメント - Amazon Athena
 




















