ども、大瀧です。
先日DynamoDBはバッチ処理よりストリーム処理が得意という記事が話題になりましたが、本ブログではバッチのアーキテクチャを紹介します!!
DynamoDBのエクスポート機能
DynamoDBには継続的バックアップ(PITR)で取得したスナップショットをAmazon S3にエクスポートする機能があり、ある時点のデータを元にしたバッチ処理をテーブルの読み込み容量を消費せずに実行できます。
S3にエクスポートするときのデータ形式はDynamoDB JSONとAmazon Ionテキスト形式の2つから選択できます。IonにはDynamoDBのデータ型を保持する仕組みが内包されているため、JSONやJavascript周りのデータ型変換に悩んだことのある方にはおすすめの形式です。
エクスポートの定期実行にEventBridge、エクスポートのステータス管理にはStep Functions、バッチのクエリにAthenaを利用しました。以下のようなアーキテクチャです。
1. DynamoDBの構成
まずは、DynamoDBのエクスポートしたいテーブルでポイントインタイムリカバリをオンにします。
これでエクスポートする準備ができました。
2. Step Functionsの構成
続いてエクスポートを実行するStepFunctionの処理を実装していきます。以下のブログ記事を参考にしました。
AWS Lambdaを利用せずにExportTableToPointInTime APIを直接呼び出し、エクスポートのジョブのステータスを定期的にチェックスするシンプルなステートマシンを作成します。
AWS管理コンソールのStep Functions管理画面から[ステートマシン] - [ステートマシンの作成]をクリックし、[コードでワークフローを記述]からステートマシン定義画面表示します。ステートマシン定義は以下の通りです。12行目でAmazon Ionを指定しています。
{
"Comment": "State machine to export data from DynamoDB to S3",
"StartAt": "callExport",
"States": {
"callExport": {
"Type": "Task",
"Next": "Wait1",
"Parameters": {
"TableArn": "<DynamoDBテーブルのARN>",
"S3Bucket": "<エクスポート先S3バケット名>",
"S3Prefix": "export",
"ExportFormat": "ION"
},
"Resource": "arn:aws:states:::aws-sdk:dynamodb:exportTableToPointInTime",
"Comment": "Export data from DynamoDB",
"OutputPath": "$.ExportDescription"
},
"Wait1": {
"Type": "Wait",
"Seconds": 180,
"Comment": "Wait for export finished",
"Next": "checkExport"
},
"checkExport": {
"Type": "Task",
"Next": "checkExportStatus",
"Parameters": {
"MaxResults": 1,
"TableArn": "<DynamoDBテーブルのARN>"
},
"Resource": "arn:aws:states:::aws-sdk:dynamodb:listExports",
"OutputPath": "$.ExportSummaries[0]",
"Comment": "Export result check"
},
"checkExportStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.ExportStatus",
"StringEquals": "COMPLETED",
"Next": "ExportCompleted",
"Comment": "If ExportStatus == COMPLETED then"
},
{
"Variable": "$.ExportStatus",
"StringEquals": "IN_PROGRESS",
"Next": "Wait1",
"Comment": "If ExportStatus == IN_PROGRESS then"
}
],
"Default": "ExportFailed",
"Comment": "Check export status"
},
"ExportCompleted": {
"Type": "Succeed",
"Comment": "Export completed"
},
"ExportFailed": {
"Type": "Fail",
"Comment": "Export failed"
}
}
}
実行ロールは自動作成のCloudWatchLogsDeliveryFullAccessPolicyとXRayAccessPolicyに加えて、DynamoDBとS3にアクセスするポリシーを作成し、それぞれ付与します。
DynamoDBエクスポート用のポリシードキュメント例
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": "dynamodb:ExportTableToPointInTime",
"Resource": "<DynamoDBテーブルのARN>"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": "dynamodb:ListExports",
"Resource": "*"
}
]
}
S3書き込み用のポリシードキュメント例
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0"",
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:PutObjectAcl"
],
"Resource": "<エクスポート先S3バケットのARN>/*"
}
]
}
これでOKです。試しにステートマシンを手動で実行し、エクスポートが正しく完了することを確認しましょう。
S3バケットの export/AWSDynamoDB/[エクスポートID]/
フォルダ以下にエクスポートしたデータが保存されます。実データは data/
フォルダに配置されます。
3. EventBridgeの構成
EventBridgeでは、Step Functionsステートマシンを定期実行するスケジュールを作成します。EventBrdige管理画面のメニューから[スケジューラ] - [スケジュール]を選択、[スケジュールを作成]ボタンをクリックし、スケジュール作成画面を表示します。
スケジュール名やスケジュールのパターンをセットしましょう。ターゲットの選択画面で「テンプレート化されたターゲット」を選択、一覧から[AWS Step Functions StartExecution]を選択し、先ほど作成したステートマシンを選択します。
これで定期的にS3バケットへのエクスポートが実行されるようになりました。
4. Athenaの構成とS3の追加設定
ここまでのステップでS3バケットの export/AWSDynamoDB/[エクスポートID]/data/
以下にIon形式のデータが格納されるようになりました。ファイルはこんな感じになります。
Ionテキスト形式の中身はこんな感じです。
5qnrxf6tqq45ndlomybdi7y3va.ion(GZip圧縮を展開したもの)
$ion_1_0 {Item:{id:1234567890123456.,created_at:"2023-06-30T09:09:47.787017+0000",amount:1.}}
$ion_1_0 {Item:{id:1234567890123456.,created_at:"2023-06-26T06:10:18.611177+0000",amount:1.}}
$ion_1_0 {Item:{id:1234567890123456.,created_at:"2023-07-01T02:44:43.976155+0000",amount:1.}}
DynamoDBテーブルのアイテムを行区切りで Item
アトリビュート下にそれぞれのアイテムの属性値が並ぶシンプルな形式です。
では、これらに対してAthenaからクエリしてみましょう。読み込みにはAmazon Ion Hive SerDeが利用できます。
CREATE TABLE文を実行してみましょう。いくつかポイントを解説します。
CREATE EXTERNAL TABLE IF NOT EXISTS `sample_table` (
`id` decimal,
`amount` decimal,
`created_at` string
)
ROW FORMAT SERDE
'com.amazon.ionhiveserde.IonHiveSerDe'
WITH SERDEPROPERTIES (
'ion.id.path_extractor' = '(Item id)',
'ion.amount.path_extractor' = '(Item amount)',
'ion.created_at.path_extractor' = '(Item created_at)'
)
STORED AS ION
LOCATION 's3://[S3バケット名]/export/AWSDynamoDB/[エクスポートID]/data/';
- 2〜4行目: エクスポートの時点でIonの各カラムのデータ型は決まっていて、Athenaのデータ型もそれに従って指定します。ドキュメントに対応表があります。数値(N)が
int
ではなくdecimal
になるのがハマりポイントでしょうか(日本語ページだと「小数点」と訳されていますので、Englishで見るのがオススメです? - 9〜11行目: このブログ記事の最重要ポイントと言っても過言ではありません。エクスポートされるIonのデータは
Item
アトリビュート下なので、パスエクストラクタ機能で(Item [アトリビュート名])
をion.[カラム名].path_extractor
に指定することでカラムへのアクセスが可能になります
作成したテーブルに対して、例えば作成日時(created_at
)を元に月次でamount
の合計を集計するクエリ例を示します。
SELECT id, SUM(amount) as amount FROM "default"."sample_table"
WHERE from_iso8601_timestamp(created_at)
BETWEEN from_iso8601_timestamp('2023-07-01T00:00:00.00')
AND from_iso8601_timestamp('2023-08-01T00:00:00.00')
GROUP BY(id)
こんな感じのAthenaのクエリを活用して、様々な集計がはかどりそうです!
ちなみにこの仕組みは、デベキャンコミュニティのデベフィートという学習ポイントの集計に利用しています。コミュニティにはどなたでも参加できるので、ぜひ勉強してデベフィートをゲットし、集計し甲斐のあるデータをモリモリ生みだしてくださいね。
おまけ: シンボリックリンクの活用
先ほどのCREATE TABLE文では元データのフォルダを直接指定していましたが、パスにDynamoDBのエクスポートIDを含むため、毎回エクスポートIDを確認してテーブルを作りなおす手間がかかってしまいます。そこで、以下の記事を参考にシンボリックリンクを設定することで使いまわしの利くCREATE TABLE文にアレンジしてみました。
シンボリックリンクのファイルを以下の通り作成します。
s3://S3バケット名/current/symlink.txt
s3://[S3バケット名]/export/AWSDynamoDB/[エクスポートID]/data/
これに対応するようにCREATE TABLE文を変更します。14行目で STORED AS INPUTFORMAT
にシンボリックリンクファイルとして読み取るSymlinkTextInputFormat
を指定、16行目でシンボリックリンクファイルを配置したS3のパスをセットしています。
CREATE EXTERNAL TABLE IF NOT EXISTS `sample_table` (
`id` decimal,
`amount` decimal,
`created_at` string
)
ROW FORMAT SERDE
'com.amazon.ionhiveserde.IonHiveSerDe'
WITH SERDEPROPERTIES (
'ion.id.path_extractor' = '(Item id)',
'ion.amount.path_extractor' = '(Item amount)',
'ion.created_at.path_extractor' = '(Item created_at)'
)
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'com.amazon.ionhiveserde.formats.IonOutputFormat'
LOCATION 's3://[S3バケット名]/current/';
あとはStep Functionsの処理にシンボリックリンクファイルの書き換えを仕込めば、最新のエクスポートデータにクエリするための仕組みを自動化できます。例えば以下のLambda関数の実行をcheckExportStatusのChoicesのCOMPLETEDのNextに指定します。
import boto3
import json
import re
ddb = boto3.client('dynamodb')
s3 = boto3.resource('s3')
def lambda_handler(event, context):
export = ddb.describe_export(ExportArn=event['ExportArn'])
file_contents = 's3://' + export['ExportDescription']['S3Bucket'] + '/' + re.sub(r'manifest-summary.json', '', export['ExportDescription']['ExportManifest']) + 'data/'
bucket = export['ExportDescription']['S3Bucket']
key = 'current/symlink.txt'
bucket_obj = s3.Object(bucket,key)
response = bucket_.put(Body=file_contents)
return response
まとめ
DynamoDBのエクスポート機能でAmazon Ion形式でエクスポートしAthenaからクエリする様子をご紹介しました。各サービスの機能を組み合わせるときは、実際にどういうデータ構造になっているのかが連携の鍵になると思うので、同じ事をしようとする誰かの参考になれば幸いです。