Amazon Data Firehose の S3 Destination の改行区切り文字の挿入設定を AWS CDK で構成してみた

Amazon Data Firehose の S3 Destination の改行区切り文字の挿入設定を AWS CDK で構成してみた

Clock Icon2024.02.25

こんにちは、CX 事業本部製造ビジネステクノロジー部の若槻です。

今年 2 月に、Amazon Kinesis Data Firehose は Amazon Data Firehose という名前に変更されましたね。

今回は、そんな Amazon Data Firehose の S3 Destination のレコード間の改行区切り文字の挿入(New line delimiter)設定を AWS CDK で構成してみたのでご紹介します。

やってみた

CDK アルファモジュールのインストール

CDK で Firehose の L2 Construct を使う場合はアルファモジュールを別途インストールする必要があります。

npm i -D @aws-cdk/aws-kinesisfirehose-alpha
npm i -D @aws-cdk/aws-kinesisfirehose-destinations-alpha

CDK コード

S3 バケットにデータ出力を行う Delivery Stream を作成し、その Delivery Stream に対して改行区切り文字の挿入を行う構成の CDK コードを作成します。

import {
  aws_s3,
  aws_kinesisfirehose,
  Stack,
  RemovalPolicy,
  Duration,
  CfnOutput,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as aws_firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as aws_firehose_destinations_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';

export class CdkSampleStack extends Stack {
  constructor(scope: Construct, id: string) {
    super(scope, id);

    // データ出力先の S3 バケットを作成
    const destinationBucket = new aws_s3.Bucket(this, 'DestinationBucket', {
      removalPolicy: RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    // 出力先の S3 バケット名を出力
    new CfnOutput(this, 'DestinationBucketName', {
      value: destinationBucket.bucketName,
    });

    // Delivery Stream を作成
    const deliveryStream = new aws_firehose_alpha.DeliveryStream(
      this,
      'DeliveryStream',
      {
        destinations: [
          new aws_firehose_destinations_alpha.S3Bucket(destinationBucket, {
            bufferingInterval: Duration.seconds(10),
          }),
        ],
      }
    );

    // ストリーム名を出力
    new CfnOutput(this, 'DeliveryStreamName', {
      value: deliveryStream.deliveryStreamName,
    });

    // DeliveryStream の L1 Construct を取得
    const cfnDeliveryStream = deliveryStream.node
      .defaultChild as aws_kinesisfirehose.CfnDeliveryStream;

    // ProcessingConfiguration を設定
    cfnDeliveryStream.addPropertyOverride(
      'ExtendedS3DestinationConfiguration.ProcessingConfiguration',
      {
        Enabled: true,
        Processors: [
          {
            Type: 'AppendDelimiterToRecord', //レコード間に改行文字を挿入
            Parameters: [],
          },
        ],
      }
    );
  }
}

ポイントとしては、レコード間に改行文字を入れるだけであれば、AppendDelimiterToRecord タイプのプロセッサーをパラメーター無しで指定するだけです。

デプロイ

npx cdk deploy --require-approval never --method=direct

作成された delivery stream の設定を確認すると、New line delimiter(改行区切り)が有効化されています。

動作確認

put 前の S3 バケットが空であることを確認

aws s3 ls s3://${bucketName} --recursive

データを Delivery stream に put する

データが単一オブジェクト内で改行されるかどうかを確認するため、3 つのデータを Delivery stream に put します。

aws firehose put-record \
  --delivery-stream-name ${deliveryStream} \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"id\":\"u001\"}"}'
aws firehose put-record \
  --delivery-stream-name ${deliveryStream} \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"id\":\"u002\"}"}'
aws firehose put-record \
  --delivery-stream-name ${deliveryStream} \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"id\":\"u003\"}"}'

注意点としてすべてのデータを buffering interval である 10 秒以内に put するようにして下さい。これにより全てのデータが同じファイルに出力されます。

バケットに作成されたオブジェクトを確認

aws s3 ls s3://${bucketName} --recursive

オブジェクトの中身を確認

出力されたオブエジェクトの中身を確認すると、レコードとなる各 JSON 文字列が 改行されている ことが確認できました。

$ aws s3 cp s3://${bucketName}/${objectKey} -
{"id":"u001"}
{"id":"u002"}
{"id":"u003"}

改行区切りを設定しない場合

念の為、改行区切りを設定しない場合も認してみます。

CDK コード

import { aws_s3, Stack, RemovalPolicy, Duration, CfnOutput } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as aws_firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as aws_firehose_destinations_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';

export class CdkSampleStack extends Stack {
  constructor(scope: Construct, id: string) {
    super(scope, id);

    // データ出力先の S3 バケットを作成
    const destinationBucket = new aws_s3.Bucket(this, 'DestinationBucket', {
      removalPolicy: RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    // 出力先の S3 バケット名を出力
    new CfnOutput(this, 'DestinationBucketName', {
      value: destinationBucket.bucketName,
    });

    // Delivery Stream を作成
    const deliveryStream = new aws_firehose_alpha.DeliveryStream(
      this,
      'DeliveryStream',
      {
        destinations: [
          new aws_firehose_destinations_alpha.S3Bucket(destinationBucket, {
            bufferingInterval: Duration.seconds(10),
          }),
        ],
      }
    );

    // ストリーム名を出力
    new CfnOutput(this, 'DeliveryStreamName', {
      value: deliveryStream.deliveryStreamName,
    });
  }
}

作成された delivery stream の設定を確認すると、New line delimiter(改行区切り)が無効となっています。

put 前の S3 バケットが空であることを確認

aws s3 ls s3://${bucketName} --recursive

データを delivery stream に put する

データが単一オブジェクト内で改行されるかどうかを確認するため、3 つのデータを Delivery stream に put します。

aws firehose put-record \
  --delivery-stream-name ${deliveryStream} \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"id\":\"d100\"}"}'
aws firehose put-record \
  --delivery-stream-name ${deliveryStream} \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"id\":\"d101\"}"}'
aws firehose put-record \
  --delivery-stream-name ${deliveryStream} \
  --cli-binary-format raw-in-base64-out \
  --record '{"Data":"{\"id\":\"d102\"}"}'

バケットに作成されたオブジェクトを確認

aws s3 ls s3://${bucketName} --recursive

オブジェクトの中身を確認

$ aws s3 cp s3://${bucketName}/${objectKey} -
{"id":"d100"}{"id":"d101"}{"id":"d102"}

出力されたオブエジェクトの中身を確認すると、レコードとなる各 JSON 文字列が 改行されていない ことが確認できました。

おわりに

Amazon Data Firehose の S3 Destination のレコード間の改行区切り文字の挿入(New line delimiter)設定を AWS CDK で構成してみたのでご紹介しました。

実は以前に下記ブログでも同様のことを実現する方法を紹介していましたが、その時は無駄に複雑なことをしていたため、今回の方法の方がシンプルで正攻法だと思います。

以上

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.