AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータをPutする(AWS CDK)
こんにちは、CX事業本部 IoT事業部の若槻です。
最近Kinesis Data Firehoseをよく触っているのですが、そうなると「Delivery streamへのデータのPutもAWS Step Functionsからやりたい!」という意欲が必然的に湧いてきていました。
そこで今回は、AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータをPutする実装をAWS CDKで構築してみました。
やってみた
実装
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
import { Construct } from 'constructs'; import { aws_s3, aws_kinesisfirehose, aws_stepfunctions_tasks, aws_stepfunctions, Stack, StackProps, RemovalPolicy, Duration, Size, } from 'aws-cdk-lib'; import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha'; import * as firehose_destinations_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha'; export class ProcessStack extends Stack { constructor(scope: Construct, id: string, props: StackProps) { super(scope, id, props); // データ格納バケット const dataBucket = new aws_s3.Bucket(this, 'dataBucket', { bucketName: `data-${this.account}-${this.region}`, removalPolicy: RemovalPolicy.DESTROY, }); // Kinesis Firehose Delivery Stream const deliveryStream = new firehose_alpha.DeliveryStream( this, 'deliveryStream', { deliveryStreamName: 'deliveryStream', destinations: [ new firehose_destinations_alpha.S3Bucket(dataBucket, { dataOutputPrefix: 'data/!{partitionKeyFromQuery:key1}', //末尾"/"を省略するとオブジェクト名先頭にキー値が付加される errorOutputPrefix: 'error/!{firehose:error-output-type}/', bufferingInterval: Duration.seconds(60), bufferingSize: Size.mebibytes(64), }), ], } ); // Delivery StreamのDynamic Partitioning設定 const cfnDeliveryStream = deliveryStream.node .defaultChild as aws_kinesisfirehose.CfnDeliveryStream; cfnDeliveryStream.addPropertyOverride( 'ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration', { Enabled: true, } ); cfnDeliveryStream.addPropertyOverride( 'ExtendedS3DestinationConfiguration.ProcessingConfiguration', { Enabled: true, Processors: [ { Type: 'MetadataExtraction', Parameters: [ { ParameterName: 'MetadataExtractionQuery', //パーティションキーをレコードから抽出するクエリ ParameterValue: '{key1: {dummy: ("-")} | .dummy}', //キー値として一律で"-"を返す }, { ParameterName: 'JsonParsingEngine', ParameterValue: 'JQ-1.6', //クエリエンジンにjqパーサーを使用 }, ], }, { Type: 'AppendDelimiterToRecord', //レコード間へ区切り文字を挿入 Parameters: [ { ParameterName: 'Delimiter', ParameterValue: '\\n', //改行コード }, ], }, ], } ); // Delivery streamへレコードをPutするタスク const putDataToDeliveryStreamTask = new aws_stepfunctions_tasks.CallAwsService( this, 'putDataToDeliveryStreamTask', { service: 'firehose', action: 'putRecord', parameters: { DeliveryStreamName: deliveryStream.deliveryStreamName, Record: { Data: aws_stepfunctions.JsonPath.jsonToString( aws_stepfunctions.JsonPath.objectAt('$.input.record') ), }, }, iamResources: [deliveryStream.deliveryStreamArn], iamAction: 'firehose:PutRecord', resultPath: aws_stepfunctions.JsonPath.DISCARD, } ); // State Machine new aws_stepfunctions.StateMachine(this, 'stateMachine', { stateMachineName: 'stateMachine', definition: putDataToDeliveryStreamTask, }); } }
- AWS CDK v2ではKinesis Data FirehoseのModuleは現時点でAlpha Releaseなので、そちらを利用しています。
- Delivery streamへレコードをPutするTaskは、現時点では高レベルなConstruct classが未実装であるため、
aws_stepfunctions_tasks.CallAwsService
を使用して低レベルな実装を行っています。 - Putしたレコード間へはDynamic Partitioningによる改行コードの自動挿入処理を行っています。
上記をCDK Deployしてスタックをデプロイします。これにより次のASL DefinitionのState Machineが作成されました。
{ "StartAt": "putDataToDeliveryStreamTask", "States": { "putDataToDeliveryStreamTask": { "End": true, "Type": "Task", "ResultPath": null, "Resource": "arn:aws:states:::aws-sdk:firehose:putRecord", "Parameters": { "DeliveryStreamName": "deliveryStream", "Record": { "Data.$": "States.JsonToString($.input.record)" } } } } }
動作確認
次のInputを指定してState Machineを実行します。
{ "input": { "record": { "id": "u001", "count": 1, "area": "Akihabara" } } }
State Machineの実行が成功しました。
Buffer期間が経過後に出力先Bucketの内容を確認すると、オブジェクトが出力されています。
aws s3 ls s3://${BUCKET_NAME} --recursive 2022-08-07 22:55:15 42 data/-deliveryStream-14-2022-08-07-13-52-50-37a9eb3b-9570-3175-a995-61fc9df957bc
オブジェクトの内容を確認すると、PutしたJSONデータが記録されています!
{"id":"u001","count":1,"area":"Akihabara"}
おわりに
AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータをPutする構成をAWS CDKで構築してみました。
これもまた今後よくあるパターンの実装になりそうなので、テンプレートとして活用していきたいです。
参考
以上