この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、CX事業本部 IoT事業部の若槻です。
Amazon Kinesis Data Firehoseでは、S3 Bucketに出力されるデータのnamespace(プレフィックス)として、Timestampに加えて、レコード毎に指定のキー値を使用したプレフィクスを付与できるDynamic Partitioning(動的パーティショニング)という機能が利用できます。
今回は、Amazon Kinesis Data FirehoseでのDynamic Partitioningによる出力データの動的パーティショニングをAWS CDKで実装して試してみました。
やってみた
実装
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
lib/process-stack.ts
import { Construct } from 'constructs';
import {
aws_s3,
aws_athena,
aws_kinesisfirehose,
Stack,
StackProps,
RemovalPolicy,
Duration,
Size,
aws_glue,
} from 'aws-cdk-lib';
import * as glue_alpha from '@aws-cdk/aws-glue-alpha';
import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as firehose_destinations_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';
export class ProcessStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// データソース格納バケット
const dataBucket = new aws_s3.Bucket(this, 'dataBucket', {
bucketName: `data-${this.account}-${this.region}`,
removalPolicy: RemovalPolicy.DESTROY,
});
// Athenaクエリ結果格納バケット
const athenaQueryResultBucket = new aws_s3.Bucket(
this,
'athenaQueryResultBucket',
{
bucketName: `athena-query-result-${this.account}`,
removalPolicy: RemovalPolicy.DESTROY,
}
);
// データカタログ
const dataCatalog = new glue_alpha.Database(this, 'dataCatalog', {
databaseName: 'data_catalog',
});
// データカタログテーブル
const dataCatalogTable = new glue_alpha.Table(this, 'dataCatalogTable', {
tableName: 'data_catalog_table',
database: dataCatalog,
bucket: dataBucket,
s3Prefix: 'data/',
dataFormat: glue_alpha.DataFormat.JSON,
partitionKeys: [
{
name: 'area',
type: glue_alpha.Schema.STRING,
},
],
columns: [
{
name: 'id',
type: glue_alpha.Schema.STRING,
},
{
name: 'count',
type: glue_alpha.Schema.INTEGER,
},
],
});
// areaパーティションに対するPartition Projection設定
const cfnTable = dataCatalogTable.node.defaultChild as aws_glue.CfnTable;
cfnTable.addPropertyOverride('TableInput.Parameters', {
'projection.enabled': true,
'projection.area.type': 'enum',
'projection.area.values': 'Shinjuku,Ikebukuro,Akihabara',
'storage.location.template':
`s3://${dataBucket.bucketName}/data/` + '${area}',
});
// Athenaワークグループ
new aws_athena.CfnWorkGroup(this, 'athenaWorkGroup', {
name: 'athenaWorkGroup',
workGroupConfiguration: {
resultConfiguration: {
outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`,
},
},
recursiveDeleteOption: true,
});
// Kinesis Firehose Delivery Stream
const deliveryStream = new firehose_alpha.DeliveryStream(
this,
'deliveryStream',
{
deliveryStreamName: 'deliveryStream',
destinations: [
new firehose_destinations_alpha.S3Bucket(dataBucket, {
dataOutputPrefix: 'data/!{partitionKeyFromQuery:key1}/',
errorOutputPrefix: 'error/!{firehose:error-output-type}/',
bufferingInterval: Duration.seconds(60),
bufferingSize: Size.mebibytes(64),
}),
],
}
);
// Delivery StreamのDynamic Partitioning設定
const cfnDeliveryStream = deliveryStream.node
.defaultChild as aws_kinesisfirehose.CfnDeliveryStream;
cfnDeliveryStream.addPropertyOverride(
'ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration',
{
Enabled: true,
}
);
cfnDeliveryStream.addPropertyOverride(
'ExtendedS3DestinationConfiguration.ProcessingConfiguration',
{
Enabled: true,
Processors: [
{
Type: 'MetadataExtraction',
Parameters: [
{
ParameterName: 'MetadataExtractionQuery', //クエリ文字列
ParameterValue: '{key1: .area}',
},
{
ParameterName: 'JsonParsingEngine', //putされたデータをjqエンジンでクエリする
ParameterValue: 'JQ-1.6',
},
],
},
{
Type: 'AppendDelimiterToRecord', //レコード間に改行を挿入する
Parameters: [
{
ParameterName: 'Delimiter',
ParameterValue: '\\n',
},
],
},
],
}
);
}
}
@aws-cdk/aws-kinesisfirehose-alpha/DeliveryStream
のL2 Construct ClassにはDynamic Partitioningを設定するためのプロパティがありません。そこで作成したConstructをL1 Construct ClassであるCfnDeliveryStream
で型アサーションし、Dynamic Partitioningに必要な設定のプロパティをオーバーライドしています。- 119〜131行目の指定により、レコードのJSON String中の
area
キーの値をクエリし、出力されるデータのプレフィクスdata/!{partitionKeyFromQuery:key1}/
で使用するようにしています。 - 出力されたデータをAthenaでクエリをするために必要なリソースも合わせて作成しています。
上記をCDK Deployしてスタックをデプロイします。
動作確認
Delivery streamに対して下記のデータを連続でPutします。
aws firehose put-record \
--delivery-stream-name deliveryStream \
--cli-binary-format raw-in-base64-out \
--record '{"Data":"{\"id\":\"u001\",\"count\":3,\"area\":\"Shinjuku\"}"}'
aws firehose put-record \
--delivery-stream-name deliveryStream \
--cli-binary-format raw-in-base64-out \
--record '{"Data":"{\"id\":\"u001\",\"count\":1,\"area\":\"Akihabara\"}"}'
aws firehose put-record \
--delivery-stream-name deliveryStream \
--cli-binary-format raw-in-base64-out \
--record '{"Data":"{\"id\":\"u002\",\"count\":5,\"area\":\"Ikebukuro\"}"}'
aws firehose put-record \
--delivery-stream-name deliveryStream \
--cli-binary-format raw-in-base64-out \
--record '{"Data":"{\"id\":\"u002\",\"count\":2,\"area\":\"Akihabara\"}"}'
aws firehose put-record \
--delivery-stream-name deliveryStream \
--cli-binary-format raw-in-base64-out \
--record '{"Data":"{\"id\":\"u003\",\"count\":4,\"area\":\"Shinjuku\"}"}'
PutからBufferingInterval(60秒)以上経過後に出力先Bucketの内容を確認すると、プレフィクス毎にデータが出力されています。Dynamic Partitiongが動作していますね。
$ aws s3 ls s3://${BUCKET_NAME} --recursive
2022-08-06 23:26:49 84 data/Akihabara/deliveryStream-5-2022-08-06-14-24-26-7ff44042-2556-3ac1-b55a-ab0f515d6224
2022-08-06 23:26:49 42 data/Ikebukuro/deliveryStream-5-2022-08-06-14-24-27-c53b6edb-987c-37d9-abae-a67919e82275
2022-08-06 23:26:49 82 data/Shinjuku/deliveryStream-5-2022-08-06-14-24-25-6d70c97c-8bdf-3206-a588-53cd087befaf
Amazon AthenaのQuery EditorからデータをクエリしてRecordを取得してみます。
SELECT * FROM "data_catalog"."data_catalog_table" limit 10;
Recordを取得できました。パーティションキーのカラム含めて取得できていますね!
補足
buffur Sizeの制限
Dynamic Partitioningを使用するDelivery streamでは、buffur Sizeが64MBから128MBの間である必要があります。
For a delivery stream where data partitioning is enabled, the buffer size ranges from 64MB to 128MB
下記のように制限を下回るbuffer sizeを指定した場合。
lib/process-stack.ts
// Kinesis Firehose Delivery Stream
const deliveryStream = new firehose_alpha.DeliveryStream(
this,
'deliveryStream',
{
deliveryStreamName: 'deliveryStream',
destinations: [
new firehose_destinations_alpha.S3Bucket(dataBucket, {
dataOutputPrefix: 'data/!{partitionKeyFromQuery:key1}/',
errorOutputPrefix: 'error/!{firehose:error-output-type}/',
bufferingInterval: Duration.seconds(60),
bufferingSize: Size.mebibytes(32), // 制限を下回るbuffer size
}),
],
}
);
Delivery streamの作成または更新時に次のようなエラーとなります。
cdk deploy ProcessStack
✨ Synthesis time: 4.02s
ProcessStack: deploying...
[0%] start: Publishing a2f24d2b6e5b76fb8772dc8667329d7bfdc094eb89b709aceeadbbe7a9e3fbd7:current_account-ap-northeast-1
[100%] success: Published a2f24d2b6e5b76fb8772dc8667329d7bfdc094eb89b709aceeadbbe7a9e3fbd7:current_account-ap-northeast-1
ProcessStack: creating CloudFormation changeset...
1:49:28 AM | UPDATE_FAILED | AWS::KinesisFirehose::DeliveryStream | deliveryStream7D22820B
Resource handler returned message: "BufferingHints.SizeInMBs must be at least 64 when Dynamic Partitioning is enabled. (Servi
ce: Firehose, Status Code: 400, Request ID: e377f0b4-9204-9571-b81f-6d634b9ce8db, Extended Request ID: 7An/JoTU08GAHHM/OEmP78
NVqfYdQi5yIkhtcAZnCflFvalsO1GLvunIbpXKAKOhbv61JGsud1AdSiqcw99ct2k4wuKHXTkE)" (RequestToken: a8f08440-a686-6ef6-9ba8-ad6129a9c
b6c, HandlerErrorCode: InvalidRequest)
参考
- put-record — AWS CLI 2.7.21 Command Reference
- ls — AWS CLI 2.7.21 Command Reference
- class CfnTable (construct) · AWS CDK
- amazon s3 - Can I customize partitioning in Kinesis Firehose before delivering to S3? - Stack Overflow
- amazon s3 - Which class in AWS CDK have option to configure Dynamic partitioning for Kinesis delivery stream - Stack Overflow
以上