この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、CX事業本部 IoT事業部の若槻です。
前回の下記エントリでは、DynamoDBテーブルのデータ変更履歴をS3バケットに出力する仕組みをAWS CDKで構成しました。
今回は、S3バケットに出力されたデータを、さらにAmazon Athenaでクエリする構成をAWS CDKで実装してみました。
やってみた
下記の赤枠の部分をAWS CDKで追加で作成します。
CDKコード
前回のエントリのコードに、ハイライトした部分を追記しています。
lib/aws-cdk-app-stack.ts
import * as cdk from '@aws-cdk/core';
import * as dynamodb from '@aws-cdk/aws-dynamodb';
import * as s3 from '@aws-cdk/aws-s3';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as kinesisfirehose from '@aws-cdk/aws-kinesisfirehose';
import * as kinesisDestinations from '@aws-cdk/aws-kinesisfirehose-destinations';
import * as glue from '@aws-cdk/aws-glue';
import * as athena from '@aws-cdk/aws-athena';
export class AwsCdkAppStack extends cdk.Stack {
constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
const deviceTableDataChengeLogsBucket = new s3.Bucket(
this,
'deviceTableDataChengeLogs'
);
const deviceTableDataChangeDataStream = new kinesis.Stream(
this,
'deviceTableDataChangeStream',
{
shardCount: 1,
}
);
new kinesisfirehose.DeliveryStream(
this,
'deviceTableDataChangeDeliveryStream',
{
sourceStream: deviceTableDataChangeDataStream,
destinations: [
new kinesisDestinations.S3Bucket(deviceTableDataChengeLogsBucket, {
dataOutputPrefix: 'data/!{timestamp:yyyy/MM/dd/HH/}',
errorOutputPrefix:
'error/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd/HH/}',
}),
],
}
);
new dynamodb.Table(this, 'deviceTable', {
tableName: 'deviceTable',
partitionKey: { name: 'deviceId', type: dynamodb.AttributeType.STRING },
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
kinesisStream: deviceTableDataChangeDataStream,
});
//Glueデータベース
const glueDataBase = new glue.Database(this, 'glueDataBase', {
databaseName: 'gluedatabase',
});
//Glueテーブル
new glue.Table(this, 'glueTable', {
bucket: deviceTableDataChengeLogsBucket,
s3Prefix: 'data/',
database: glueDataBase,
tableName: 'gluetable',
columns: [
{
name: 'eventName',
type: glue.Schema.STRING,
},
{
name: 'dynamodb',
type: glue.Schema.struct([
{
name: 'ApproximateCreationDateTime',
type: glue.Schema.BIG_INT,
},
{
name: 'Keys',
type: glue.Schema.struct([
{
name: 'deviceId',
type: glue.Schema.struct([
{
name: 'S',
type: glue.Schema.STRING,
},
]),
},
]),
},
{
name: 'NewImage',
type: glue.Schema.struct([
{
name: 'deviceId',
type: glue.Schema.struct([
{
name: 'S',
type: glue.Schema.STRING,
},
]),
},
{
name: 'deviceName',
type: glue.Schema.struct([
{
name: 'S',
type: glue.Schema.STRING,
},
]),
},
]),
},
{
name: 'OldImage',
type: glue.Schema.struct([
{
name: 'deviceId',
type: glue.Schema.struct([
{
name: 'S',
type: glue.Schema.STRING,
},
]),
},
{
name: 'deviceName',
type: glue.Schema.struct([
{
name: 'S',
type: glue.Schema.STRING,
},
]),
},
]),
},
]),
},
],
dataFormat: glue.DataFormat.JSON,
});
//Athenaクエリ結果出力先バケット
const athenaQueryResultBucket = new s3.Bucket(
this,
'athenaQueryResultBucket'
);
//Athenaワークグループ
new athena.CfnWorkGroup(this, 'athenaWorkGroup', {
name: 'athenaWorkGroup',
workGroupConfiguration: {
resultConfiguration: {
outputLocation: `s3://${athenaQueryResultBucket.bucketName}/data`,
},
},
});
}
}
DynamoSBのデータ変更履歴データ例は下記のようになります。このデータのスキーマをglue.Table()
のcolumns
で定義しています。今回はeventName
とdynamodb
の2スキーマのみとしています。
{
"awsRegion": "ap-northeast-1",
"eventID": "a77152e9-4a99-40b6-bd44-ecf4760123e3",
"eventName": "INSERT",
"userIdentity": null,
"recordFormat": "application/json",
"tableName": "deviceTable",
"dynamodb": {
"ApproximateCreationDateTime": 1637419833903,
"Keys": {
"deviceId": {
"S": "d001"
}
},
"NewImage": {
"deviceId": {
"S": "d001"
},
{
"deviceName": {
"S": "デバイス001"
},
},
"SizeBytes": 24
},
"eventSource": "aws:dynamodb"
}
動作
Amazon AthenaのマネジメントコンソールでSELECTクエリを実行してみます。
SELECTを*
で実行してみます。
SELECT * FROM "gluedatabase"."gluetable"
すると入れ子構造のJSONはdynamodb
列のように入れ子構造のまま出力され、扱いにくい形式となってしまいます。
よって入れ子構造のJSON内の値は下記のように.
でアクセスしてas
でカスタム列としフラット化すれば扱いやすくなります。
SELECT
eventname,
dynamodb.approximatecreationdatetime as approximatecreationdatetime,
dynamodb.newimage.deviceid.s as deviceid,
dynamodb.newimage.devicename.s as devicename
FROM "gluedatabase"."gluetable"
(クエリ実行時のキャプチャは取り忘れましたが、)DATE_FORMAT
やFROM_UNIXTIME
を使えばapproximatecreationdatetimeの形式を変更することも可能です。
SELECT
eventname,
DATE_FORMAT(FROM_UNIXTIME(dynamodb.approximatecreationdatetime/1000, 'Asia/Tokyo'),'%Y-%m-%d %H:%i:%s') as timestamp,
dynamodb.newimage.deviceid.s as deviceid,
dynamodb.newimage.devicename.s as devicename
FROM "gluedatabase"."gluetable"
注意点
AthenaではUnixTimeミリ秒のデータの方はBIGINTとする必要がある
CDKでのGlueテーブルのcolumnの定義で、ApproximateCreationDateTime
の値はUnixTimeミリ秒となるためBIGINT型として定義する必要があります。
{
name: 'ApproximateCreationDateTime',
type: glue.Schema.BIG_INT,
},
INT型として定義した場合は下記のHIVE_BAD_DATA
エラーとなります。
これはUnixTimeミリ秒はAthenaのクエリエンジンであるPrestoのINT値の範囲を超えてしまうためです。
Presto の INT 値の範囲が -2147483648~2147483647 であるため、Athena は「49612833315」を解析できませんでした。
参考
- AWS Glueデータカタログへのテーブルの作成とAthenaの設定をCloudFormationでやってみた | DevelopersIO
- 【新機能】Amazon DynamoDB Table を S3 に Export して Amazon Athena でクエリを実行する | DevelopersIO
- JSONSerDe によるマッピングを使って、入れ子の JSON から Amazon Athena のテーブルを作成する | Amazon Web Services ブログ
- prestoの日付関数 - Qiita
- AWS Athenaでのタイムスタンプの扱い方 まとめ - Qiita
- AWS WAFのログをAthenaで整形する - サーバーワークスエンジニアブログ
以上