こんにちは、CX事業本部 IoT事業部の若槻です。
最近Kinesis Data Firehoseをよく触っているのですが、そうなると「Delivery streamへのデータのPutもAWS Step Functionsからやりたい!」という意欲が必然的に湧いてきていました。
そこで今回は、AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータをPutする実装をAWS CDKで構築してみました。
やってみた
実装
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
lib/process-stack.ts
import { Construct } from 'constructs';
import {
aws_s3,
aws_kinesisfirehose,
aws_stepfunctions_tasks,
aws_stepfunctions,
Stack,
StackProps,
RemovalPolicy,
Duration,
Size,
} from 'aws-cdk-lib';
import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as firehose_destinations_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';
export class ProcessStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// データ格納バケット
const dataBucket = new aws_s3.Bucket(this, 'dataBucket', {
bucketName: `data-${this.account}-${this.region}`,
removalPolicy: RemovalPolicy.DESTROY,
});
// Kinesis Firehose Delivery Stream
const deliveryStream = new firehose_alpha.DeliveryStream(
this,
'deliveryStream',
{
deliveryStreamName: 'deliveryStream',
destinations: [
new firehose_destinations_alpha.S3Bucket(dataBucket, {
dataOutputPrefix: 'data/!{partitionKeyFromQuery:key1}', //末尾"/"を省略するとオブジェクト名先頭にキー値が付加される
errorOutputPrefix: 'error/!{firehose:error-output-type}/',
bufferingInterval: Duration.seconds(60),
bufferingSize: Size.mebibytes(64),
}),
],
}
);
// Delivery StreamのDynamic Partitioning設定
const cfnDeliveryStream = deliveryStream.node
.defaultChild as aws_kinesisfirehose.CfnDeliveryStream;
cfnDeliveryStream.addPropertyOverride(
'ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration',
{
Enabled: true,
}
);
cfnDeliveryStream.addPropertyOverride(
'ExtendedS3DestinationConfiguration.ProcessingConfiguration',
{
Enabled: true,
Processors: [
{
Type: 'MetadataExtraction',
Parameters: [
{
ParameterName: 'MetadataExtractionQuery', //パーティションキーをレコードから抽出するクエリ
ParameterValue: '{key1: {dummy: ("-")} | .dummy}', //キー値として一律で"-"を返す
},
{
ParameterName: 'JsonParsingEngine',
ParameterValue: 'JQ-1.6', //クエリエンジンにjqパーサーを使用
},
],
},
{
Type: 'AppendDelimiterToRecord', //レコード間へ区切り文字を挿入
Parameters: [
{
ParameterName: 'Delimiter',
ParameterValue: '\\n', //改行コード
},
],
},
],
}
);
// Delivery streamへレコードをPutするタスク
const putDataToDeliveryStreamTask =
new aws_stepfunctions_tasks.CallAwsService(
this,
'putDataToDeliveryStreamTask',
{
service: 'firehose',
action: 'putRecord',
parameters: {
DeliveryStreamName: deliveryStream.deliveryStreamName,
Record: {
Data: aws_stepfunctions.JsonPath.jsonToString(
aws_stepfunctions.JsonPath.objectAt('$.input.record')
),
},
},
iamResources: [deliveryStream.deliveryStreamArn],
iamAction: 'firehose:PutRecord',
resultPath: aws_stepfunctions.JsonPath.DISCARD,
}
);
// State Machine
new aws_stepfunctions.StateMachine(this, 'stateMachine', {
stateMachineName: 'stateMachine',
definition: putDataToDeliveryStreamTask,
});
}
}
- AWS CDK v2ではKinesis Data FirehoseのModuleは現時点でAlpha Releaseなので、そちらを利用しています。
- Delivery streamへレコードをPutするTaskは、現時点では高レベルなConstruct classが未実装であるため、
aws_stepfunctions_tasks.CallAwsService
を使用して低レベルな実装を行っています。 - Putしたレコード間へはDynamic Partitioningによる改行コードの自動挿入処理を行っています。
上記をCDK Deployしてスタックをデプロイします。これにより次のASL DefinitionのState Machineが作成されました。
ASL
{
"StartAt": "putDataToDeliveryStreamTask",
"States": {
"putDataToDeliveryStreamTask": {
"End": true,
"Type": "Task",
"ResultPath": null,
"Resource": "arn:aws:states:::aws-sdk:firehose:putRecord",
"Parameters": {
"DeliveryStreamName": "deliveryStream",
"Record": {
"Data.$": "States.JsonToString($.input.record)"
}
}
}
}
}
動作確認
次のInputを指定してState Machineを実行します。
Input
{
"input": {
"record": {
"id": "u001",
"count": 1,
"area": "Akihabara"
}
}
}
State Machineの実行が成功しました。
Buffer期間が経過後に出力先Bucketの内容を確認すると、オブジェクトが出力されています。
aws s3 ls s3://${BUCKET_NAME} --recursive
2022-08-07 22:55:15 42 data/-deliveryStream-14-2022-08-07-13-52-50-37a9eb3b-9570-3175-a995-61fc9df957bc
オブジェクトの内容を確認すると、PutしたJSONデータが記録されています!
-deliveryStream-14-2022-08-07-13-52-50-37a9eb3b-9570-3175-a995-61fc9df957bc
{"id":"u001","count":1,"area":"Akihabara"}
おわりに
AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータをPutする構成をAWS CDKで構築してみました。
これもまた今後よくあるパターンの実装になりそうなので、テンプレートとして活用していきたいです。
参考
以上