[AWS CDK] Amazon S3 Bucket を送信先とした Kinesis Data Firehose の出力プレフィクスを、レコードから抽出したタイムスタンプデータを用いて JST 形式にする

[AWS CDK] Amazon S3 Bucket を送信先とした Kinesis Data Firehose の出力プレフィクスを、レコードから抽出したタイムスタンプデータを用いて JST 形式にする

Clock Icon2023.05.31

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 Delivery部の若槻です。

今回は、Amazon Kinesis Data Firehose から Amazon S3 へのデータ送信で、レコードから抽出したデータをもとに出力プレフィクスをカスタマイズする方法として Dynamic Partitioning(動的パーティショニング)があります。

今回は、レコードから抽出したタイムスタンプデータを用いて「JST 形式の出力プレフィクスを出力」する動的パーティショニングを AWS CDK で構築してみました。

出力プレフィクスの日時形式(yyyy/MM/dd/HH)は既定で UTC となる

まず動的パーティショニングを使用しない場合の動作について確認してみます。

Amazon S3 Bucket を送信先とした Kinesis Data Firehose Delivery Stream(配信ストリーム)の出力プレフィクスでは timestamp​ 名前空間を使用することができます。!{timestamp:yyyy/MM/dd/HH}のように指定すると 2023/05/31/23 のような日時形式のプレフィクスでデータが出力されます。この timestamp​ 名前空間では常に UTC 時間が使用されます。

それでは動的パーティショニングを使用しない場合の配信ストリームを AWS CDK で構築してみます。

import { aws_s3 as s3, Duration, Stack, StackProps } from 'aws-cdk-lib';
import { Construct } from 'constructs';

import * as kinesisfirehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as kinesisfirehose_destination_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';

export class CdkSampleStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    const destingtionBucket = new s3.Bucket(this, 'DestinationBucket');

    new kinesisfirehose_alpha.DeliveryStream(this, 'SampleDeliveryStream', {
      deliveryStreamName: 'sample-delivery-stream',
      destinations: [
        new kinesisfirehose_destination_alpha.S3Bucket(destingtionBucket, {
          compression: kinesisfirehose_destination_alpha.Compression.GZIP,
          bufferingInterval: Duration.seconds(60),
          dataOutputPrefix: 'data/!{timestamp:yyyy/MM/dd/HH}/',
          errorOutputPrefix:
            'error/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd/HH}/',
        }),
      ],
    });
  }
}

作成された配信ストリームにレコードをプットします。

ENCODED_DATA=$(echo -n '{"attribute":1}' | base64 | tr -d '\n')
aws firehose put-record \
  --delivery-stream-name sample-delivery-stream \
  --record "{\"Data\":\"$ENCODED_DATA\"}"

Buffer interval(60秒)経過後にバケットに出力されたオブジェクトを確認してみます。

$ aws s3 ls ${bucketName}/data --recursive                 
2023-05-31 23:19:38         35 data/2023/05/31/14/sample-delivery-stream-1-2023-05-31-14-18-37-3e504448-15a0-4a01-9f35-73efeb926c50.gz

オブジェクトの作成日時(2023-05-31 23:19:38)は JST 形式ですが、プレフィクス(data/2023/05/31/14/)の日時は9時間前の UTC 形式となっていますね。

レコードから抽出したタイムスタンプデータを用いて出力プレフィクスの日時を JST にする

UTC ではなく JST の日時形式で出力プレフィクスを付与したい場合は、Dynamic Partitioning(動的パーティショニング)を使用します。

動的パーティショニングを使う場合は、Lambda Processor の構築が別途必要となる partitionKeyFromLambda 名前空間と、Configuration で MetadataExtractionQuery を記述するのみの partitionKeyFromQuery 名前空間のいずれか(もしくは両方)を使用して出力プレフィクスをカスタマイズすることができます。

そしてレコードから抽出したタイムスタンプデータを用いて、JST 形式のプレフィクスを出力したい場合は、MetadataExtractionQuery を使用することで実現できます。

MetadataExtractionQuery を使用した動的パーティショニングを設定した配信ストリームを AWS CDK で構築してみます。注意点として作成済みの配信ストリームに動的パーティショニングを後で設定することはできないため、新規に配信ストリームを作成する必要があります。

import {
  aws_kinesisfirehose as kinesisfirehose,
  aws_s3 as s3,
  Duration,
  Size,
  Stack,
  StackProps,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';

import * as kinesisfirehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as kinesisfirehose_destination_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';

export class CdkSampleStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    const destingtionBucket = new s3.Bucket(this, 'DestinationBucket');

    const deliveryStream = new kinesisfirehose_alpha.DeliveryStream(
      this,
      'SampleDeliveryStream2',
      {
        deliveryStreamName: 'sample-delivery-stream-2',
        destinations: [
          new kinesisfirehose_destination_alpha.S3Bucket(destingtionBucket, {
            compression: kinesisfirehose_destination_alpha.Compression.GZIP,
            bufferingInterval: Duration.seconds(60),
            bufferingSize: Size.mebibytes(64), // 動的パーティショニングを使用する場合はバッファサイズを 64 MiB 以上にする必要がある
            dataOutputPrefix:
              'data/' +
              '!{partitionKeyFromQuery:year}/' +
              '!{partitionKeyFromQuery:month}/' +
              '!{partitionKeyFromQuery:day}/' +
              '!{partitionKeyFromQuery:hour}/',
            errorOutputPrefix:
              'error/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd/HH}/',
          }),
        ],
      }
    );

    // 動的パーティショニングは L2 コンストラクタで設定できないため、Cfn リソースを取得して設定する
    const cfnDeliveryStream = deliveryStream.node
      .defaultChild as kinesisfirehose.CfnDeliveryStream;

    cfnDeliveryStream.addPropertyOverride(
      'ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration',
      {
        Enabled: true,
      }
    );

    cfnDeliveryStream.addPropertyOverride(
      'ExtendedS3DestinationConfiguration.ProcessingConfiguration',
      {
        Enabled: true,
        Processors: [
          {
            Type: 'MetadataExtraction',
            Parameters: [
              {
                ParameterName: 'MetadataExtractionQuery',
                ParameterValue:
                  '{' +
                  '"year": (.timestamp | tonumber | (. + (9 * 3600)) | strftime("%Y")),' +
                  '"month": (.timestamp | tonumber | (. + (9 * 3600)) | strftime("%m")),' +
                  '"day": (.timestamp | tonumber | (. + (9 * 3600)) | strftime("%d")),' +
                  '"hour": (.timestamp | tonumber | (. + (9 * 3600)) | strftime("%H"))' +
                  '}',
              },
              {
                ParameterName: 'JsonParsingEngine',
                ParameterValue: 'JQ-1.6',
              },
            ],
          },
        ],
      }
    );
  }
}

上記の MetadataExtractionQuery のパラメータ値として指定している '{ "year": (.timestamp | tonumber | (. + (9 * 3600)) | strftime("%Y")), "month": (.timestamp | tonumber | (. + (9 * 3600)) | strftime("%m")), "day": (.timestamp | tonumber | (. + (9 * 3600)) | strftime("%d")) , "hour": (.timestamp | tonumber | (. + (9 * 3600)) | strftime("%H")) }' という jq 文法は レコードに含まれる timestamp を数値に変換し(tonumber)、その値に 9 時間(秒で表現)を加え、さらに strftime 関数でフォーマットしています。その結果を dataOutputPrefix に記述した partitionKeyFromQuery 名前空間でパースして出力プレフィクスをカスタマイズしています。

CDK デプロイすると構築された配信ストリームで動的パーティショニングが設定できました。

UTC 2023-06-01 00:00:00 の UNIXエポック形式のタイムスタンプをレコードに含めてプットします。

ENCODED_DATA=$(echo -n '{"timestamp":1685577600}' | base64 | tr -d '\n')
aws firehose put-record \
  --delivery-stream-name sample-delivery-stream-2 \
  --record "{\"Data\":\"$ENCODED_DATA\"}"

Buffer interval(60秒)経過後にバケットに出力されたオブジェクトを確認してみます。

$ aws s3 ls ${bucketName}/data --recursive                        
2023-06-01 02:04:17         44 data/2023/06/01/09/sample-delivery-stream-2-2-2023-05-31-17-02-32-d6cc0e41-ead1-3c7e-beb9-1a13bcb87ee4.gz

9時間追加されたJST形式のプレフィクスで出力されていることが確認できました!

JQ-1.6 の仕様

JSON パースエンジンに使用する JQ-1.6 の仕様は下記のマニュアルで確認できます。

クエリが失敗した場合

JSONデータの形式の不備などにより、jq のクエリが失敗する場合があります。

例えば{"timestamp":2023-05-30T00:00:00Z}は、値の文字列がカッコで囲まれておらず不正な形式のJSONデータとなっています。また{"timestamp":"2023-05-30T00:00:00Z"}はJSONとしては正しいですが、jq のクエリの tonumber で数値としてパースできないためエラーとなります。

エラーの内容は配信ストリームのコンソールメニューから確認できます。

CloudWatch Logs にもイベントログとして記録されます。

またエラーとなったレコードは errorOutputPrefix で指定したプレフィクスに出力されます。

errorOutputPrefix のプレフィクスも dataOutputPrefix と同様に partitionKeyFromQuery 名前空間を使用することができるので、両者の日時プレフィクスを合わせておくと良いかと思います。

おわりに

Amazon S3 Bucket を送信先とした Kinesis Data Firehose の出力プレフィクスを、レコードから抽出したタイムスタンプデータを用いて JST 形式にする方法を確認してみました。

JQ クエリによりある程度の出力プレフィクスのカスタマイズであれば Lambda Processor を構築せずとも実現できるので、ぜひご活用ください。

参考

以上

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.