AWS Step FunctionsでS3 BucketへのPut Object/Get Objectをしてみた

2022.06.23

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 IoT事業部の若槻です。

分析やバッチ処理のためにデータをS3 Bucketに貯める実装をする機会がありました。

今回は、折角なので最近よく触っているAWS Step Functionsを使ってS3 BucketへのPut Object/Get Objectをする方法を確認してみました。

やってみた

実装

AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。

lib/process-stack.ts

import { Construct } from 'constructs';
import {
  aws_s3,
  aws_stepfunctions,
  aws_stepfunctions_tasks,
  RemovalPolicy,
  Stack,
  StackProps,
} from 'aws-cdk-lib';

export class ProcessStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // S3 Bucket
    const s3Bucket = new aws_s3.Bucket(this, 'dataBucket', {
      removalPolicy: RemovalPolicy.DESTROY,
    });

    // Put Object
    const putObjectTask = new aws_stepfunctions_tasks.CallAwsService(
      this,
      'putObjectTask',
      {
        service: 's3',
        action: 'putObject',
        parameters: {
          Body: aws_stepfunctions.JsonPath.stringAt('$.bodyObject'),
          Bucket: s3Bucket.bucketName,
          Key: 'data/objectHoge',
          ContentType: 'application/json',
        },
        iamResources: [`${s3Bucket.bucketArn}/*`],
        iamAction: 's3:PutObject',
        resultPath: aws_stepfunctions.DISCARD,
      },
    );

    // Get Object
    const getObjectTask = new aws_stepfunctions_tasks.CallAwsService(
      this,
      'getObjectTask',
      {
        service: 's3',
        action: 'getObject',
        parameters: {
          Bucket: s3Bucket.bucketName,
          Key: 'data/objectHoge',
        },
        iamResources: [`${s3Bucket.bucketArn}/*`],
        iamAction: 's3:GetObject',
        resultSelector: {
          body: aws_stepfunctions.JsonPath.stringToJson(
            aws_stepfunctions.JsonPath.stringAt('$.Body'),
          ),
        },
        resultPath: '$.getObjectTaskOutPut',
      },
    );

    // State Machine
    new aws_stepfunctions.StateMachine(this, 'stateMachine', {
      stateMachineName: 'stateMachine',
      definition: putObjectTask.next(getObjectTask),
    });
  }
}
  • S3:PutObjectおよびS3:GetObjectはStep Functionsの組み込みタスクがないため、CallAwsServiceを使用しています。
  • getObject時にstringToJsonを使用してJson文字列をオブジェクトに変換しています。

上記をCDK Deployしてスタックをデプロイします。これにより次のDefinitionのステートマシンが作成されます。

Definition

{
  "StartAt": "putObjectTask",
  "States": {
    "putObjectTask": {
      "Next": "getObjectTask",
      "Type": "Task",
      "ResultPath": null,
      "Resource": "arn:aws:states:::aws-sdk:s3:putObject",
      "Parameters": {
        "Body": {
          "awsRegion": "ap-northeast-1",
          "eventID": "5b4f50b1-fa9e-40fc-8f16-dc1bafb034ca",
          "eventName": "MODIFY",
          "recordFormat": "application/json",
          "eventSource": "aws:dynamodb"
        },
        "Bucket": "processstack-databucketd8691f4e-1m7fyxr0cebrl",
        "Key": "data/objectHoge",
        "ContentType": "application/json"
      }
    },
    "getObjectTask": {
      "End": true,
      "Type": "Task",
      "ResultPath": "$.getObjectTaskOutPut",
      "ResultSelector": {
        "body.$": "States.StringToJson($.Body)"
      },
      "Resource": "arn:aws:states:::aws-sdk:s3:getObject",
      "Parameters": {
        "Bucket": "processstack-databucketd8691f4e-1m7fyxr0cebrl",
        "Key": "data/objectHoge"
      }
    }
  }
}

動作確認

次の入力を指定してステートマシンを実行します。

Input

{
  "bodyObject": {
    "awsRegion": "ap-northeast-1",
    "eventID": "5b4f50b1-fa9e-40fc-8f16-dc1bafb034ca",
    "eventName": "MODIFY",
    "userIdentity": null,
    "recordFormat": "application/json",
    "eventSource": "aws:dynamodb"
  }
}

getObjectTaskの出力を見ると、Bucketに格納されたオブジェクトの内容がGetできています。

また、対象のBucketを見るとキーdata/objectHogeでオブジェクトが格納されています。

オブジェクトの内容を見るとちゃんとPutおよびGetしたオブジェクトとなっていますね。

おわりに

AWS Step FunctionsでS3 BucketへのPut Object/Get Objectをしてみました。

とりあえずJson Objectに対しては行えることが分かりました。他にはJSON Linesなどの非オブジェクトの場合はどうなのか気になるので引き続き触ってみたいと思います。結果は別途記事にします。

以上