AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータをPutする(AWS CDK)

AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータをPutする(AWS CDK)

Clock Icon2022.08.08

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 IoT事業部の若槻です。

最近Kinesis Data Firehoseをよく触っているのですが、そうなると「Delivery streamへのデータのPutもAWS Step Functionsからやりたい!」という意欲が必然的に湧いてきていました。

そこで今回は、AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータをPutする実装をAWS CDKで構築してみました。

やってみた

実装

AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。

import { Construct } from 'constructs';
import {
  aws_s3,
  aws_kinesisfirehose,
  aws_stepfunctions_tasks,
  aws_stepfunctions,
  Stack,
  StackProps,
  RemovalPolicy,
  Duration,
  Size,
} from 'aws-cdk-lib';
import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as firehose_destinations_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';

export class ProcessStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // データ格納バケット
    const dataBucket = new aws_s3.Bucket(this, 'dataBucket', {
      bucketName: `data-${this.account}-${this.region}`,
      removalPolicy: RemovalPolicy.DESTROY,
    });

    // Kinesis Firehose Delivery Stream
    const deliveryStream = new firehose_alpha.DeliveryStream(
      this,
      'deliveryStream',
      {
        deliveryStreamName: 'deliveryStream',
        destinations: [
          new firehose_destinations_alpha.S3Bucket(dataBucket, {
            dataOutputPrefix: 'data/!{partitionKeyFromQuery:key1}', //末尾"/"を省略するとオブジェクト名先頭にキー値が付加される
            errorOutputPrefix: 'error/!{firehose:error-output-type}/',
            bufferingInterval: Duration.seconds(60),
            bufferingSize: Size.mebibytes(64),
          }),
        ],
      }
    );

    // Delivery StreamのDynamic Partitioning設定
    const cfnDeliveryStream = deliveryStream.node
      .defaultChild as aws_kinesisfirehose.CfnDeliveryStream;
    cfnDeliveryStream.addPropertyOverride(
      'ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration',
      {
        Enabled: true,
      }
    );
    cfnDeliveryStream.addPropertyOverride(
      'ExtendedS3DestinationConfiguration.ProcessingConfiguration',
      {
        Enabled: true,
        Processors: [
          {
            Type: 'MetadataExtraction',
            Parameters: [
              {
                ParameterName: 'MetadataExtractionQuery', //パーティションキーをレコードから抽出するクエリ
                ParameterValue: '{key1: {dummy: ("-")} | .dummy}', //キー値として一律で"-"を返す
              },
              {
                ParameterName: 'JsonParsingEngine',
                ParameterValue: 'JQ-1.6', //クエリエンジンにjqパーサーを使用
              },
            ],
          },
          {
            Type: 'AppendDelimiterToRecord', //レコード間へ区切り文字を挿入
            Parameters: [
              {
                ParameterName: 'Delimiter',
                ParameterValue: '\\n', //改行コード
              },
            ],
          },
        ],
      }
    );

    // Delivery streamへレコードをPutするタスク
    const putDataToDeliveryStreamTask =
      new aws_stepfunctions_tasks.CallAwsService(
        this,
        'putDataToDeliveryStreamTask',
        {
          service: 'firehose',
          action: 'putRecord',
          parameters: {
            DeliveryStreamName: deliveryStream.deliveryStreamName,
            Record: {
              Data: aws_stepfunctions.JsonPath.jsonToString(
                aws_stepfunctions.JsonPath.objectAt('$.input.record')
              ),
            },
          },
          iamResources: [deliveryStream.deliveryStreamArn],
          iamAction: 'firehose:PutRecord',
          resultPath: aws_stepfunctions.JsonPath.DISCARD,
        }
      );

    // State Machine
    new aws_stepfunctions.StateMachine(this, 'stateMachine', {
      stateMachineName: 'stateMachine',
      definition: putDataToDeliveryStreamTask,
    });
  }
}

上記をCDK Deployしてスタックをデプロイします。これにより次のASL DefinitionのState Machineが作成されました。

{
  "StartAt": "putDataToDeliveryStreamTask",
  "States": {
    "putDataToDeliveryStreamTask": {
      "End": true,
      "Type": "Task",
      "ResultPath": null,
      "Resource": "arn:aws:states:::aws-sdk:firehose:putRecord",
      "Parameters": {
        "DeliveryStreamName": "deliveryStream",
        "Record": {
          "Data.$": "States.JsonToString($.input.record)"
        }
      }
    }
  }
}

動作確認

次のInputを指定してState Machineを実行します。

{
    "input": {
      "record": {
        "id": "u001",
        "count": 1,
        "area": "Akihabara"
      }
    }
}

State Machineの実行が成功しました。

Buffer期間が経過後に出力先Bucketの内容を確認すると、オブジェクトが出力されています。

aws s3 ls s3://${BUCKET_NAME} --recursive  
2022-08-07 22:55:15         42 data/-deliveryStream-14-2022-08-07-13-52-50-37a9eb3b-9570-3175-a995-61fc9df957bc

オブジェクトの内容を確認すると、PutしたJSONデータが記録されています!

{"id":"u001","count":1,"area":"Akihabara"}

おわりに

AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータをPutする構成をAWS CDKで構築してみました。

これもまた今後よくあるパターンの実装になりそうなので、テンプレートとして活用していきたいです。

参考

以上

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.