AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータをPutする(AWS CDK)

2022.08.08

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 IoT事業部の若槻です。

最近Kinesis Data Firehoseをよく触っているのですが、そうなると「Delivery streamへのデータのPutもAWS Step Functionsからやりたい!」という意欲が必然的に湧いてきていました。

そこで今回は、AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータをPutする実装をAWS CDKで構築してみました。

やってみた

実装

AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。

lib/process-stack.ts

import { Construct } from 'constructs';
import {
  aws_s3,
  aws_kinesisfirehose,
  aws_stepfunctions_tasks,
  aws_stepfunctions,
  Stack,
  StackProps,
  RemovalPolicy,
  Duration,
  Size,
} from 'aws-cdk-lib';
import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as firehose_destinations_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';

export class ProcessStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // データ格納バケット
    const dataBucket = new aws_s3.Bucket(this, 'dataBucket', {
      bucketName: `data-${this.account}-${this.region}`,
      removalPolicy: RemovalPolicy.DESTROY,
    });

    // Kinesis Firehose Delivery Stream
    const deliveryStream = new firehose_alpha.DeliveryStream(
      this,
      'deliveryStream',
      {
        deliveryStreamName: 'deliveryStream',
        destinations: [
          new firehose_destinations_alpha.S3Bucket(dataBucket, {
            dataOutputPrefix: 'data/!{partitionKeyFromQuery:key1}', //末尾"/"を省略するとオブジェクト名先頭にキー値が付加される
            errorOutputPrefix: 'error/!{firehose:error-output-type}/',
            bufferingInterval: Duration.seconds(60),
            bufferingSize: Size.mebibytes(64),
          }),
        ],
      }
    );

    // Delivery StreamのDynamic Partitioning設定
    const cfnDeliveryStream = deliveryStream.node
      .defaultChild as aws_kinesisfirehose.CfnDeliveryStream;
    cfnDeliveryStream.addPropertyOverride(
      'ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration',
      {
        Enabled: true,
      }
    );
    cfnDeliveryStream.addPropertyOverride(
      'ExtendedS3DestinationConfiguration.ProcessingConfiguration',
      {
        Enabled: true,
        Processors: [
          {
            Type: 'MetadataExtraction',
            Parameters: [
              {
                ParameterName: 'MetadataExtractionQuery', //パーティションキーをレコードから抽出するクエリ
                ParameterValue: '{key1: {dummy: ("-")} | .dummy}', //キー値として一律で"-"を返す
              },
              {
                ParameterName: 'JsonParsingEngine',
                ParameterValue: 'JQ-1.6', //クエリエンジンにjqパーサーを使用
              },
            ],
          },
          {
            Type: 'AppendDelimiterToRecord', //レコード間へ区切り文字を挿入
            Parameters: [
              {
                ParameterName: 'Delimiter',
                ParameterValue: '\\n', //改行コード
              },
            ],
          },
        ],
      }
    );

    // Delivery streamへレコードをPutするタスク
    const putDataToDeliveryStreamTask =
      new aws_stepfunctions_tasks.CallAwsService(
        this,
        'putDataToDeliveryStreamTask',
        {
          service: 'firehose',
          action: 'putRecord',
          parameters: {
            DeliveryStreamName: deliveryStream.deliveryStreamName,
            Record: {
              Data: aws_stepfunctions.JsonPath.jsonToString(
                aws_stepfunctions.JsonPath.objectAt('$.input.record')
              ),
            },
          },
          iamResources: [deliveryStream.deliveryStreamArn],
          iamAction: 'firehose:PutRecord',
          resultPath: aws_stepfunctions.JsonPath.DISCARD,
        }
      );

    // State Machine
    new aws_stepfunctions.StateMachine(this, 'stateMachine', {
      stateMachineName: 'stateMachine',
      definition: putDataToDeliveryStreamTask,
    });
  }
}

上記をCDK Deployしてスタックをデプロイします。これにより次のASL DefinitionのState Machineが作成されました。

ASL

{
  "StartAt": "putDataToDeliveryStreamTask",
  "States": {
    "putDataToDeliveryStreamTask": {
      "End": true,
      "Type": "Task",
      "ResultPath": null,
      "Resource": "arn:aws:states:::aws-sdk:firehose:putRecord",
      "Parameters": {
        "DeliveryStreamName": "deliveryStream",
        "Record": {
          "Data.$": "States.JsonToString($.input.record)"
        }
      }
    }
  }
}

動作確認

次のInputを指定してState Machineを実行します。

Input

{
    "input": {
      "record": {
        "id": "u001",
        "count": 1,
        "area": "Akihabara"
      }
    }
}

State Machineの実行が成功しました。

Buffer期間が経過後に出力先Bucketの内容を確認すると、オブジェクトが出力されています。

aws s3 ls s3://${BUCKET_NAME} --recursive  
2022-08-07 22:55:15         42 data/-deliveryStream-14-2022-08-07-13-52-50-37a9eb3b-9570-3175-a995-61fc9df957bc

オブジェクトの内容を確認すると、PutしたJSONデータが記録されています!

-deliveryStream-14-2022-08-07-13-52-50-37a9eb3b-9570-3175-a995-61fc9df957bc

{"id":"u001","count":1,"area":"Akihabara"}

おわりに

AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータをPutする構成をAWS CDKで構築してみました。

これもまた今後よくあるパターンの実装になりそうなので、テンプレートとして活用していきたいです。

参考

以上