この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、CX事業本部 IoT事業部の若槻です。
前回および前々回のエントリで、Step FunctionsからKinesis Data FirehoseへのJSONデータのPutをしてみました。
- AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータをPutする(AWS CDK) | DevelopersIO
- AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータを”改行コード付きで”Putする(AWS CDK) | DevelopersIO
その際に使用したAPIはPutRecordでした。PutRecordはリクエスト毎に1レコードしかPutできないため、複数レコードをPutしたい場合はPut毎にMap Stateを回すことになりますが、レコード数が多いと非効率です。
そこで今回は、AWS Step FunctionsからAmazon Kinesis Data Firehoseに、複数のJSONデータをPutRecordBatch APIで一括でPutする構成をAWS CDKで作ってみました。
やってみた
実装
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
lib/process-stack.ts
import { Construct } from 'constructs';
import {
aws_s3,
aws_stepfunctions_tasks,
aws_stepfunctions,
Stack,
StackProps,
RemovalPolicy,
Duration,
Size,
} from 'aws-cdk-lib';
import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as firehose_destinations_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';
export class ProcessStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// データ格納バケット
const dataBucket = new aws_s3.Bucket(this, 'dataBucket', {
bucketName: `data-${this.account}-${this.region}`,
removalPolicy: RemovalPolicy.DESTROY,
});
// Kinesis Firehose Delivery Stream
const deliveryStream = new firehose_alpha.DeliveryStream(
this,
'deliveryStream',
{
deliveryStreamName: 'deliveryStream',
destinations: [
new firehose_destinations_alpha.S3Bucket(dataBucket, {
dataOutputPrefix: 'data/',
errorOutputPrefix: 'error/!{firehose:error-output-type}/',
bufferingInterval: Duration.seconds(60),
bufferingSize: Size.mebibytes(64),
}),
],
}
);
// 改行コード文字列の取得
const getNewLineCodeTask = new aws_stepfunctions.Pass(
this,
'getNewLineCodeTask',
{
parameters: {
value: '\n',
},
resultPath: '$.getNewLineCodeTaskOutPut',
}
);
// レコードデータの配列を取得
const getRecordDataMap = new aws_stepfunctions.Map(
this,
'getRecordDataMapTask',
{
itemsPath: aws_stepfunctions.JsonPath.stringAt('$.input.items'),
parameters: {
item: aws_stepfunctions.JsonPath.stringAt('$$.Map.Item.Value'),
newLineCode: aws_stepfunctions.JsonPath.stringAt(
'$.getNewLineCodeTaskOutPut.value'
),
},
resultPath: '$.getRecordDataMapOutPut',
}
).iterator(
new aws_stepfunctions.Pass(this, 'getRecordDataPass', {
parameters: {
Data: aws_stepfunctions.JsonPath.format(
'{}{}',
aws_stepfunctions.JsonPath.jsonToString(
aws_stepfunctions.JsonPath.stringAt('$.item')
),
aws_stepfunctions.JsonPath.stringAt('$.newLineCode')
),
},
resultPath: '$.getRecordDataPassOutPut',
outputPath: aws_stepfunctions.JsonPath.stringAt(
'$.getRecordDataPassOutPut'
),
})
);
// Delivery streamへレコードをPutするタスク
const putRecordBatchTask = new aws_stepfunctions_tasks.CallAwsService(
this,
'putRecordBatchTask',
{
service: 'firehose',
action: 'putRecordBatch',
parameters: {
DeliveryStreamName: deliveryStream.deliveryStreamName,
Records: aws_stepfunctions.JsonPath.stringAt(
'$.getRecordDataMapOutPut[*]'
),
},
iamResources: [deliveryStream.deliveryStreamArn],
iamAction: 'firehose:putRecordBatch',
resultPath: aws_stepfunctions.JsonPath.DISCARD,
}
);
// State Machine
new aws_stepfunctions.StateMachine(this, 'stateMachine', {
stateMachineName: 'stateMachine',
definition: getNewLineCodeTask
.next(getRecordDataMap)
.next(putRecordBatchTask),
});
}
}
- Map State(
getRecordDataMap
)を使用してJSONデータの配列を、JSON文字列 + 改行コード
という配列の形式に変換しているところが肝となります。
上記をCDK Deployしてスタックをデプロイすると、次の定義のState Machineが作成されます。
Definition
{
"StartAt": "getNewLineCodeTask",
"States": {
"getNewLineCodeTask": {
"Type": "Pass",
"ResultPath": "$.getNewLineCodeTaskOutPut",
"Parameters": {
"value": "\n"
},
"Next": "getRecordDataMapTask"
},
"getRecordDataMapTask": {
"Type": "Map",
"ResultPath": "$.getRecordDataMapOutPut",
"Next": "putRecordBatchTask",
"Parameters": {
"item.$": "$$.Map.Item.Value",
"newLineCode.$": "$.getNewLineCodeTaskOutPut.value"
},
"Iterator": {
"StartAt": "getRecordDataPass",
"States": {
"getRecordDataPass": {
"Type": "Pass",
"ResultPath": "$.getRecordDataPassOutPut",
"Parameters": {
"Data.$": "States.Format('{}{}', States.JsonToString($.item), $.newLineCode)"
},
"OutputPath": "$.getRecordDataPassOutPut",
"End": true
}
}
},
"ItemsPath": "$.input.items"
},
"putRecordBatchTask": {
"End": true,
"Type": "Task",
"ResultPath": null,
"Resource": "arn:aws:states:::aws-sdk:firehose:putRecordBatch",
"Parameters": {
"DeliveryStreamName": "deliveryStream",
"Records.$": "$.getRecordDataMapOutPut[*]"
}
}
}
}
動作確認
次のInputを指定してState Machineを実行します。
Input
{
"input": {
"items": [
{
"id": "u001",
"count": 1,
"area": "Akihabara"
},
{
"id": "u002",
"count": 3,
"area": "Ikebukuro"
},
{
"id": "u003",
"count": 2,
"area": "Shinjuku"
}
]
}
}
State Machine実行が成功しました。
次のParameterを使用してPutRecordBatch APIが叩かれました。
Parameter
{
"DeliveryStreamName": "deliveryStream",
"Records": [
{
"Data": "{\"id\":\"u001\",\"count\":1,\"area\":\"Akihabara\"}\n"
},
{
"Data": "{\"id\":\"u002\",\"count\":3,\"area\":\"Ikebukuro\"}\n"
},
{
"Data": "{\"id\":\"u003\",\"count\":2,\"area\":\"Shinjuku\"}\n"
}
]
}
Buffer期間が経過後に出力先Bucketの内容を確認すると、オブジェクトが1つ出力されています。
aws s3 ls s3://${BUCKET_NAME} --recursive
2022-08-10 22:56:34 128 data/2022/08/10/13/deliveryStream-1-2022-08-10-13-55-33-04431990-8c8d-465f-b41b-fcda45713eac
オブジェクトの内容を見ると、Putしたレコードがすべて記述されていますね!
deliveryStream-1-2022-08-10-13-55-33-04431990-8c8d-465f-b41b-fcda45713eac
{"id":"u001","count":1,"area":"Akihabara"}
{"id":"u002","count":3,"area":"Ikebukuro"}
{"id":"u003","count":2,"area":"Shinjuku"}
参考
- AWS Step Functionsでは組み込み関数だけで配列のフィルターやスライスができる(AWS CDK v2) | DevelopersIO
- Step Functionsの入出力処理の制御パラメータ(InputPath、 Parameters、ResultPathおよびOutputPath)を理解するために参照したドキュメント | DevelopersIO
以上