こんにちは。サービス開発室の武田です。
何か処理をして、その結果をS3バケットにアップロードするという操作は「よくあるパターン」のひとつでしょう。AWS Step FunctionsにはAWS SDKサービス統合が用意されており、コードを書くことなくAWSサービスと連携できます。
今回はLambda呼び出しのタスクでJSONデータを出力し、S3へPUTするタスクでそのデータをアップロードする一連のステートマシンを試してみました。もちろんLambda関数の中でS3へアップロードもできますが、データ作成とアップロードを別のタスクに分割することが今回の目的です。
ただしStep Functionsの制限から、データサイズが 256KB を超えるケースではこの方法は使えません。その場合はLambda関数の中でS3へアップロードする処理も実装してください。
CDKの準備
動作確認するにあたって準備したCDKのソースは次のものです。
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import {
aws_s3 as s3,
aws_stepfunctions as sfn,
aws_stepfunctions_tasks as sfn_tasks,
aws_lambda as lambda,
} from "aws-cdk-lib";
export class ExampleSfnS3UploadLambdaResultStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
const s3bucket = new s3.Bucket(this, "S3UploadBucket", {
enforceSSL: true,
removalPolicy: cdk.RemovalPolicy.DESTROY,
autoDeleteObjects: true,
});
const hello_function = new lambda.Function(this, "HelloFunction", {
code: lambda.Code.fromInline(`def handler(event, context):
return {
"message": "Hello, World"
}`),
runtime: lambda.Runtime.PYTHON_3_12,
handler: "index.handler",
});
const s3_upload_state_machine = new sfn.StateMachine(
this,
"S3UploadStateMachine",
{
definitionBody: sfn.DefinitionBody.fromChainable(
new sfn_tasks.LambdaInvoke(this, "HelloFunction Task", {
lambdaFunction: hello_function,
outputPath: "$.Payload",
}).next(
new sfn_tasks.CallAwsService(this, "S3 PutObject Task", {
service: "s3",
action: "putObject",
parameters: {
"Body.$": "$",
Bucket: s3bucket.bucketName,
"Key.$": "States.Format('{}.json', $$.Execution.StartTime)",
},
iamResources: [`${s3bucket.bucketArn}/*`],
})
)
),
}
);
}
}
HelloFunction Task
がデータを生成するLambda関数を呼び出すタスクです。Lambda関数の出力を次のタスクに引き渡します。続いて呼び出されるS3 PutObject Task
が、そのデータをS3バケットへアップロードするタスクです。CDKでAWSサービス統合を利用する場合はCallAwsService
メソッドを使用し、各パラメーターを設定するだけです。
ポイントとしては、今回タスクを分割することで、HelloFunction Task
から呼び出されるLambda関数にS3バケットへアクセスする権限の追加が不要な点です。
デプロイすると、次のようなシンプルなステートマシンが作成されます。
動作確認
ステートマシンを実行しましょう。制限などされていなければ成功するはずです。
実行できたらS3バケットを確認します。想定どおりオブジェクトが追加されていればOKです。
中身はLambda関数が出力した内容そのままとなっています。
まとめ
タスクを分割することで、データ生成部分がLambda関数以外となっても少ない修正で済みます。またS3へアップロードする部分も自分でLambda関数を用意しなければいけないのでは手間ですが、サービス統合で済ませられるのは便利ですね。