AWS Step FunctionsからAmazon Kinesis Data Firehoseに複数のJSONデータをPutRecordBatch APIでPutする(AWS CDK)

2022.08.10

こんにちは、CX事業本部 IoT事業部の若槻です。

前回および前々回のエントリで、Step FunctionsからKinesis Data FirehoseへのJSONデータのPutをしてみました。

その際に使用したAPIはPutRecordでした。PutRecordはリクエスト毎に1レコードしかPutできないため、複数レコードをPutしたい場合はPut毎にMap Stateを回すことになりますが、レコード数が多いと非効率です。

そこで今回は、AWS Step FunctionsからAmazon Kinesis Data Firehoseに、複数のJSONデータをPutRecordBatch APIで一括でPutする構成をAWS CDKで作ってみました。

やってみた

実装

AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。

lib/process-stack.ts

import { Construct } from 'constructs';
import {
  aws_s3,
  aws_stepfunctions_tasks,
  aws_stepfunctions,
  Stack,
  StackProps,
  RemovalPolicy,
  Duration,
  Size,
} from 'aws-cdk-lib';
import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as firehose_destinations_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';

export class ProcessStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // データ格納バケット
    const dataBucket = new aws_s3.Bucket(this, 'dataBucket', {
      bucketName: `data-${this.account}-${this.region}`,
      removalPolicy: RemovalPolicy.DESTROY,
    });

    // Kinesis Firehose Delivery Stream
    const deliveryStream = new firehose_alpha.DeliveryStream(
      this,
      'deliveryStream',
      {
        deliveryStreamName: 'deliveryStream',
        destinations: [
          new firehose_destinations_alpha.S3Bucket(dataBucket, {
            dataOutputPrefix: 'data/',
            errorOutputPrefix: 'error/!{firehose:error-output-type}/',
            bufferingInterval: Duration.seconds(60),
            bufferingSize: Size.mebibytes(64),
          }),
        ],
      }
    );

    // 改行コード文字列の取得
    const getNewLineCodeTask = new aws_stepfunctions.Pass(
      this,
      'getNewLineCodeTask',
      {
        parameters: {
          value: '\n',
        },
        resultPath: '$.getNewLineCodeTaskOutPut',
      }
    );

    // レコードデータの配列を取得
    const getRecordDataMap = new aws_stepfunctions.Map(
      this,
      'getRecordDataMapTask',
      {
        itemsPath: aws_stepfunctions.JsonPath.stringAt('$.input.items'),
        parameters: {
          item: aws_stepfunctions.JsonPath.stringAt('$$.Map.Item.Value'),
          newLineCode: aws_stepfunctions.JsonPath.stringAt(
            '$.getNewLineCodeTaskOutPut.value'
          ),
        },
        resultPath: '$.getRecordDataMapOutPut',
      }
    ).iterator(
      new aws_stepfunctions.Pass(this, 'getRecordDataPass', {
        parameters: {
          Data: aws_stepfunctions.JsonPath.format(
            '{}{}',
            aws_stepfunctions.JsonPath.jsonToString(
              aws_stepfunctions.JsonPath.stringAt('$.item')
            ),
            aws_stepfunctions.JsonPath.stringAt('$.newLineCode')
          ),
        },
        resultPath: '$.getRecordDataPassOutPut',
        outputPath: aws_stepfunctions.JsonPath.stringAt(
          '$.getRecordDataPassOutPut'
        ),
      })
    );

    // Delivery streamへレコードをPutするタスク
    const putRecordBatchTask = new aws_stepfunctions_tasks.CallAwsService(
      this,
      'putRecordBatchTask',
      {
        service: 'firehose',
        action: 'putRecordBatch',
        parameters: {
          DeliveryStreamName: deliveryStream.deliveryStreamName,
          Records: aws_stepfunctions.JsonPath.stringAt(
            '$.getRecordDataMapOutPut[*]'
          ),
        },
        iamResources: [deliveryStream.deliveryStreamArn],
        iamAction: 'firehose:putRecordBatch',
        resultPath: aws_stepfunctions.JsonPath.DISCARD,
      }
    );

    // State Machine
    new aws_stepfunctions.StateMachine(this, 'stateMachine', {
      stateMachineName: 'stateMachine',
      definition: getNewLineCodeTask
        .next(getRecordDataMap)
        .next(putRecordBatchTask),
    });
  }
}
  • Map State(getRecordDataMap)を使用してJSONデータの配列を、JSON文字列 + 改行コードという配列の形式に変換しているところが肝となります。

上記をCDK Deployしてスタックをデプロイすると、次の定義のState Machineが作成されます。

Definition

{
  "StartAt": "getNewLineCodeTask",
  "States": {
    "getNewLineCodeTask": {
      "Type": "Pass",
      "ResultPath": "$.getNewLineCodeTaskOutPut",
      "Parameters": {
        "value": "\n"
      },
      "Next": "getRecordDataMapTask"
    },
    "getRecordDataMapTask": {
      "Type": "Map",
      "ResultPath": "$.getRecordDataMapOutPut",
      "Next": "putRecordBatchTask",
      "Parameters": {
        "item.$": "$$.Map.Item.Value",
        "newLineCode.$": "$.getNewLineCodeTaskOutPut.value"
      },
      "Iterator": {
        "StartAt": "getRecordDataPass",
        "States": {
          "getRecordDataPass": {
            "Type": "Pass",
            "ResultPath": "$.getRecordDataPassOutPut",
            "Parameters": {
              "Data.$": "States.Format('{}{}', States.JsonToString($.item), $.newLineCode)"
            },
            "OutputPath": "$.getRecordDataPassOutPut",
            "End": true
          }
        }
      },
      "ItemsPath": "$.input.items"
    },
    "putRecordBatchTask": {
      "End": true,
      "Type": "Task",
      "ResultPath": null,
      "Resource": "arn:aws:states:::aws-sdk:firehose:putRecordBatch",
      "Parameters": {
        "DeliveryStreamName": "deliveryStream",
        "Records.$": "$.getRecordDataMapOutPut[*]"
      }
    }
  }
}

動作確認

次のInputを指定してState Machineを実行します。

Input

{
  "input": {
    "items": [
      {
        "id": "u001",
        "count": 1,
        "area": "Akihabara"
      },
      {
        "id": "u002",
        "count": 3,
        "area": "Ikebukuro"
      },
      {
        "id": "u003",
        "count": 2,
        "area": "Shinjuku"
      }
    ]
  }
}

State Machine実行が成功しました。

次のParameterを使用してPutRecordBatch APIが叩かれました。

Parameter

{
  "DeliveryStreamName": "deliveryStream",
  "Records": [
    {
      "Data": "{\"id\":\"u001\",\"count\":1,\"area\":\"Akihabara\"}\n"
    },
    {
      "Data": "{\"id\":\"u002\",\"count\":3,\"area\":\"Ikebukuro\"}\n"
    },
    {
      "Data": "{\"id\":\"u003\",\"count\":2,\"area\":\"Shinjuku\"}\n"
    }
  ]
}

Buffer期間が経過後に出力先Bucketの内容を確認すると、オブジェクトが1つ出力されています。

aws s3 ls s3://${BUCKET_NAME} --recursive   
2022-08-10 22:56:34        128 data/2022/08/10/13/deliveryStream-1-2022-08-10-13-55-33-04431990-8c8d-465f-b41b-fcda45713eac

オブジェクトの内容を見ると、Putしたレコードがすべて記述されていますね!

deliveryStream-1-2022-08-10-13-55-33-04431990-8c8d-465f-b41b-fcda45713eac

{"id":"u001","count":1,"area":"Akihabara"}
{"id":"u002","count":3,"area":"Ikebukuro"}
{"id":"u003","count":2,"area":"Shinjuku"}

参考

以上