こんにちは、CX事業本部 IoT事業部の若槻です。
以前のエントリでStep FunctionsからのPut Object/Get Objectを行いましたが、その際にPutおよびGetを試したデータはJson Objectでした。
Json Object
//Json Object
[
{
"key1": "val1",
"key2": "val2",
"key3": "val3"
},
{
"key1": "val10",
"key2": "val20",
"key3": "val30"
},
{
"key1": "val100",
"key2": "val200",
"key3": "val300"
}
]
しかしBucketに格納されるJsonデータは、各レコードが改行記号(\n
)で区切られたJSON Linesの方が
Amazon Athenaなどによる分析用途には適しています。
Json Object
//JSON Lines
{"key1":"val1","key2":"val2","key3":"val3"}
{"key1":"val10","key2":"val20","key2":"val30"}
{"key1":"val100","key2":"val200","key2":"val300"}
そこで今回は、AWS Step FunctionsでJSON LinesデータをS3 BucketにPut Object/Get Objectできるのか確認してみました。
確認してみた
Put Object(できなかった)
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
lib/process-stack.ts
import { Construct } from 'constructs';
import {
aws_s3,
aws_stepfunctions,
aws_stepfunctions_tasks,
RemovalPolicy,
Stack,
StackProps,
} from 'aws-cdk-lib';
export class ProcessStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// S3 Bucket
const s3Bucket = new aws_s3.Bucket(this, 'dataBucket', {
removalPolicy: RemovalPolicy.DESTROY,
});
// Json ObjectをJSON Linesに変換
const convertJsonObjectToJsonLinesTask =
new aws_stepfunctions_tasks.EvaluateExpression(
this,
'convertJsonObjectToJsonLinesTask',
{
expression: aws_stepfunctions.JsonPath.format(
'{}.map( x => JSON.stringify(x) ).join("\\n")',
aws_stepfunctions.JsonPath.stringAt('$.object'),
),
resultPath: '$.convertJsonObjectToJsonLinesTask',
},
);
// Put Object
const putObjectTask = new aws_stepfunctions_tasks.CallAwsService(
this,
'putObjectTask',
{
service: 's3',
action: 'putObject',
parameters: {
Body: aws_stepfunctions.JsonPath.stringAt(
'$.convertJsonObjectToJsonLinesTask',
),
Bucket: s3Bucket.bucketName,
Key: 'data/putObject',
},
iamResources: [`${s3Bucket.bucketArn}/*`],
iamAction: 's3:PutObject',
resultPath: aws_stepfunctions.DISCARD,
},
);
// State Machine
new aws_stepfunctions.StateMachine(this, 'stateMachine', {
stateMachineName: 'stateMachine',
definition: convertJsonObjectToJsonLinesTask.next(putObjectTask),
});
}
}
上記をCDK Deployしてスタックをデプロイします。
次の入力を指定してステートマシンを実行します。
Input
{
"object": [
{
"key1": "val1",
"key2": "val2",
"key3": "val3"
},
{
"key1": "val10",
"key2": "val20",
"key3": "val30"
},
{
"key1": "val100",
"key2": "val200",
"key3": "val300"
}
]
}
実行が成功しました。putObjectTask
のParameterを見るとエスケープされたJSON Linesデータが文字列としてBody
に指定されています。
そしてBucket内に作成されたObjectを開いてみると、JSON Linesではなくエスケープされた文字列のまま書き込まれてしまっています。
JSON LinesではないのでS3 Selectでクエリを掛けても上手くパースできません。
またPut Object時のContentTypeをapplication/json
に指定するなど試してみましたが、だめでした。
Get Object(できた)
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
lib/process-stack.ts
import { Construct } from 'constructs';
import {
aws_s3,
aws_stepfunctions,
aws_stepfunctions_tasks,
RemovalPolicy,
Stack,
StackProps,
} from 'aws-cdk-lib';
export class ProcessStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// S3 Bucket
const s3Bucket = new aws_s3.Bucket(this, 'dataBucket', {
removalPolicy: RemovalPolicy.DESTROY,
});
// Get Object
const getObjectTask = new aws_stepfunctions_tasks.CallAwsService(
this,
'getObjectTask',
{
service: 's3',
action: 'getObject',
parameters: {
Bucket: s3Bucket.bucketName,
Key: 'data/jsonlObject',
},
iamResources: [`${s3Bucket.bucketArn}/*`],
iamAction: 's3:GetObject',
resultPath: '$.getObjectTask',
},
);
// JSON LinesをJson Objectに変換
const convertJsonLinesToJsonObjectTask =
new aws_stepfunctions_tasks.EvaluateExpression(
this,
'convertJsonLinesToJsonObjectTask',
{
expression: aws_stepfunctions.JsonPath.format(
'`{}`.split("\\n").map( d => JSON.parse(d) )',
aws_stepfunctions.JsonPath.stringAt('$.getObjectTask.Body'),
),
resultPath: '$.convertJsonLinesToJsonObjectTask',
},
);
// State Machine
new aws_stepfunctions.StateMachine(this, 'stateMachine', {
stateMachineName: 'stateMachine',
definition: getObjectTask.next(convertJsonLinesToJsonObjectTask),
});
}
}
次のJSON Linesのデータを書き込んだオブジェクトをS3 Bucketに格納します。
ステートマシンを実行します。
実行が成功しました。getObject
の出力を見るとエスケープされていないJSON Linesデータが文字列として取得できています。
convertJsonLinesToJsonObjectTask
の出力を見るとJSON LinesからJson Objectに形式を変換されたデータが取得できています!
ハマった箇所
convertJsonLinesToJsonObjectTask
でのexpression
パラメータの指定を次のようにするとLambda側で上手くパースできずにエラーとなります。テンプレート文字列の指定を"{}"
や\'{}\'とすると
だめでした。
'"{}".split("\\n").map( d => JSON.parse(d) )'
'\'{}\'.split("\\n").map( d => JSON.parse(d) )'
色々試した結果次のようにバッククォートを使用すると上手く行きました。
'`{}`.split("\\n").map( d => JSON.parse(d) )'
おわりに
AWS Step FunctionsでJSON LinesデータをS3 BucketにPut Object/Get Objectできるのか確認してみました。
結論としては次のようになりました。
- Put Object:できなかった
- Get Object:できた
JSON LinesデータをPutする良い方法を知っている方がいれば教えて欲しいです。
以上