こんにちは、CX事業本部 IoT事業部の若槻です。
分析やバッチ処理のためにデータをS3 Bucketに貯める実装をする機会がありました。
今回は、折角なので最近よく触っているAWS Step Functionsを使ってS3 BucketへのPut Object/Get Objectをする方法を確認してみました。
やってみた
実装
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
lib/process-stack.ts
import { Construct } from 'constructs';
import {
aws_s3,
aws_stepfunctions,
aws_stepfunctions_tasks,
RemovalPolicy,
Stack,
StackProps,
} from 'aws-cdk-lib';
export class ProcessStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// S3 Bucket
const s3Bucket = new aws_s3.Bucket(this, 'dataBucket', {
removalPolicy: RemovalPolicy.DESTROY,
});
// Put Object
const putObjectTask = new aws_stepfunctions_tasks.CallAwsService(
this,
'putObjectTask',
{
service: 's3',
action: 'putObject',
parameters: {
Body: aws_stepfunctions.JsonPath.stringAt('$.bodyObject'),
Bucket: s3Bucket.bucketName,
Key: 'data/objectHoge',
ContentType: 'application/json',
},
iamResources: [`${s3Bucket.bucketArn}/*`],
iamAction: 's3:PutObject',
resultPath: aws_stepfunctions.DISCARD,
},
);
// Get Object
const getObjectTask = new aws_stepfunctions_tasks.CallAwsService(
this,
'getObjectTask',
{
service: 's3',
action: 'getObject',
parameters: {
Bucket: s3Bucket.bucketName,
Key: 'data/objectHoge',
},
iamResources: [`${s3Bucket.bucketArn}/*`],
iamAction: 's3:GetObject',
resultSelector: {
body: aws_stepfunctions.JsonPath.stringToJson(
aws_stepfunctions.JsonPath.stringAt('$.Body'),
),
},
resultPath: '$.getObjectTaskOutPut',
},
);
// State Machine
new aws_stepfunctions.StateMachine(this, 'stateMachine', {
stateMachineName: 'stateMachine',
definition: putObjectTask.next(getObjectTask),
});
}
}
- S3:PutObjectおよびS3:GetObjectはStep Functionsの組み込みタスクがないため、
CallAwsService
を使用しています。 - getObject時に
stringToJson
を使用してJson文字列をオブジェクトに変換しています。
上記をCDK Deployしてスタックをデプロイします。これにより次のDefinitionのステートマシンが作成されます。
Definition
{
"StartAt": "putObjectTask",
"States": {
"putObjectTask": {
"Next": "getObjectTask",
"Type": "Task",
"ResultPath": null,
"Resource": "arn:aws:states:::aws-sdk:s3:putObject",
"Parameters": {
"Body": {
"awsRegion": "ap-northeast-1",
"eventID": "5b4f50b1-fa9e-40fc-8f16-dc1bafb034ca",
"eventName": "MODIFY",
"recordFormat": "application/json",
"eventSource": "aws:dynamodb"
},
"Bucket": "processstack-databucketd8691f4e-1m7fyxr0cebrl",
"Key": "data/objectHoge",
"ContentType": "application/json"
}
},
"getObjectTask": {
"End": true,
"Type": "Task",
"ResultPath": "$.getObjectTaskOutPut",
"ResultSelector": {
"body.$": "States.StringToJson($.Body)"
},
"Resource": "arn:aws:states:::aws-sdk:s3:getObject",
"Parameters": {
"Bucket": "processstack-databucketd8691f4e-1m7fyxr0cebrl",
"Key": "data/objectHoge"
}
}
}
}
動作確認
次の入力を指定してステートマシンを実行します。
Input
{
"bodyObject": {
"awsRegion": "ap-northeast-1",
"eventID": "5b4f50b1-fa9e-40fc-8f16-dc1bafb034ca",
"eventName": "MODIFY",
"userIdentity": null,
"recordFormat": "application/json",
"eventSource": "aws:dynamodb"
}
}
getObjectTask
の出力を見ると、Bucketに格納されたオブジェクトの内容がGetできています。
また、対象のBucketを見るとキーdata/objectHoge
でオブジェクトが格納されています。
オブジェクトの内容を見るとちゃんとPutおよびGetしたオブジェクトとなっていますね。
おわりに
AWS Step FunctionsでS3 BucketへのPut Object/Get Objectをしてみました。
とりあえずJson Objectに対しては行えることが分かりました。他にはJSON Linesなどの非オブジェクトの場合はどうなのか気になるので引き続き触ってみたいと思います。結果は別途記事にします。
以上