Kinesis Data FirehoseからS3に送信するオブジェクトに付くプレフィックスをJSTにするため動的パーティショニングを使ってみた

2023.04.27

FIrehoseからS3にデータ送る場合、自動で以下のようなUTCの日付と時刻のプレフィックスがオブジェクトキーに追加されます。

/YYYY/MM/DD/HH/オブジェクト名

データをAthenaでクエリする際に、このプレフィックスを使って範囲を指定したいのですが、UTCだと日本時間(JST)とずれてしまい使いづらいなと考えてました。 以下の画像にあるようにデータの送信時刻とプレフィックスが9時間ずれています。実際のデータを送った時刻は2023/04/26 PM6時(18時)ですが、プレフィックスは2023/04/26 AM9時になっています。

この9時間のズレを補正した上でプレフィックスとして時刻を使うには何かいい方法がないかと調べると、動的パーティショニングというものがあることを知りました。 動的パーティショニングを有効にすると送信するデータを元にパーティショニングキーを自動で生成してプレフィックスを作成してくれます。 ちょうど送信するデータにデータ取得日(日本時間)としてtimestampが含まれていたので、このデータを使って動的パーティショニングをやってみました。 送りたいデータは以下のような形です。

{"temperature": "23.10", "pressure": "1014.66", "humidity": "49.91", "timestamp": "20230424001331"}

動的パーティショニングについての公式のドキュメントは以下になります。

今回CDKを使って実装してみましたので、その際の手順とコードを紹介したいと思います。

CDKのコード

CDKのコードは以下のようになります。

import {
  Stack,
  StackProps,
  RemovalPolicy,
  aws_s3,
  aws_kinesisfirehose,
  aws_iam
} from 'aws-cdk-lib';
import { Construct } from 'constructs';

export class sampleDynamicPartitionStack extends Stack {
  constructor(
    scope: Construct,
    id: string,
    props: StackProps,
  ) {
    super(scope, id, props);

    // データ保存用のS3バケットの作成
    const dynamicPartitioningDataBucket = new aws_s3.Bucket(
      this,
      'Dynamic partitioning data bucket',
      {
        bucketName: `dynamic-partitioning-data-${Stack.of(this).account}`,
        removalPolicy: RemovalPolicy.DESTROY,
        encryption: aws_s3.BucketEncryption.S3_MANAGED,
        blockPublicAccess: aws_s3.BlockPublicAccess.BLOCK_ALL,
        accessControl: aws_s3.BucketAccessControl.PRIVATE,
        autoDeleteObjects: true,
      },
    );

    // firehose用のrole
    const firehoseDeliveryStreamRole = new aws_iam.Role(this, "firehoseRole", {
      assumedBy: new aws_iam.ServicePrincipal('firehose.amazonaws.com')
    });

    // S3へのアクセス権限を付与
    firehoseDeliveryStreamRole.addToPolicy(new aws_iam.PolicyStatement({
      effect: aws_iam.Effect.ALLOW,
      resources: [
        dynamicPartitioningDataBucket.bucketArn,
        `${dynamicPartitioningDataBucket.bucketArn}/*`
      ],
      actions: ['s3:AbortMultipartUpload', 's3:GetBucketLocation', 's3:GetObject', 's3:ListBucket', 's3:ListBucketMultipartUploads', 's3:PutObject'],
    }));

    new aws_kinesisfirehose.CfnDeliveryStream(this, 'Delivery Stream', {
      deliveryStreamName: 'sample-delivery-stream',
      deliveryStreamType: 'DirectPut', // 直接データを送信する場合はDirectPut。kinesis streamからデータを送信する場合はKinesisStreamAsSource
      extendedS3DestinationConfiguration: {
        bucketArn: dynamicPartitioningDataBucket.bucketArn,
        bufferingHints: {
          intervalInSeconds: 60,// 60秒ごとにまとめてデータを送信
          sizeInMBs: 64, //Dynamic partitioning を有効にする場合は64MB以上にする必要がある
        },
        errorOutputPrefix: 'error/',
        prefix: 'sample_data/!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:hour}/',// プレフィックスの指定。!{partitionKeyFromQuery:パーティショニングキー名}でパーティショニングキーを指定できる。
        roleArn:firehoseDeliveryStreamRole.roleArn,
        dynamicPartitioningConfiguration: {
          enabled: true,
        },
        processingConfiguration: {
          enabled: true,
          processors: [
            {
              type: 'MetadataExtraction',
              parameters: [
                {
                  parameterName: 'MetadataExtractionQuery',
                  parameterValue: '{year: .timestamp[:4],month: .timestamp[4:6],day: .timestamp[6:8],hour: .timestamp[8:10]}', // パーティショニングキーとその値の指定
                },
                {
                  parameterName: 'JsonParsingEngine',
                  parameterValue: 'JQ-1.6',// パーティショニングキーのデータ抽出にはJQを使用する
                },
              ],
            },
            {
              type: 'AppendDelimiterToRecord',
              parameters: [
                {
                  parameterName: 'Delimiter',
                  parameterValue: '\\n',
                },
              ],
            },
          ],
        },
      },
    });
  }
}

コードの説明

Kinesis Data Firehoseに付与するS3を使用する権限

Kinesis Data Firehose(delivery Stream)に付与するS3を使用する権限は以下のようになります。 こちらは以下のAWS公式情報を参考にしました。

私はたまに抜けてしまうのですが、今回のようにオブジェクトを操作する権限の場合はリソース指定のところにarn:aws:s3:::bucket-name/*を書くのを忘れないようお願いします。

firehoseDeliveryStreamRole.addToPolicy(new aws_iam.PolicyStatement({
  effect: aws_iam.Effect.ALLOW,
  resources: [
    dynamicPartitioningDataBucket.bucketArn,
    `${dynamicPartitioningDataBucket.bucketArn}/*`
  ],
  actions: ['s3:AbortMultipartUpload', 's3:GetBucketLocation', 's3:GetObject', 's3:ListBucket', 's3:ListBucketMultipartUploads', 's3:PutObject'],
}));

Delivery Streamの定義の中で動的パーティションのプレフィックスの抽出を指定

動的パーティショニングを定義するには、ExtendedS3DestinationConfigurationというプロパティの配下でprefixdynamicPartitioningConfigurationを以下のように設定して行きます。

prefix

プレフィックスに使うパーティショニングキーの指定は!{partitionKeyFromQuery:パーティショニングキー名}でできます。 公式情報はこちら↓

今回はtimestampの文字列からデータを抽出してそれぞれ、year、month、day、hourとしてパーティショニングキーに指定しています。

prefix: 'sample_data/!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:hour}/',

dynamicPartitioningConfiguration

動的パーティショニングを有効にするためにはenabledtrueに設定します。

dynamicPartitioningConfiguration: {
  enabled: true,
},

パーティション用のデータの抽出

データの抽出にはprocessingConfigurationの配下でどのデータのどの部分を抽出してパーティショニングのキーに使用するかを設定します。

MetadataExtraction

MetadataExtractionはパーティショニングキーの値となるデータをどのように抽出するか設定します。 timestampの先頭4文字をyear、5文字目から6文字目をmonth、7文字目から8文字目をday、9文字目から10文字目をhourとしてパーティショニングキーを指定しています。 MetadataExtractionQueryのparameterValueにその抽出内容を記載します。 取得元のtimestampのデータは以下のような形式となっています。

"timestamp": "20230424001331"

JsonParsingEngineMetadataExtractionでデータの抽出に使うJSONパーサーを指定します。JQ-1.6を使用しています。

type: 'MetadataExtraction',
parameters: [
  {
    parameterName: 'MetadataExtractionQuery',
    parameterValue: '{year: .timestamp[:4],month: .timestamp[4:6],day: .timestamp[6:8],hour: .timestamp[8:10]}',
  },
  {
    parameterName: 'JsonParsingEngine',
    parameterValue: 'JQ-1.6',
  },
],

AppendDelimiterToRecord

AppendDelimiterToRecordはデータの区切り文字を指定します。今回は改行コード\\nを指定しています。 この指定をしない場合、S3に送られたJSONデータが1続きの以下のようなデータとなってしまい、Athenaなどからクエリができなくなるので、1つ一つのデータを区切るために使用しています。

{"timestamp":"20211001120000","message":"test"}{"timestamp":"20211001120000","message":"test"}・・・・

CDKの指定部分

{
  parameterName: 'Delimiter',
  parameterValue: '\\n',
},

動作確認

データの送信

データの送信にはAWS CLIを使って以下のコマンドで連続してJSONデータを送信します。

$ aws firehose put-record \
  --delivery-stream-name sample-delivery-stream \
  --record '{"Data": "{\"temperature\": \"23.10\", \"pressure\": \"1014.66\", \"humidity\": \"49.91\", \"timestamp\":\"20230426181331\"}"}'
$ aws firehose put-record \
  --delivery-stream-name sample-delivery-stream \
  --record '{"Data": "{\"temperature\": \"23.09\", \"pressure\": \"1014.68\", \"humidity\": \"49.75\", \"timestamp\":\"20230426181341\"}"}'
$ aws firehose put-record \
  --delivery-stream-name sample-delivery-stream \
  --record '{"Data": "{\"temperature\": \"23.09\", \"pressure\": \"1014.73\", \"humidity\": \"49.87\", \"timestamp\":\"20230426181351\"}"}'
$ aws firehose put-record \
  --delivery-stream-name sample-delivery-stream \
  --record '{"Data": "{\"temperature\": \"23.09\", \"pressure\": \"1014.63\", \"humidity\": \"49.86\", \"timestamp\":\"20230426181402\"}"}'

データの確認

プレフィックスを、先程Firehoseに送ったデータのtimestampの通り/2023/04/26/18/としてlsコマンドを打ってみると、データがS3に届いている事が確認できました。

$ aws s3 ls s3://dynamic-partitioning-data-xxxxxxxxxxxx/sample_data/2023/04/26/18/

2023-04-26 21:44:16        395 sample-delivery-stream-9-2023-04-26-12-41-53-03f51a99-95de-3a71-af84-170e4a9ce30a

オブジェクトをコンソールからダウンロードすると以下のような内容となっており、Firehoseに送信した通りの内容のデータがS3に届いている事が確認できました。

{"temperature": "23.10", "pressure": "1014.66", "humidity": "49.91", "timestamp":"20230426181331"}
{"temperature": "23.09", "pressure": "1014.68", "humidity": "49.75", "timestamp":"20230426181341"}
{"temperature": "23.09", "pressure": "1014.73", "humidity": "49.87", "timestamp":"20230426181351"}
{"temperature": "23.09", "pressure": "1014.63", "humidity": "49.86", "timestamp":"20230426181402"}

注意点

  • 動的パーティショニングはFirehose作成時のみ有効にできる機能で、作成後に有効にできません。
  • 動的パーティショニングを使用するとJQの処理などが発生する関係で追加の課金が発生します、詳しくは以下の公式情報を参照してください。
    動的パーティショニングの料金 (オプション)

以上。