S3バケットに出力したDynamoDBテーブルのデータ変更履歴をAthenaでクエリしてみた(AWS CDK)

2021.11.21

こんにちは、CX事業本部 IoT事業部の若槻です。

前回の下記エントリでは、DynamoDBテーブルのデータ変更履歴をS3バケットに出力する仕組みをAWS CDKで構成しました。

今回は、S3バケットに出力されたデータを、さらにAmazon Athenaでクエリする構成をAWS CDKで実装してみました。

やってみた

下記の赤枠の部分をAWS CDKで追加で作成します。

CDKコード

前回のエントリのコードに、ハイライトした部分を追記しています。

lib/aws-cdk-app-stack.ts

import * as cdk from '@aws-cdk/core';
import * as dynamodb from '@aws-cdk/aws-dynamodb';
import * as s3 from '@aws-cdk/aws-s3';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as kinesisfirehose from '@aws-cdk/aws-kinesisfirehose';
import * as kinesisDestinations from '@aws-cdk/aws-kinesisfirehose-destinations';
import * as glue from '@aws-cdk/aws-glue';
import * as athena from '@aws-cdk/aws-athena';

export class AwsCdkAppStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const deviceTableDataChengeLogsBucket = new s3.Bucket(
      this,
      'deviceTableDataChengeLogs'
    );

    const deviceTableDataChangeDataStream = new kinesis.Stream(
      this,
      'deviceTableDataChangeStream',
      {
        shardCount: 1,
      }
    );

    new kinesisfirehose.DeliveryStream(
      this,
      'deviceTableDataChangeDeliveryStream',
      {
        sourceStream: deviceTableDataChangeDataStream,
        destinations: [
          new kinesisDestinations.S3Bucket(deviceTableDataChengeLogsBucket, {
            dataOutputPrefix: 'data/!{timestamp:yyyy/MM/dd/HH/}',
            errorOutputPrefix:
              'error/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd/HH/}',
          }),
        ],
      }
    );

    new dynamodb.Table(this, 'deviceTable', {
      tableName: 'deviceTable',
      partitionKey: { name: 'deviceId', type: dynamodb.AttributeType.STRING },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      kinesisStream: deviceTableDataChangeDataStream,
    });

    //Glueデータベース
    const glueDataBase = new glue.Database(this, 'glueDataBase', {
      databaseName: 'gluedatabase',
    });

    //Glueテーブル
    new glue.Table(this, 'glueTable', {
      bucket: deviceTableDataChengeLogsBucket,
      s3Prefix: 'data/',
      database: glueDataBase,
      tableName: 'gluetable',
      columns: [
        {
          name: 'eventName',
          type: glue.Schema.STRING,
        },
        {
          name: 'dynamodb',
          type: glue.Schema.struct([
            {
              name: 'ApproximateCreationDateTime',
              type: glue.Schema.BIG_INT,
            },
            {
              name: 'Keys',
              type: glue.Schema.struct([
                {
                  name: 'deviceId',
                  type: glue.Schema.struct([
                    {
                      name: 'S',
                      type: glue.Schema.STRING,
                    },
                  ]),
                },
              ]),
            },
            {
              name: 'NewImage',
              type: glue.Schema.struct([
                {
                  name: 'deviceId',
                  type: glue.Schema.struct([
                    {
                      name: 'S',
                      type: glue.Schema.STRING,
                    },
                  ]),
                },
                {
                  name: 'deviceName',
                  type: glue.Schema.struct([
                    {
                      name: 'S',
                      type: glue.Schema.STRING,
                    },
                  ]),
                },
              ]),
            },
            {
              name: 'OldImage',
              type: glue.Schema.struct([
                {
                  name: 'deviceId',
                  type: glue.Schema.struct([
                    {
                      name: 'S',
                      type: glue.Schema.STRING,
                    },
                  ]),
                },
                {
                  name: 'deviceName',
                  type: glue.Schema.struct([
                    {
                      name: 'S',
                      type: glue.Schema.STRING,
                    },
                  ]),
                },
              ]),
            },
          ]),
        },
      ],
      dataFormat: glue.DataFormat.JSON,
    });

    //Athenaクエリ結果出力先バケット
    const athenaQueryResultBucket = new s3.Bucket(
      this,
      'athenaQueryResultBucket'
    );

    //Athenaワークグループ
    new athena.CfnWorkGroup(this, 'athenaWorkGroup', {
      name: 'athenaWorkGroup',
      workGroupConfiguration: {
        resultConfiguration: {
          outputLocation: `s3://${athenaQueryResultBucket.bucketName}/data`,
        },
      },
    });
  }
}

DynamoSBのデータ変更履歴データ例は下記のようになります。このデータのスキーマをglue.Table()columnsで定義しています。今回はeventNamedynamodbの2スキーマのみとしています。

{
  "awsRegion": "ap-northeast-1",
  "eventID": "a77152e9-4a99-40b6-bd44-ecf4760123e3",
  "eventName": "INSERT",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "deviceTable",
  "dynamodb": {
    "ApproximateCreationDateTime": 1637419833903,
    "Keys": {
      "deviceId": {
        "S": "d001"
      }
    },
    "NewImage": {
      "deviceId": {
        "S": "d001"
      },
      {
      "deviceName": {
        "S": "デバイス001"
      },
    },
    "SizeBytes": 24
  },
  "eventSource": "aws:dynamodb"
}

動作

Amazon AthenaのマネジメントコンソールでSELECTクエリを実行してみます。

SELECTを*で実行してみます。

SELECT * FROM "gluedatabase"."gluetable"

すると入れ子構造のJSONはdynamodb列のように入れ子構造のまま出力され、扱いにくい形式となってしまいます。

よって入れ子構造のJSON内の値は下記のように.でアクセスしてasでカスタム列としフラット化すれば扱いやすくなります。

SELECT
    eventname,
    dynamodb.approximatecreationdatetime as approximatecreationdatetime,
    dynamodb.newimage.deviceid.s as deviceid,
    dynamodb.newimage.devicename.s as devicename
FROM "gluedatabase"."gluetable"

注意点

AthenaではUnixTimeミリ秒のデータの方はBIGINTとする必要がある

ApproximateCreationDateTimeの値はUnixTimeミリ秒となるためBIGINT型として定義する必要があります。

            {
              name: 'ApproximateCreationDateTime',
              type: glue.Schema.BIG_INT,
            },

INT型として定義した場合は下記のHIVE_BAD_DATAエラーとなります。

これはUnixTimeミリ秒はAthenaのクエリエンジンであるPrestoのINT値の範囲を超えてしまうためです。

Presto の INT 値の範囲が -2147483648~2147483647 であるため、Athena は「49612833315」を解析できませんでした。

参考

以上