この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
不具合の調査などをしているとき、「あのタイミングのDynamoDBテーブルのデータが分かれば助かるのに……」と思ったことは無いでしょうか? 私はあります。 Lambdaで参照したDynamoDBテーブルのログデータを見ても良いのですが、下記のように影響範囲を調査する際は有効ではありません。(ログを調べれば分かるけど大変)
- この値が原因なのは分かった。いつからこの値になっていた……?
というわけで本記事では、DynamoDBテーブルの変更履歴をLambdaとKinesis Firehoseを経由してS3に格納し、Athenaで参照する仕組みを構築してみました。Partition Projectionを使って、Athenaのパーティション更新を自動化しています。
おすすめの方
- AWS SAMを使ってみたい方
- DynamoDB Streamの情報をS3保存したい方
- Kinesis FirehoseがS3に保存したデータをAthenaで見たい方
- DynamoDBの変更履歴をAthenaで見たい方
- Athenaのパーティション更新を自動化したい方
DynamoDBとLambdaとS3を作る
AWS SAMを使って作ります。
SAM Init
sam init \
--runtime python3.7 \
--name DynamoDB-Update-Item-Athena \
--app-template hello-world
SAMテンプレート
DynamoDB・Lambda・Kinesis Firehose・S3バケットを定義しています。
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: DynamoDB-Update-Item-Athena
Resources:
# 変更履歴を残したいDynamoDBテーブル
TodoTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: todo-sample-table
AttributeDefinitions:
- AttributeName: todoId
AttributeType: S
KeySchema:
- AttributeName: todoId
KeyType: HASH
BillingMode: PAY_PER_REQUEST
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
# DynamoDB Streamsで起動してKinesis FirehoseにPutするLambda
ToFirehoseFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: hello_world/
Handler: app.lambda_handler
Runtime: python3.7
Timeout: 30
Environment:
Variables:
DELIVERY_STREAM_NAME: !Ref TodoTableUpdateDeliveryStream
Policies:
- arn:aws:iam::aws:policy/AmazonKinesisFirehoseFullAccess
Events:
DynamoDBStream:
Type: DynamoDB
Properties:
Stream: !GetAtt TodoTable.StreamArn
BatchSize: 100
StartingPosition: TRIM_HORIZON
BisectBatchOnFunctionError: true
ToFirehoseFunctionLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub /aws/lambda/${ToFirehoseFunction}
# DynamoDBの変更履歴を保存するS3バケット(Athena検索対象)
TodoTableUpdateBucket:
Type: AWS::S3::Bucket
DeletionPolicy: Retain
Properties:
AccessControl: Private
BucketName: cm-fujii.genki-dynamodb-update-bucket
# S3バケットに格納するKinesis Firehose
TodoTableUpdateDeliveryStream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
ExtendedS3DestinationConfiguration:
BucketARN: !GetAtt TodoTableUpdateBucket.Arn
BufferingHints:
IntervalInSeconds: 60
SizeInMBs: 50
CompressionFormat: UNCOMPRESSED
Prefix: dynamodb_item/
ErrorOutputPrefix: error/
RoleARN: !GetAtt TodoTableUpdateDeliveryStreamRole.Arn
TodoTableUpdateDeliveryStreamRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service: firehose.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: "todo-table-update-firehose-delivery-policy"
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- s3:AbortMultipartUpload
- s3:GetBucketLocation
- s3:GetObject
- s3:ListBucket
- s3:ListBucketMultipartUploads
- s3:PutObject
Resource: !Sub "${TodoTableUpdateBucket.Arn}/*"
なお、Kinesis FirehoseのIntervalInSeconds
は実験用として短い60秒にしています。
実際に運用する場合はDynamoDBの変更頻度を考慮しつつ、下記を参考にデータ量・データ数・圧縮の有無を調整してみてください。
Lambdaコード
DynamoDB Streams経由で起動し、変更前と変更後のデータを保存しています。 このLambda関数では、DynamoDBにどのようなデータがあるのか?を意識していません。そのため、DynamoDBの項目を変更しても、このLambda自体の修正は不要です。
app.py
import json
import os
import boto3
from datetime import datetime
from boto3.dynamodb.types import TypeDeserializer
firehose = boto3.client('firehose')
dynamodb_deserializer = TypeDeserializer()
def lambda_handler(event, context):
for item in event['Records']:
timestamp = int(item['dynamodb']['ApproximateCreationDateTime'])
to_firehose_item = {
'action': item['eventName'],
'keys': deserialize(item['dynamodb']['Keys']),
'new_image': None,
'old_image': None,
'timestamp': timestamp,
'timestamp_utc': datetime.utcfromtimestamp(timestamp)
}
if 'NewImage' in item['dynamodb']:
to_firehose_item['new_image'] = deserialize(item['dynamodb']['NewImage'])
if 'OldImage' in item['dynamodb']:
to_firehose_item['old_image'] = deserialize(item['dynamodb']['OldImage'])
firehose.put_record(
DeliveryStreamName=os.environ['DELIVERY_STREAM_NAME'],
Record={
'Data': json.dumps(to_firehose_item, default=json_default) + '\n'
}
)
def deserialize(item):
"""
{'fooId': {'S': 'bar1234'}} を {'fooId': 'bar1234'} に変換する
"""
d = {}
for key in item:
d[key] = dynamodb_deserializer.deserialize(item[key])
return d
def json_default(obj):
try:
return str(obj)
except Exception as e:
print(e)
raise TypeError('Failed convert to str.')
デプロイする
sam build
sam package \
--output-template-file packaged.yaml \
--s3-bucket cm-fujii.genki-deploy
sam deploy \
--template-file packaged.yaml \
--stack-name DynamoDB-Update-Item-Athena-Stack \
--capabilities CAPABILITY_NAMED_IAM \
--no-fail-on-empty-changeset
DynamoDBテーブルを変更してみる
データ追加
ためしに2件のデータを追加します。
{
"todoId": "t0001"
}
{
"todoId": "t2222",
"title": "iPhone 12 miniを買う"
}
データ変更
さきほどのデータを変更します。
{
"todoId": "t0001",
"title": "いますぐ寝る!!!",
"done": false
}
{
"todoId": "t2222",
"title": "iPhone 12 miniを予約する",
"done": false
}
データ削除
todoId:t0001
のデータを削除します。
S3に格納された様子
2020年10月26日分として、バッチリ保存されています。
以下はデータ追加した際のファイルの内容です(一部抜粋)。
{"action": "INSERT", "keys": {"todoId": "t0001"}, "new_image": {"todoId": "t0001"}, "old_image": null, "timestamp": 1603679304, "timestamp_utc": "2020-10-26 02:28:24"}
Amazon Athenaでデータベースとテーブルを作成する
Kinesis FirehoseがS3バケットに格納したJSONファイルを見るためのAthenaを作成していきます。 せっかくなのでPartition Projectionを使って、パーティション更新を自動化します。
データベースの作成
下記でデータベースを作成します。
CREATE DATABASE dynamodb_update_database
テーブルの作成
下記でテーブルを作成します。
CREATE EXTERNAL TABLE IF NOT EXISTS dynamodb_update_database.todo_item_update_table (
`action` string,
`keys` struct<todoid:string>,
`timestamp` bigint,
`timestamp_utc` timestamp,
`new_image` struct<todoid:string, title:string, done:boolean>,
`old_image` struct<todoid:string, title:string, done:boolean>
)
PARTITIONED BY (
`dateday` string
)
ROW FORMAT SERDE
'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
)
LOCATION
's3://cm-fujii.genki-dynamodb-update-bucket/dynamodb_item/'
TBLPROPERTIES (
'has_encrypted_data'='false',
'projection.enabled' = 'true',
'projection.dateday.type' = 'date',
'projection.dateday.range' = '2020/10/01,NOW',
'projection.dateday.format' = 'yyyy/MM/dd',
'projection.dateday.interval' = '1',
'projection.dateday.interval.unit' = 'DAYS',
'storage.location.template' = 's3://cm-fujii.genki-dynamodb-update-bucket/dynamodb_item/${dateday}'
);
できました!
試しにデータを確認する
この時点でお試し確認してみます。
10件取得
試しに下記クエリを実行します。
SELECT * FROM "dynamodb_update_database"."todo_item_update_table"
WHERE dateday = '2020/10/26'
ORDER BY timestamp
LIMIT 10;
しっかり確認できました!
特定のデータを確認
todoid:t0001
のデータのみを表示します。
SELECT * FROM "dynamodb_update_database"."todo_item_update_table"
WHERE dateday = '2020/10/26' AND keys.todoid = 't0001'
ORDER BY timestamp
LIMIT 10;
最初(INSERT)のデータを確認
action:INSERT
のデータのみを表示します。
SELECT * FROM "dynamodb_update_database"."todo_item_update_table"
WHERE dateday = '2020/10/26' AND action = 'INSERT'
ORDER BY timestamp
LIMIT 10;
タイトルに特定文字を含む
new_image
のtitle
にiPhone
を含むデータのみを表示します。
SELECT * FROM "dynamodb_update_database"."todo_item_update_table"
WHERE dateday = '2020/10/26' AND new_image.title LIKE '%iPhone%'
ORDER BY timestamp
LIMIT 10;
Amazon Athenaのパーティション自動更新を確認する
テーブル作成時、Partition Projectionを使って、dateday
のパーティションを作成しました。本来であれば、GlueやLambda等でパーティションを更新する必要がありますが、Partition Projectionを使うことで自動化しています。本当に自動化されているのかを確認します。
1日待ってDynamoDBテーブルを変更する
1日待ってからDynamoDBテーブルに1件のデータを追加し、1件のデータを変更します。
データ追加
{
"todoId": "t8888",
"title": "お米を買う",
"done": false
}
データ変更
{
"todoId": "t2222",
"title": "iPhone 12 Pro Maxを予約する",
"done": false
}
S3バケットの様子
2020年10月27日分として格納されました。
内容もバッチリです。
{"action": "INSERT", "keys": {"todoId": "t8888"}, "new_image": {"title": "\u304a\u7c73\u3092\u8cb7\u3046", "todoId": "t8888", "done": false}, "old_image": null, "timestamp": 1603763256, "timestamp_utc": "2020-10-27 01:47:36"}
{"action": "MODIFY", "keys": {"todoId": "t2222"}, "new_image": {"title": "iPhone 12 Pro Max\u3092\u4e88\u7d04\u3059\u308b", "todoId": "t2222", "done": false}, "old_image": {"title": "iPhone 12 mini\u3092\u4e88\u7d04\u3059\u308b", "todoId": "t2222", "done": false}, "timestamp": 1603763317, "timestamp_utc": "2020-10-27 01:48:37"}
Amazon Athenaで確認する
試しに下記クエリを実行します。
SELECT * FROM "dynamodb_update_database"."todo_item_update_table"
WHERE dateday = '2020/10/27'
ORDER BY timestamp
LIMIT 10;
GlueやLambdaでパーティション更新はしていませんが、、Partition Projectionのおかげでしっかり確認できました!
DynamoDBテーブルの項目を増やしてみる
項目にmemoを追加
DynamoDBテーブルの新しい項目としてmemo
を追加します。
{
"todoId": "t2222",
"title": "iPhone 12 Pro Maxを予約する",
"done": false,
"memo": "Apple Storeオンラインで予約する"
}
{
"todoId": "t8888",
"title": "お米を買う",
"done": false,
"memo": "あきたこまち!"
}
このままだとAmazon Athenaで確認できない
S3バケットのデータにはmemo
が含まれていますが、Amazon Athenaでmemo
は確認できません。
なぜなら、テーブル作成時の定義にmemo
が含まれていないからです。
{"action": "MODIFY", "keys": {"todoId": "t2222"}, "new_image": {"memo": "Apple Store\u30aa\u30f3\u30e9\u30a4\u30f3\u3067\u4e88\u7d04\u3059\u308b", "title": "iPhone 12 Pro Max\u3092\u4e88\u7d04\u3059\u308b", "todoId": "t2222", "done": false}, "old_image": {"title": "iPhone 12 Pro Max\u3092\u4e88\u7d04\u3059\u308b", "todoId": "t2222", "done": false}, "timestamp": 1603763890, "timestamp_utc": "2020-10-27 01:58:10"}
{"action": "MODIFY", "keys": {"todoId": "t8888"}, "new_image": {"memo": "\u3042\u304d\u305f\u3053\u307e\u3061\uff01", "title": "\u304a\u7c73\u3092\u8cb7\u3046", "todoId": "t8888", "done": false}, "old_image": {"title": "\u304a\u7c73\u3092\u8cb7\u3046", "todoId": "t8888", "done": false}, "timestamp": 1603764087, "timestamp_utc": "2020-10-27 02:01:27"}
SELECT * FROM "dynamodb_update_database"."todo_item_update_table"
WHERE dateday = '2020/10/27'
ORDER BY timestamp
LIMIT 10;
Amazon Athenaのテーブルを再作成する
Amazon Atenaのテーブルを削除し、memo
を含めたテーブル定義を作成します。
テーブルの削除
DROP TABLE `todo_item_update_table`;
memoを含めたテーブルを作成
new_image
とold_image
のstruct
にmemo:string
を追加しています。
CREATE EXTERNAL TABLE IF NOT EXISTS dynamodb_update_database.todo_item_update_table (
`action` string,
`keys` struct<todoid:string>,
`timestamp` bigint,
`timestamp_utc` timestamp,
`new_image` struct<todoid:string, title:string, done:boolean, memo:string>,
`old_image` struct<todoid:string, title:string, done:boolean, memo:string>
)
PARTITIONED BY (
`dateday` string
)
ROW FORMAT SERDE
'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
)
LOCATION
's3://cm-fujii.genki-dynamodb-update-bucket/dynamodb_item/'
TBLPROPERTIES (
'has_encrypted_data'='false',
'projection.enabled' = 'true',
'projection.dateday.type' = 'date',
'projection.dateday.range' = '2020/10/01,NOW',
'projection.dateday.format' = 'yyyy/MM/dd',
'projection.dateday.interval' = '1',
'projection.dateday.interval.unit' = 'DAYS',
'storage.location.template' = 's3://cm-fujii.genki-dynamodb-update-bucket/dynamodb_item/${dateday}'
);
Amazon Athenaでデータを確認する
SELECT * FROM "dynamodb_update_database"."todo_item_update_table"
WHERE dateday = '2020/10/27'
ORDER BY timestamp
LIMIT 10;
memo
が確認できました!
さいごに
DynamoDBのデータを更新したとき、DynamoDb Streams・Lambda・Kinesis Firehoseを経由してS3にファイルを格納し、Amazon Athenaで見る仕組みを作ってみました。 Kinesis FirehoseがS3に出力したデータをAmazon Athenaで見る際の参考にもなると思います。
参考
- DynamoDB ストリームから Lambda 関数を呼び出して Kinesis Firehose を経由して S3 バケットに出力してみた | Developers.IO
- [新機能]Amazon Athena ルールベースでパーティションプルーニングを自動化する Partition Projection の徹底解説 | Developers.IO
- Amazon Athena を使用したパーティション射影 - Amazon Athena
- Amazon Kinesis Data Firehose 例 - Amazon Athena
- DynamoDB - AWS Serverless Application Model
- AWS::KinesisFirehose::DeliveryStream - AWS CloudFormation
- Firehose — Boto3 Docs 1.16.3 documentation
- boto3.dynamodb.types — Boto3 Docs 1.16.3 documentation
- パーティション射影のサポートされている型 - Amazon Athena
- Amazon Kinesis Data Firehose 例 - Amazon Athena
- [新機能]Amazon Athena ルールベースでパーティションプルーニングを自動化する Partition Projection の徹底解説 | Developers.IO
- Amazon Athenaを使ってJSONファイルを検索してみる | Developers.IO
- Amazon Athena Nested-JSONのSESログファイルを検索する | Developers.IO
- Amazon Athena のパフォーマンスチューニング Tips トップ 10 | Amazon Web Services ブログ
- DDL ステートメント - Amazon Athena