Amazon Kinesis Data FirehoseでDynamic Partitioningによる出力データの動的パーティショニングを試してみた(AWS CDK)

2022.08.06

こんにちは、CX事業本部 IoT事業部の若槻です。

Amazon Kinesis Data Firehoseでは、S3 Bucketに出力されるデータのnamespace(プレフィックス)として、Timestampに加えて、レコード毎に指定のキー値を使用したプレフィクスを付与できるDynamic Partitioning(動的パーティショニング)という機能が利用できます。

今回は、Amazon Kinesis Data FirehoseでのDynamic Partitioningによる出力データの動的パーティショニングをAWS CDKで実装して試してみました。

やってみた

実装

AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。

lib/process-stack.ts

import { Construct } from 'constructs';
import {
  aws_s3,
  aws_athena,
  aws_kinesisfirehose,
  Stack,
  StackProps,
  RemovalPolicy,
  Duration,
  Size,
  aws_glue,
} from 'aws-cdk-lib';
import * as glue_alpha from '@aws-cdk/aws-glue-alpha';
import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as firehose_destinations_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';

export class ProcessStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // データソース格納バケット
    const dataBucket = new aws_s3.Bucket(this, 'dataBucket', {
      bucketName: `data-${this.account}-${this.region}`,
      removalPolicy: RemovalPolicy.DESTROY,
    });

    // Athenaクエリ結果格納バケット
    const athenaQueryResultBucket = new aws_s3.Bucket(
      this,
      'athenaQueryResultBucket',
      {
        bucketName: `athena-query-result-${this.account}`,
        removalPolicy: RemovalPolicy.DESTROY,
      }
    );

    // データカタログ
    const dataCatalog = new glue_alpha.Database(this, 'dataCatalog', {
      databaseName: 'data_catalog',
    });

    // データカタログテーブル
    const dataCatalogTable = new glue_alpha.Table(this, 'dataCatalogTable', {
      tableName: 'data_catalog_table',
      database: dataCatalog,
      bucket: dataBucket,
      s3Prefix: 'data/',
      dataFormat: glue_alpha.DataFormat.JSON,
      partitionKeys: [
        {
          name: 'area',
          type: glue_alpha.Schema.STRING,
        },
      ],
      columns: [
        {
          name: 'id',
          type: glue_alpha.Schema.STRING,
        },
        {
          name: 'count',
          type: glue_alpha.Schema.INTEGER,
        },
      ],
    });

    // areaパーティションに対するPartition Projection設定
    const cfnTable = dataCatalogTable.node.defaultChild as aws_glue.CfnTable;
    cfnTable.addPropertyOverride('TableInput.Parameters', {
      'projection.enabled': true,
      'projection.area.type': 'enum',
      'projection.area.values': 'Shinjuku,Ikebukuro,Akihabara',
      'storage.location.template':
        `s3://${dataBucket.bucketName}/data/` + '${area}',
    });

    // Athenaワークグループ
    new aws_athena.CfnWorkGroup(this, 'athenaWorkGroup', {
      name: 'athenaWorkGroup',
      workGroupConfiguration: {
        resultConfiguration: {
          outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`,
        },
      },
      recursiveDeleteOption: true,
    });

    // Kinesis Firehose Delivery Stream
    const deliveryStream = new firehose_alpha.DeliveryStream(
      this,
      'deliveryStream',
      {
        deliveryStreamName: 'deliveryStream',
        destinations: [
          new firehose_destinations_alpha.S3Bucket(dataBucket, {
            dataOutputPrefix: 'data/!{partitionKeyFromQuery:key1}/',
            errorOutputPrefix: 'error/!{firehose:error-output-type}/',
            bufferingInterval: Duration.seconds(60),
            bufferingSize: Size.mebibytes(64),
          }),
        ],
      }
    );

    // Delivery StreamのDynamic Partitioning設定
    const cfnDeliveryStream = deliveryStream.node
      .defaultChild as aws_kinesisfirehose.CfnDeliveryStream;
    cfnDeliveryStream.addPropertyOverride(
      'ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration',
      {
        Enabled: true,
      }
    );
    cfnDeliveryStream.addPropertyOverride(
      'ExtendedS3DestinationConfiguration.ProcessingConfiguration',
      {
        Enabled: true,
        Processors: [
          {
            Type: 'MetadataExtraction',
            Parameters: [
              {
                ParameterName: 'MetadataExtractionQuery', //クエリ文字列
                ParameterValue: '{key1: .area}',
              },
              {
                ParameterName: 'JsonParsingEngine', //putされたデータをjqエンジンでクエリする
                ParameterValue: 'JQ-1.6',
              },
            ],
          },
          {
            Type: 'AppendDelimiterToRecord', //レコード間に改行を挿入する
            Parameters: [
              {
                ParameterName: 'Delimiter',
                ParameterValue: '\\n',
              },
            ],
          },
        ],
      }
    );
  }
}
  • @aws-cdk/aws-kinesisfirehose-alpha/DeliveryStreamのL2 Construct ClassにはDynamic Partitioningを設定するためのプロパティがありません。そこで作成したConstructをL1 Construct ClassであるCfnDeliveryStreamで型アサーションし、Dynamic Partitioningに必要な設定のプロパティをオーバーライドしています。
  • 119〜131行目の指定により、レコードのJSON String中のareaキーの値をクエリし、出力されるデータのプレフィクスdata/!{partitionKeyFromQuery:key1}/で使用するようにしています。
  • 出力されたデータをAthenaでクエリをするために必要なリソースも合わせて作成しています。

上記をCDK Deployしてスタックをデプロイします。

動作確認

Delivery streamに対して下記のデータを連続でPutします。

aws firehose put-record \
  --delivery-stream-name deliveryStream \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"id\":\"u001\",\"count\":3,\"area\":\"Shinjuku\"}"}'
aws firehose put-record \
  --delivery-stream-name deliveryStream \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"id\":\"u001\",\"count\":1,\"area\":\"Akihabara\"}"}'
aws firehose put-record \
  --delivery-stream-name deliveryStream \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"id\":\"u002\",\"count\":5,\"area\":\"Ikebukuro\"}"}'
aws firehose put-record \
  --delivery-stream-name deliveryStream \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"id\":\"u002\",\"count\":2,\"area\":\"Akihabara\"}"}'
aws firehose put-record \
  --delivery-stream-name deliveryStream \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"id\":\"u003\",\"count\":4,\"area\":\"Shinjuku\"}"}'

PutからBufferingInterval(60秒)以上経過後に出力先Bucketの内容を確認すると、プレフィクス毎にデータが出力されています。Dynamic Partitiongが動作していますね。

$ aws s3 ls s3://${BUCKET_NAME} --recursive      
2022-08-06 23:26:49         84 data/Akihabara/deliveryStream-5-2022-08-06-14-24-26-7ff44042-2556-3ac1-b55a-ab0f515d6224
2022-08-06 23:26:49         42 data/Ikebukuro/deliveryStream-5-2022-08-06-14-24-27-c53b6edb-987c-37d9-abae-a67919e82275
2022-08-06 23:26:49         82 data/Shinjuku/deliveryStream-5-2022-08-06-14-24-25-6d70c97c-8bdf-3206-a588-53cd087befaf

Amazon AthenaのQuery EditorからデータをクエリしてRecordを取得してみます。

SELECT * FROM "data_catalog"."data_catalog_table" limit 10;

Recordを取得できました。パーティションキーのカラム含めて取得できていますね!

補足

buffur Sizeの制限

Dynamic Partitioningを使用するDelivery streamでは、buffur Sizeが64MBから128MBの間である必要があります。

For a delivery stream where data partitioning is enabled, the buffer size ranges from 64MB to 128MB

下記のように制限を下回るbuffer sizeを指定した場合。

lib/process-stack.ts

    // Kinesis Firehose Delivery Stream
    const deliveryStream = new firehose_alpha.DeliveryStream(
      this,
      'deliveryStream',
      {
        deliveryStreamName: 'deliveryStream',
        destinations: [
          new firehose_destinations_alpha.S3Bucket(dataBucket, {
            dataOutputPrefix: 'data/!{partitionKeyFromQuery:key1}/',
            errorOutputPrefix: 'error/!{firehose:error-output-type}/',
            bufferingInterval: Duration.seconds(60),
            bufferingSize: Size.mebibytes(32), // 制限を下回るbuffer size
          }),
        ],
      }
    );

Delivery streamの作成または更新時に次のようなエラーとなります。

cdk deploy ProcessStack

✨  Synthesis time: 4.02s

ProcessStack: deploying...
[0%] start: Publishing a2f24d2b6e5b76fb8772dc8667329d7bfdc094eb89b709aceeadbbe7a9e3fbd7:current_account-ap-northeast-1
[100%] success: Published a2f24d2b6e5b76fb8772dc8667329d7bfdc094eb89b709aceeadbbe7a9e3fbd7:current_account-ap-northeast-1
ProcessStack: creating CloudFormation changeset...
1:49:28 AM | UPDATE_FAILED        | AWS::KinesisFirehose::DeliveryStream | deliveryStream7D22820B
Resource handler returned message: "BufferingHints.SizeInMBs must be at least 64 when Dynamic Partitioning is enabled. (Servi
ce: Firehose, Status Code: 400, Request ID: e377f0b4-9204-9571-b81f-6d634b9ce8db, Extended Request ID: 7An/JoTU08GAHHM/OEmP78
NVqfYdQi5yIkhtcAZnCflFvalsO1GLvunIbpXKAKOhbv61JGsud1AdSiqcw99ct2k4wuKHXTkE)" (RequestToken: a8f08440-a686-6ef6-9ba8-ad6129a9c
b6c, HandlerErrorCode: InvalidRequest)

参考

以上