Amazon Kinesis Data FirehoseでDynamic Partitioningによる出力データの動的パーティショニングを試してみた(AWS CDK)
こんにちは、CX事業本部 IoT事業部の若槻です。
Amazon Kinesis Data Firehoseでは、S3 Bucketに出力されるデータのnamespace(プレフィックス)として、Timestampに加えて、レコード毎に指定のキー値を使用したプレフィクスを付与できるDynamic Partitioning(動的パーティショニング)という機能が利用できます。
今回は、Amazon Kinesis Data FirehoseでのDynamic Partitioningによる出力データの動的パーティショニングをAWS CDKで実装して試してみました。
やってみた
実装
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
import { Construct } from 'constructs'; import { aws_s3, aws_athena, aws_kinesisfirehose, Stack, StackProps, RemovalPolicy, Duration, Size, aws_glue, } from 'aws-cdk-lib'; import * as glue_alpha from '@aws-cdk/aws-glue-alpha'; import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha'; import * as firehose_destinations_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha'; export class ProcessStack extends Stack { constructor(scope: Construct, id: string, props: StackProps) { super(scope, id, props); // データソース格納バケット const dataBucket = new aws_s3.Bucket(this, 'dataBucket', { bucketName: `data-${this.account}-${this.region}`, removalPolicy: RemovalPolicy.DESTROY, }); // Athenaクエリ結果格納バケット const athenaQueryResultBucket = new aws_s3.Bucket( this, 'athenaQueryResultBucket', { bucketName: `athena-query-result-${this.account}`, removalPolicy: RemovalPolicy.DESTROY, } ); // データカタログ const dataCatalog = new glue_alpha.Database(this, 'dataCatalog', { databaseName: 'data_catalog', }); // データカタログテーブル const dataCatalogTable = new glue_alpha.Table(this, 'dataCatalogTable', { tableName: 'data_catalog_table', database: dataCatalog, bucket: dataBucket, s3Prefix: 'data/', dataFormat: glue_alpha.DataFormat.JSON, partitionKeys: [ { name: 'area', type: glue_alpha.Schema.STRING, }, ], columns: [ { name: 'id', type: glue_alpha.Schema.STRING, }, { name: 'count', type: glue_alpha.Schema.INTEGER, }, ], }); // areaパーティションに対するPartition Projection設定 const cfnTable = dataCatalogTable.node.defaultChild as aws_glue.CfnTable; cfnTable.addPropertyOverride('TableInput.Parameters', { 'projection.enabled': true, 'projection.area.type': 'enum', 'projection.area.values': 'Shinjuku,Ikebukuro,Akihabara', 'storage.location.template': `s3://${dataBucket.bucketName}/data/` + '${area}', }); // Athenaワークグループ new aws_athena.CfnWorkGroup(this, 'athenaWorkGroup', { name: 'athenaWorkGroup', workGroupConfiguration: { resultConfiguration: { outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`, }, }, recursiveDeleteOption: true, }); // Kinesis Firehose Delivery Stream const deliveryStream = new firehose_alpha.DeliveryStream( this, 'deliveryStream', { deliveryStreamName: 'deliveryStream', destinations: [ new firehose_destinations_alpha.S3Bucket(dataBucket, { dataOutputPrefix: 'data/!{partitionKeyFromQuery:key1}/', errorOutputPrefix: 'error/!{firehose:error-output-type}/', bufferingInterval: Duration.seconds(60), bufferingSize: Size.mebibytes(64), }), ], } ); // Delivery StreamのDynamic Partitioning設定 const cfnDeliveryStream = deliveryStream.node .defaultChild as aws_kinesisfirehose.CfnDeliveryStream; cfnDeliveryStream.addPropertyOverride( 'ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration', { Enabled: true, } ); cfnDeliveryStream.addPropertyOverride( 'ExtendedS3DestinationConfiguration.ProcessingConfiguration', { Enabled: true, Processors: [ { Type: 'MetadataExtraction', Parameters: [ { ParameterName: 'MetadataExtractionQuery', //クエリ文字列 ParameterValue: '{key1: .area}', }, { ParameterName: 'JsonParsingEngine', //putされたデータをjqエンジンでクエリする ParameterValue: 'JQ-1.6', }, ], }, { Type: 'AppendDelimiterToRecord', //レコード間に改行を挿入する Parameters: [ { ParameterName: 'Delimiter', ParameterValue: '\\n', }, ], }, ], } ); } }
@aws-cdk/aws-kinesisfirehose-alpha/DeliveryStream
のL2 Construct ClassにはDynamic Partitioningを設定するためのプロパティがありません。そこで作成したConstructをL1 Construct ClassであるCfnDeliveryStream
で型アサーションし、Dynamic Partitioningに必要な設定のプロパティをオーバーライドしています。- 119〜131行目の指定により、レコードのJSON String中の
area
キーの値をクエリし、出力されるデータのプレフィクスdata/!{partitionKeyFromQuery:key1}/
で使用するようにしています。 - 出力されたデータをAthenaでクエリをするために必要なリソースも合わせて作成しています。
上記をCDK Deployしてスタックをデプロイします。
動作確認
Delivery streamに対して下記のデータを連続でPutします。
aws firehose put-record \ --delivery-stream-name deliveryStream \ --cli-binary-format raw-in-base64-out \ --record '{"Data":"{\"id\":\"u001\",\"count\":3,\"area\":\"Shinjuku\"}"}' aws firehose put-record \ --delivery-stream-name deliveryStream \ --cli-binary-format raw-in-base64-out \ --record '{"Data":"{\"id\":\"u001\",\"count\":1,\"area\":\"Akihabara\"}"}' aws firehose put-record \ --delivery-stream-name deliveryStream \ --cli-binary-format raw-in-base64-out \ --record '{"Data":"{\"id\":\"u002\",\"count\":5,\"area\":\"Ikebukuro\"}"}' aws firehose put-record \ --delivery-stream-name deliveryStream \ --cli-binary-format raw-in-base64-out \ --record '{"Data":"{\"id\":\"u002\",\"count\":2,\"area\":\"Akihabara\"}"}' aws firehose put-record \ --delivery-stream-name deliveryStream \ --cli-binary-format raw-in-base64-out \ --record '{"Data":"{\"id\":\"u003\",\"count\":4,\"area\":\"Shinjuku\"}"}'
PutからBufferingInterval(60秒)以上経過後に出力先Bucketの内容を確認すると、プレフィクス毎にデータが出力されています。Dynamic Partitiongが動作していますね。
$ aws s3 ls s3://${BUCKET_NAME} --recursive 2022-08-06 23:26:49 84 data/Akihabara/deliveryStream-5-2022-08-06-14-24-26-7ff44042-2556-3ac1-b55a-ab0f515d6224 2022-08-06 23:26:49 42 data/Ikebukuro/deliveryStream-5-2022-08-06-14-24-27-c53b6edb-987c-37d9-abae-a67919e82275 2022-08-06 23:26:49 82 data/Shinjuku/deliveryStream-5-2022-08-06-14-24-25-6d70c97c-8bdf-3206-a588-53cd087befaf
Amazon AthenaのQuery EditorからデータをクエリしてRecordを取得してみます。
SELECT * FROM "data_catalog"."data_catalog_table" limit 10;
Recordを取得できました。パーティションキーのカラム含めて取得できていますね!
補足
buffur Sizeの制限
Dynamic Partitioningを使用するDelivery streamでは、buffur Sizeが64MBから128MBの間である必要があります。
For a delivery stream where data partitioning is enabled, the buffer size ranges from 64MB to 128MB
下記のように制限を下回るbuffer sizeを指定した場合。
// Kinesis Firehose Delivery Stream const deliveryStream = new firehose_alpha.DeliveryStream( this, 'deliveryStream', { deliveryStreamName: 'deliveryStream', destinations: [ new firehose_destinations_alpha.S3Bucket(dataBucket, { dataOutputPrefix: 'data/!{partitionKeyFromQuery:key1}/', errorOutputPrefix: 'error/!{firehose:error-output-type}/', bufferingInterval: Duration.seconds(60), bufferingSize: Size.mebibytes(32), // 制限を下回るbuffer size }), ], } );
Delivery streamの作成または更新時に次のようなエラーとなります。
cdk deploy ProcessStack ✨ Synthesis time: 4.02s ProcessStack: deploying... [0%] start: Publishing a2f24d2b6e5b76fb8772dc8667329d7bfdc094eb89b709aceeadbbe7a9e3fbd7:current_account-ap-northeast-1 [100%] success: Published a2f24d2b6e5b76fb8772dc8667329d7bfdc094eb89b709aceeadbbe7a9e3fbd7:current_account-ap-northeast-1 ProcessStack: creating CloudFormation changeset... 1:49:28 AM | UPDATE_FAILED | AWS::KinesisFirehose::DeliveryStream | deliveryStream7D22820B Resource handler returned message: "BufferingHints.SizeInMBs must be at least 64 when Dynamic Partitioning is enabled. (Servi ce: Firehose, Status Code: 400, Request ID: e377f0b4-9204-9571-b81f-6d634b9ce8db, Extended Request ID: 7An/JoTU08GAHHM/OEmP78 NVqfYdQi5yIkhtcAZnCflFvalsO1GLvunIbpXKAKOhbv61JGsud1AdSiqcw99ct2k4wuKHXTkE)" (RequestToken: a8f08440-a686-6ef6-9ba8-ad6129a9c b6c, HandlerErrorCode: InvalidRequest)
参考
- put-record — AWS CLI 2.7.21 Command Reference
- ls — AWS CLI 2.7.21 Command Reference
- class CfnTable (construct) · AWS CDK
- amazon s3 - Can I customize partitioning in Kinesis Firehose before delivering to S3? - Stack Overflow
- amazon s3 - Which class in AWS CDK have option to configure Dynamic partitioning for Kinesis delivery stream - Stack Overflow
以上