DynamoDB のデータを Amazon Ion 形式で S3 にエクスポートし Athena からクエリする

DynamoDBのエクスポート機能でAmazon Ion形式でエクスポートしAthenaからクエリして、集計バッチとして利用する様子をご紹介します
2023.07.15

ども、大瀧です。
先日DynamoDBはバッチ処理よりストリーム処理が得意という記事が話題になりましたが、本ブログではバッチのアーキテクチャを紹介します!!

DynamoDBのエクスポート機能

DynamoDBには継続的バックアップ(PITR)で取得したスナップショットをAmazon S3にエクスポートする機能があり、ある時点のデータを元にしたバッチ処理をテーブルの読み込み容量を消費せずに実行できます。

S3にエクスポートするときのデータ形式はDynamoDB JSONとAmazon Ionテキスト形式の2つから選択できます。IonにはDynamoDBのデータ型を保持する仕組みが内包されているため、JSONやJavascript周りのデータ型変換に悩んだことのある方にはおすすめの形式です。

エクスポートの定期実行にEventBridge、エクスポートのステータス管理にはStep Functions、バッチのクエリにAthenaを利用しました。以下のようなアーキテクチャです。

1. DynamoDBの構成

まずは、DynamoDBのエクスポートしたいテーブルでポイントインタイムリカバリをオンにします。

これでエクスポートする準備ができました。

2. Step Functionsの構成

続いてエクスポートを実行するStepFunctionの処理を実装していきます。以下のブログ記事を参考にしました。

AWS Lambdaを利用せずにExportTableToPointInTime APIを直接呼び出し、エクスポートのジョブのステータスを定期的にチェックスするシンプルなステートマシンを作成します。

AWS管理コンソールのStep Functions管理画面から[ステートマシン] - [ステートマシンの作成]をクリックし、[コードでワークフローを記述]からステートマシン定義画面表示します。ステートマシン定義は以下の通りです。12行目でAmazon Ionを指定しています。

{
  "Comment": "State machine to export data from DynamoDB to S3",
  "StartAt": "callExport",
  "States": {
    "callExport": {
      "Type": "Task",
      "Next": "Wait1",
      "Parameters": {
        "TableArn": "<DynamoDBテーブルのARN>",
        "S3Bucket": "<エクスポート先S3バケット名>",
        "S3Prefix": "export",
        "ExportFormat": "ION"
      },
      "Resource": "arn:aws:states:::aws-sdk:dynamodb:exportTableToPointInTime",
      "Comment": "Export data from DynamoDB",
      "OutputPath": "$.ExportDescription"
    },
    "Wait1": {
      "Type": "Wait",
      "Seconds": 180,
      "Comment": "Wait for export finished",
      "Next": "checkExport"
    },
    "checkExport": {
      "Type": "Task",
      "Next": "checkExportStatus",
      "Parameters": {
        "MaxResults": 1,
        "TableArn": "<DynamoDBテーブルのARN>"
      },
      "Resource": "arn:aws:states:::aws-sdk:dynamodb:listExports",
      "OutputPath": "$.ExportSummaries[0]",
      "Comment": "Export result check"
    },
    "checkExportStatus": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.ExportStatus",
          "StringEquals": "COMPLETED",
          "Next": "ExportCompleted",
          "Comment": "If ExportStatus == COMPLETED then"
        },
        {
          "Variable": "$.ExportStatus",
          "StringEquals": "IN_PROGRESS",
          "Next": "Wait1",
          "Comment": "If ExportStatus == IN_PROGRESS then"
        }
      ],
      "Default": "ExportFailed",
      "Comment": "Check export status"
    },
    "ExportCompleted": {
      "Type": "Succeed",
      "Comment": "Export completed"
    },
    "ExportFailed": {
      "Type": "Fail",
      "Comment": "Export failed"
    }
  }
}

実行ロールは自動作成のCloudWatchLogsDeliveryFullAccessPolicyとXRayAccessPolicyに加えて、DynamoDBとS3にアクセスするポリシーを作成し、それぞれ付与します。

DynamoDBエクスポート用のポリシードキュメント例

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "dynamodb:ExportTableToPointInTime",
            "Resource": "<DynamoDBテーブルのARN>"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "dynamodb:ListExports",
            "Resource": "*"
        }
    ]
}

S3書き込み用のポリシードキュメント例

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0"",
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": "<エクスポート先S3バケットのARN>/*"
        }
    ]
}

これでOKです。試しにステートマシンを手動で実行し、エクスポートが正しく完了することを確認しましょう。

S3バケットの export/AWSDynamoDB/[エクスポートID]/ フォルダ以下にエクスポートしたデータが保存されます。実データは data/ フォルダに配置されます。

3. EventBridgeの構成

EventBridgeでは、Step Functionsステートマシンを定期実行するスケジュールを作成します。EventBrdige管理画面のメニューから[スケジューラ] - [スケジュール]を選択、[スケジュールを作成]ボタンをクリックし、スケジュール作成画面を表示します。

スケジュール名やスケジュールのパターンをセットしましょう。ターゲットの選択画面で「テンプレート化されたターゲット」を選択、一覧から[AWS Step Functions StartExecution]を選択し、先ほど作成したステートマシンを選択します。

これで定期的にS3バケットへのエクスポートが実行されるようになりました。

4. Athenaの構成とS3の追加設定

ここまでのステップでS3バケットの export/AWSDynamoDB/[エクスポートID]/data/ 以下にIon形式のデータが格納されるようになりました。ファイルはこんな感じになります。

Ionテキスト形式の中身はこんな感じです。

5qnrxf6tqq45ndlomybdi7y3va.ion(GZip圧縮を展開したもの)

 $ion_1_0 {Item:{id:1234567890123456.,created_at:"2023-06-30T09:09:47.787017+0000",amount:1.}}
 $ion_1_0 {Item:{id:1234567890123456.,created_at:"2023-06-26T06:10:18.611177+0000",amount:1.}}
 $ion_1_0 {Item:{id:1234567890123456.,created_at:"2023-07-01T02:44:43.976155+0000",amount:1.}}

DynamoDBテーブルのアイテムを行区切りで Item アトリビュート下にそれぞれのアイテムの属性値が並ぶシンプルな形式です。

では、これらに対してAthenaからクエリしてみましょう。読み込みにはAmazon Ion Hive SerDeが利用できます。

CREATE TABLE文を実行してみましょう。いくつかポイントを解説します。

CREATE EXTERNAL TABLE IF NOT EXISTS `sample_table` (
  `id` decimal,
  `amount` decimal,
  `created_at` string
)
ROW FORMAT SERDE
 'com.amazon.ionhiveserde.IonHiveSerDe'
WITH SERDEPROPERTIES (
 'ion.id.path_extractor' = '(Item id)',
 'ion.amount.path_extractor' = '(Item amount)',
 'ion.created_at.path_extractor' = '(Item created_at)'
 )
STORED AS ION
LOCATION 's3://[S3バケット名]/export/AWSDynamoDB/[エクスポートID]/data/';
  • 2〜4行目: エクスポートの時点でIonの各カラムのデータ型は決まっていて、Athenaのデータ型もそれに従って指定します。ドキュメントに対応表があります。数値(N)が int ではなく decimal になるのがハマりポイントでしょうか(日本語ページだと「小数点」と訳されていますので、Englishで見るのがオススメです?
  • 9〜11行目: このブログ記事の最重要ポイントと言っても過言ではありません。エクスポートされるIonのデータは Item アトリビュート下なので、パスエクストラクタ機能で (Item [アトリビュート名])ion.[カラム名].path_extractor に指定することでカラムへのアクセスが可能になります

作成したテーブルに対して、例えば作成日時(created_at)を元に月次でamountの合計を集計するクエリ例を示します。

SELECT id, SUM(amount) as amount FROM "default"."sample_table"
WHERE from_iso8601_timestamp(created_at)
  BETWEEN from_iso8601_timestamp('2023-07-01T00:00:00.00')
  AND from_iso8601_timestamp('2023-08-01T00:00:00.00')
GROUP BY(id)

こんな感じのAthenaのクエリを活用して、様々な集計がはかどりそうです!

ちなみにこの仕組みは、デベキャンコミュニティのデベフィートという学習ポイントの集計に利用しています。コミュニティにはどなたでも参加できるので、ぜひ勉強してデベフィートをゲットし、集計し甲斐のあるデータをモリモリ生みだしてくださいね。

おまけ: シンボリックリンクの活用

先ほどのCREATE TABLE文では元データのフォルダを直接指定していましたが、パスにDynamoDBのエクスポートIDを含むため、毎回エクスポートIDを確認してテーブルを作りなおす手間がかかってしまいます。そこで、以下の記事を参考にシンボリックリンクを設定することで使いまわしの利くCREATE TABLE文にアレンジしてみました。

シンボリックリンクのファイルを以下の通り作成します。

s3://S3バケット名/current/symlink.txt

s3://[S3バケット名]/export/AWSDynamoDB/[エクスポートID]/data/

これに対応するようにCREATE TABLE文を変更します。14行目で STORED AS INPUTFORMAT にシンボリックリンクファイルとして読み取るSymlinkTextInputFormatを指定、16行目でシンボリックリンクファイルを配置したS3のパスをセットしています。

CREATE EXTERNAL TABLE IF NOT EXISTS `sample_table` (
  `id` decimal,
  `amount` decimal,
  `created_at` string
)
ROW FORMAT SERDE
 'com.amazon.ionhiveserde.IonHiveSerDe'
WITH SERDEPROPERTIES (
 'ion.id.path_extractor' = '(Item id)',
 'ion.amount.path_extractor' = '(Item amount)',
 'ion.created_at.path_extractor' = '(Item created_at)'
 )
STORED AS
  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
  OUTPUTFORMAT 'com.amazon.ionhiveserde.formats.IonOutputFormat'
LOCATION 's3://[S3バケット名]/current/';

あとはStep Functionsの処理にシンボリックリンクファイルの書き換えを仕込めば、最新のエクスポートデータにクエリするための仕組みを自動化できます。例えば以下のLambda関数の実行をcheckExportStatusのChoicesのCOMPLETEDのNextに指定します。

import boto3
import json
import re

ddb = boto3.client('dynamodb')
s3 = boto3.resource('s3')

def lambda_handler(event, context):
    export = ddb.describe_export(ExportArn=event['ExportArn'])
    file_contents = 's3://' + export['ExportDescription']['S3Bucket'] + '/' + re.sub(r'manifest-summary.json', '', export['ExportDescription']['ExportManifest']) + 'data/'
    bucket = export['ExportDescription']['S3Bucket']
    key = 'current/symlink.txt'
    bucket_obj = s3.Object(bucket,key)
    response = bucket_.put(Body=file_contents)
    return response

まとめ

DynamoDBのエクスポート機能でAmazon Ion形式でエクスポートしAthenaからクエリする様子をご紹介しました。各サービスの機能を組み合わせるときは、実際にどういうデータ構造になっているのかが連携の鍵になると思うので、同じ事をしようとする誰かの参考になれば幸いです。

参考URL