AWS Step FunctionsでLambda関数の結果をS3にPUTするステートマシンを作ってみた

こんにちは。サービス開発室の武田です。Lambda関数で生成したデータをS3にPUTするAWS Step Functionsステートマシンを作って試してみました。
2024.03.30

こんにちは。サービス開発室の武田です。

何か処理をして、その結果をS3バケットにアップロードするという操作は「よくあるパターン」のひとつでしょう。AWS Step FunctionsにはAWS SDKサービス統合が用意されており、コードを書くことなくAWSサービスと連携できます。

今回はLambda呼び出しのタスクでJSONデータを出力し、S3へPUTするタスクでそのデータをアップロードする一連のステートマシンを試してみました。もちろんLambda関数の中でS3へアップロードもできますが、データ作成とアップロードを別のタスクに分割することが今回の目的です。

ただしStep Functionsの制限から、データサイズが 256KB を超えるケースではこの方法は使えません。その場合はLambda関数の中でS3へアップロードする処理も実装してください。

CDKの準備

動作確認するにあたって準備したCDKのソースは次のものです。

import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import {
  aws_s3 as s3,
  aws_stepfunctions as sfn,
  aws_stepfunctions_tasks as sfn_tasks,
  aws_lambda as lambda,
} from "aws-cdk-lib";

export class ExampleSfnS3UploadLambdaResultStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const s3bucket = new s3.Bucket(this, "S3UploadBucket", {
      enforceSSL: true,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    const hello_function = new lambda.Function(this, "HelloFunction", {
      code: lambda.Code.fromInline(`def handler(event, context):
  return {
    "message": "Hello, World"
  }`),
      runtime: lambda.Runtime.PYTHON_3_12,
      handler: "index.handler",
    });

    const s3_upload_state_machine = new sfn.StateMachine(
      this,
      "S3UploadStateMachine",
      {
        definitionBody: sfn.DefinitionBody.fromChainable(
          new sfn_tasks.LambdaInvoke(this, "HelloFunction Task", {
            lambdaFunction: hello_function,
            outputPath: "$.Payload",
          }).next(
            new sfn_tasks.CallAwsService(this, "S3 PutObject Task", {
              service: "s3",
              action: "putObject",
              parameters: {
                "Body.$": "$",
                Bucket: s3bucket.bucketName,
                "Key.$": "States.Format('{}.json', $$.Execution.StartTime)",
              },
              iamResources: [`${s3bucket.bucketArn}/*`],
            })
          )
        ),
      }
    );
  }
}

HelloFunction Taskがデータを生成するLambda関数を呼び出すタスクです。Lambda関数の出力を次のタスクに引き渡します。続いて呼び出されるS3 PutObject Taskが、そのデータをS3バケットへアップロードするタスクです。CDKでAWSサービス統合を利用する場合はCallAwsServiceメソッドを使用し、各パラメーターを設定するだけです。

ポイントとしては、今回タスクを分割することで、HelloFunction Taskから呼び出されるLambda関数にS3バケットへアクセスする権限の追加が不要な点です。

デプロイすると、次のようなシンプルなステートマシンが作成されます。

動作確認

ステートマシンを実行しましょう。制限などされていなければ成功するはずです。

実行できたらS3バケットを確認します。想定どおりオブジェクトが追加されていればOKです。

中身はLambda関数が出力した内容そのままとなっています。

まとめ

タスクを分割することで、データ生成部分がLambda関数以外となっても少ない修正で済みます。またS3へアップロードする部分も自分でLambda関数を用意しなければいけないのでは手間ですが、サービス統合で済ませられるのは便利ですね。

参考URL