Amazon Kinesis Data FirehoseでLambda Processorを使わずにレコード間に改行コードを自動挿入するアイデア(AWS CDK)

2022.08.07

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 IoT事業部の若槻です。

Amazon Kinesis Data Firehoseを利用する際には、Delivery streamにPutしたJSON形式のデータがAmazon Athenaでクエリ可能なJSON Lines形式で出力されるように、次のいずれかの対応を取ることが多いかと思います。

しかし前者の対応だとクライアント側で対処が難しい場合もあります。また後者の対応はProcessor用のLambda関数を実装する必要がありIaC含めた実装コストが掛かります。

今回は、Amazon Kinesis Data Firehoseでレコード間に改行コードを自動挿入するための、上記2つとは異なる第3の方法をAWS CDKのテンプレートとともにご紹介します。

方法

Amazon Kinesis Data Firehoseでは、S3 Bucketに出力されるデータのnamespace(プレフィックス)として、レコード毎に指定のキー値を使用したプレフィクスを付与できるDynamic Partitioningという機能が利用できます。

このDynamic Partitioningでは付随する機能としてレコード間に任意の区切り記号を自動挿入でき、これを改行コードの挿入に利用できないか?というのが今回のアイデアです。

この区切り文字自動挿入の機能は単独利用はできない(詳細後述)のですが、フォルダ(プレフィクス)に対するDynamic Partitioningが不要なケースでも対応できるように、次の2つの観点で方法を確認してみました。

  • フォルダ(プレフィクス)に対するDynamic Partitioningを利用する場合
  • フォルダ(プレフィクス)に対するDynamic Partitioningが不要な場合

フォルダ(プレフィクス)に対するDynamic Partitioningを利用する場合

Dynamic Partitioningの機能を利用したい場合は、前回の以下エントリで紹介した方法がそのまま利用できます。

これにより例えばareaキーに対するDynamic Partitioningを設定したDelivery streamに対して次のようなJSON形式のレコードを個別にPutした場合。

{"id":"u001","count":3,"area":"Shinjuku"}
{"id":"u001","count":1,"area":"Akihabara"}
{"id":"u002","count":5,"area":"Ikebukuro"}
{"id":"u002","count":2,"area":"Akihabara"}
{"id":"u003","count":4,"area":"Shinjuku"}

出力先のS3 Bucketにはdata/${area}というフォルダ(プレフィクス)でパーティション値毎にオブジェクトが格納され、その内容はレコードが改行区切りとなったJSON Linesとなっています。

data/Akihabara/deliveryStream-5-2022-08-06-14-24-26-7ff44042-2556-3ac1-b55a-ab0f515d6224

{"id":"u001","count":1,"area":"Akihabara"}
{"id":"u002","count":2,"area":"Akihabara"}

data/Ikebukuro/deliveryStream-5-2022-08-06-14-24-27-c53b6edb-987c-37d9-abae-a67919e82275

{"id":"u002","count":5,"area":"Ikebukuro"}

data/Shinjuku/deliveryStream-5-2022-08-06-14-24-25-6d70c97c-8bdf-3206-a588-53cd087befaf

{"id":"u001","count":3,"area":"Shinjuku"}
{"id":"u003","count":4,"area":"Shinjuku"}

Dynamic Partitioningを利用する場合はこれで良さそうですね。

フォルダ(プレフィクス)に対するDynamic Partitioningが不要な場合

一方でフォルダ(プレフィクス)に対するDynamic Partitioningが不要な場合。もしかするとこちらのケースの方が多いかも知れません。

その場合はDynamic Partitioningで次のような実装を行います。

lib/process-stack.ts

import { Construct } from 'constructs';
import {
  aws_s3,
  aws_athena,
  aws_kinesisfirehose,
  Stack,
  StackProps,
  RemovalPolicy,
  Duration,
  Size,
} from 'aws-cdk-lib';
import * as glue_alpha from '@aws-cdk/aws-glue-alpha';
import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as firehose_destinations_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';

export class ProcessStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // データソース格納バケット
    const dataBucket = new aws_s3.Bucket(this, 'dataBucket', {
      bucketName: `data-${this.account}-${this.region}`,
      removalPolicy: RemovalPolicy.DESTROY,
    });

    // Athenaクエリ結果格納バケット
    const athenaQueryResultBucket = new aws_s3.Bucket(
      this,
      'athenaQueryResultBucket',
      {
        bucketName: `athena-query-result-${this.account}`,
        removalPolicy: RemovalPolicy.DESTROY,
      }
    );

    // データカタログ
    const dataCatalog = new glue_alpha.Database(this, 'dataCatalog', {
      databaseName: 'data_catalog',
    });

    // データカタログテーブル
    new glue_alpha.Table(this, 'dataCatalogTable', {
      tableName: 'data_catalog_table',
      database: dataCatalog,
      bucket: dataBucket,
      s3Prefix: 'data/',
      dataFormat: glue_alpha.DataFormat.JSON,
      columns: [
        {
          name: 'id',
          type: glue_alpha.Schema.STRING,
        },
        {
          name: 'count',
          type: glue_alpha.Schema.INTEGER,
        },
        {
          name: 'area',
          type: glue_alpha.Schema.STRING,
        },
      ],
    });

    // Athenaワークグループ
    new aws_athena.CfnWorkGroup(this, 'athenaWorkGroup', {
      name: 'athenaWorkGroup',
      workGroupConfiguration: {
        resultConfiguration: {
          outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`,
        },
      },
      recursiveDeleteOption: true,
    });

    // Kinesis Firehose Delivery Stream
    const deliveryStream = new firehose_alpha.DeliveryStream(
      this,
      'deliveryStream',
      {
        deliveryStreamName: 'deliveryStream',
        destinations: [
          new firehose_destinations_alpha.S3Bucket(dataBucket, {
            dataOutputPrefix: 'data/!{partitionKeyFromQuery:key1}', //末尾"/"を省略するとオブジェクト名先頭にキー値が付加される
            errorOutputPrefix: 'error/!{firehose:error-output-type}/',
            bufferingInterval: Duration.seconds(60),
            bufferingSize: Size.mebibytes(64),
          }),
        ],
      }
    );

    // Delivery StreamのDynamic Partitioning設定
    const cfnDeliveryStream = deliveryStream.node
      .defaultChild as aws_kinesisfirehose.CfnDeliveryStream;
    cfnDeliveryStream.addPropertyOverride(
      'ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration',
      {
        Enabled: true,
      }
    );
    cfnDeliveryStream.addPropertyOverride(
      'ExtendedS3DestinationConfiguration.ProcessingConfiguration',
      {
        Enabled: true,
        Processors: [
          {
            Type: 'MetadataExtraction',
            Parameters: [
              {
                ParameterName: 'MetadataExtractionQuery', //パーティションキーをレコードから抽出するクエリ
                ParameterValue:
                  '{key1: {dummy: ("-")} | .dummy}', //全てのレコードでキー値として"-"を返す
              },
              {
                ParameterName: 'JsonParsingEngine',
                ParameterValue: 'JQ-1.6', //クエリエンジンにjqを使用
              },
            ],
          },
          {
            Type: 'AppendDelimiterToRecord', //レコード間の区切り記号の挿入
            Parameters: [
              {
                ParameterName: 'Delimiter',
                ParameterValue: '\\n', //改行コードを挿入
              },
            ],
          },
        ],
      }
    );
  }
}

Dynamic Partitioningを利用する場合とのパラメータ上の変更点が主に2箇所あります。

  • 83行目のdataOutputPrefixの指定で、data/!{partitionKeyFromQuery:key1}として末尾の/を省略することにより、パーティションキーの値がオブジェクト名の先頭に付与されるようになります。これにより出力されたデータをクエリする側(Glue,Athena)でパーティションの余計な考慮は必要なくなります。
  • 前述の指定によりパーティションキーがフォルダ名に使用されなくなるため、パーティションキーの値は正直何でもよくなります。よって、112行目では全てのレコードで固定のキー値として-を返すように{key1: {dummyKey: ("-")} | .dummyVal}という指定をしています。
#レコードの内容によらず、固定のキー値"-"が返る
$ echo '{"key": "val"}' | jq '{dummyKey: ("-")} | .dummyKey'
"-"

上記をCDK Deployしたら動作確認をしてみます。

Delivery streamに対して下記のデータを連続でPutします。

aws firehose put-record \
  --delivery-stream-name deliveryStream \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"id\":\"u001\",\"count\":3,\"area\":\"Shinjuku\"}"}'
aws firehose put-record \
  --delivery-stream-name deliveryStream \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"id\":\"u001\",\"count\":1,\"area\":\"Akihabara\"}"}'
aws firehose put-record \
  --delivery-stream-name deliveryStream \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"id\":\"u002\",\"count\":5,\"area\":\"Ikebukuro\"}"}'
aws firehose put-record \
  --delivery-stream-name deliveryStream \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"id\":\"u002\",\"count\":2,\"area\":\"Akihabara\"}"}'
aws firehose put-record \
  --delivery-stream-name deliveryStream \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"id\":\"u003\",\"count\":4,\"area\":\"Shinjuku\"}"}'

出力先Bucketの内容を確認すると、プレフィクスとしてオブジェクト名先頭に-が付与されたオブジェクトが出力されています。

$ aws s3 ls s3://${BUCKET_NAME} --recursive  
2022-08-07 19:26:37        212 data/-deliveryStream-11-2022-08-07-10-24-12-03105d79-867c-3a5f-aa33-ba46de8d7e59

オブジェクトをダウンロードして開くと、ちゃんと改行コード区切りのJSON Lines形式になっています!

-deliveryStream-11-2022-08-07-10-24-12-03105d79-867c-3a5f-aa33-ba46de8d7e59

{"id":"u001","count":3,"area":"Shinjuku"}
{"id":"u001","count":1,"area":"Akihabara"}
{"id":"u002","count":5,"area":"Ikebukuro"}
{"id":"u002","count":2,"area":"Akihabara"}
{"id":"u003","count":4,"area":"Shinjuku"}

よってAthenaからのクエリでもちゃんと全レコードを取得することができました!

SELECT * FROM "data_catalog"."data_catalog_table" limit 10;

注意点

Dynamic Partitioning設定時はプレフィクスにキーを必ず含める必要がある

わざわざオブジェクト名先頭にキー値を含めなくて良さそうな気もしますね。

lib/process-stack.ts

    // Kinesis Firehose Delivery Stream
    const deliveryStream = new firehose_alpha.DeliveryStream(
      this,
      'deliveryStream',
      {
        deliveryStreamName: 'deliveryStream',
        destinations: [
          new firehose_destinations_alpha.S3Bucket(dataBucket, {
            //dataOutputPrefix: 'data/!{partitionKeyFromQuery:key1}',
            dataOutputPrefix: 'data/',
            errorOutputPrefix: 'error/!{firehose:error-output-type}/',
            bufferingInterval: Duration.seconds(60),
            bufferingSize: Size.mebibytes(64),
          }),
        ],
      }
    );

しかし上記のようにしてCDK DeployするとS3 Prefix should contain Dynamic Partitioning namespaces when Dynamic Partitioning is enabledというエラーとなります。

$ cdk deploy ProcessStack

✨  Synthesis time: 3.68s

ProcessStack: deploying...
[0%] start: Publishing c7a2755120b90491b73493d8f5689b26ec8850f7caf2c600ef1fa9820ebaf012:current_account-ap-northeast-1
[100%] success: Published c7a2755120b90491b73493d8f5689b26ec8850f7caf2c600ef1fa9820ebaf012:current_account-ap-northeast-1
ProcessStack: creating CloudFormation changeset...
7:41:00 PM | UPDATE_FAILED        | AWS::KinesisFirehose::DeliveryStream | deliveryStream7D22820B
Resource handler returned message: "S3 Prefix should contain Dynamic Partitioning namespaces when Dynamic Partitioning is
enabled (Service: Firehose, Status Code: 400, Request ID: fd98c6f6-2298-1621-a6f1-5e0d7f19b8f2, Extended Request ID: 9gAL7
J9nP0LWtmQ6VJEFGDs9W3gdJZcGAxnCKTgW5XCcpqwpGC7Vwnel9trA+o7o1hjeMzYZQZM35OYaKk4w64Oe8IFfk6Qf)" (RequestToken: fef60b0d-d1b2
-4f9a-fba3-02eaff8c923f, HandlerErrorCode: InvalidRequest)

Dynamic Partitioning設定時はプレフィクスにキーを必ず含める必要があるようです。

レコード間の区切り記号の自動挿入のみを利用することはできない

ではProcessorとしてMetadataExtractionの設定を行わず、AppendDelimiterToRecordのみ追加した場合はどうでしょうか。

lib/process-stack.ts

    cfnDeliveryStream.addPropertyOverride(
      'ExtendedS3DestinationConfiguration.ProcessingConfiguration',
      {
        Enabled: true,
        Processors: [
          /*
          {
            Type: 'MetadataExtraction',
            Parameters: [
              {
                ParameterName: 'MetadataExtractionQuery', //パーティションキーをレコードから抽出するクエリ
                ParameterValue:
                  '{key1: {dummy: ("-")} | .dummy}', //全てのレコードでキー値として"-"を返す
              },
              {
                ParameterName: 'JsonParsingEngine',
                ParameterValue: 'JQ-1.6', //クエリエンジンにjqを使用
              },
            ],
          },
          */
          {
            Type: 'AppendDelimiterToRecord', //レコード間の区切り記号の挿入
            Parameters: [
              {
                ParameterName: 'Delimiter',
                ParameterValue: '\\n', //改行コードを挿入
              },
            ],
          },
        ],
      }
    );

この場合はCDK DeployするとMetadataExtraction processor should be present when S3 Prefix has partitionKeyFromQuer y namespace.というエラーとなります。

$ cdk deploy ProcessStack

✨  Synthesis time: 2.84s

ProcessStack: deploying...
[0%] start: Publishing 7ce817b4e584f994374681cb3fc93f5555d6d0a7c58a6d0c20974b4c86059c04:current_account-ap-northeast-1
[100%] success: Published 7ce817b4e584f994374681cb3fc93f5555d6d0a7c58a6d0c20974b4c86059c04:current_account-ap-northeast-1
ProcessStack: creating CloudFormation changeset...
7:49:06 PM | UPDATE_FAILED        | AWS::KinesisFirehose::DeliveryStream | deliveryStream7D22820B
Resource handler returned message: "MetadataExtraction processor should be present when S3 Prefix has partitionKeyFromQuer
y namespace. (Service: Firehose, Status Code: 400, Request ID: e418df94-d1d9-933d-bf71-45758e2828a3, Extended Request ID:
HlviRU2SQiWhsJWrKhaOtMC2j5PKEqIukkH4rvwqvqD/css/i5JUiuLuoQGPqR3n13eLP6nL4Hhq94uoym8W4jEx8/VsdID3)" (RequestToken: 187ecd39
-2d51-faa2-6736-a92d37e60a45, HandlerErrorCode: InvalidRequest)

レコード間の区切り記号の自動挿入のみを利用することはできないようです。

Dynamic Partitioningのキー値は空文字とすることはできない。

今回プレフィクスに使用する値としてハイフン-を指定しましたが、'{key1: {dummy: ("")} | .dummy}'というクエリで空文字を返すのは出来ないのでしょうか。

lib/process-stack.ts

    cfnDeliveryStream.addPropertyOverride(
      'ExtendedS3DestinationConfiguration.ProcessingConfiguration',
      {
        Enabled: true,
        Processors: [
          {
            Type: 'MetadataExtraction',
            Parameters: [
              {
                ParameterName: 'MetadataExtractionQuery', //パーティションキーをレコードから抽出するクエリ
                ParameterValue:
                  //'{key1: {dummy: ("-")} | .dummy}',
                  '{key1: {dummy: ("")} | .dummy}',
              },
              {
                ParameterName: 'JsonParsingEngine',
                ParameterValue: 'JQ-1.6', //クエリエンジンにjqを使用
              },
            ],
          },
          {
            Type: 'AppendDelimiterToRecord', //レコード間の区切り記号の挿入
            Parameters: [
              {
                ParameterName: 'Delimiter',
                ParameterValue: '\\n', //改行コードを挿入
              },
            ],
          },
        ],
      }
    );

上記の実装とした場合は、Putしたレコードが次のようなpartitionKeys values must not be null or emptyというエラーとなり正常に配信されなくなります。

{
  "attemptsMade": 1,
  "arrivalTimestamp": 1659819856785,
  "errorCode": "DynamicPartitioning.MetadataExtractionFailed",
  "errorMessage": "partitionKeys values must not be null or empty",
  "attemptEndingTimestamp": 1659819922676,
  "rawData": "eyJpZCI6InUwMDEiLCJjb3VudCI6MywiYXJlYSI6IlNoaW5qdWt1In0="
}

パーティションキーの値は必ず空文字以外の文字列とする必要があるようです。

オブジェクト名の先頭が"_"だとAthenaのクエリ対象とならない

空文字がだめならと当初アンダースコア_をパーティションキーの値にしようとしました。Putしたデータの配信までは問題なく行われました。

$ aws s3 ls s3://${BUCKET_NAME} --recursive  
2022-08-07 19:26:37        212 data/_deliveryStream-11-2022-08-07-10-24-12-03105d79-867c-3a5f-aa33-ba46de8d7e59

後はAthenaでクエリするだけで良さそでしたが、上記オブジェクトに記載されたレコードをSELECTクエリで取得できませんでした。どうやらオブジェクト名の先頭が_だとAthenaのクエリ対象とならないようです。

そこで今回は代わりにハイフン-を使う実装としました。

おわりに

Amazon Kinesis Data FirehoseでLambda Processorを使わずにレコード間に改行コードを自動挿入するアイデアのご紹介でした。

もしかしたら正攻法ではないかも知れませんが実装コストは下がりそうです。今後のアップデートでレコード間の区切り記号の挿入機能だけを単独で使えるアップデートがあれば嬉しいですね。

参考

以上