【CDK】Step FunctionsでS3のCOPY Objectを実行

2024.04.22

はじめに

データアナリティクス事業本部ビッグデータチームのyosh-kです。
今回はStep Functions内でS3 ObjectのCOPY処理を実行する実装をCDKで行いたいと思います。

前提

今回実現したい構成は以下になります。

  • EventBridgeの日次スケジュールでStep Functionsを起動します。
  • Step Functions内でS3の特定のprefix配下に対してListObjectsを実行します。
  • ListObjecrsで取得された値からフォルダを取り除くための処理をFilterObjectsで実行します。
  • Map Stateで配列の要素であるs3 pathに対して順にCopyObjectを実行します。

実装

それでは実装になります。実装コードはリンクに格納しています。

@ 34_step_functions_s3_copy_object_with_cdk % tree
.
└── cdk
    ├── bin
    │   └── app.ts
    ├── cdk.json
    ├── lib
    │   ├── constructs
    │   │   ├── eventbridge.ts
    │   │   ├── s3.ts
    │   │   └── step-functions.ts
    │   └── stack
    │       └── s3-copy-flow-stack.ts
    ├── parameter.ts
    ├── test
    │   └── app.test.ts
    └── tsconfig.json
    └── jest.config.js
    └── package.json
└── README.md

7 directories, 10 files
@34_step_functions_s3_copy_object_with_cdk %

bin/app.ts

#!/usr/bin/env node
import * as cdk from "aws-cdk-lib";
import { S3CopyFlowStack } from "../lib/stack/s3-copy-flow-stack";
import { devParameter } from "../parameter";

const app = new cdk.App();
new S3CopyFlowStack(app, "S3CopyFlow", {
  description: "S3CopyFlow (tag:kasama-test-tag)",
  env: {
    account: devParameter.env?.account || process.env.CDK_DEFAULT_ACCOUNT,
    region: devParameter.env?.region || process.env.CDK_DEFAULT_REGION,
  },
  tags: {
    Repository: "kasama-test-tag",
    Environment: devParameter.envName,
  },

  projectName: devParameter.projectName,
  envName: devParameter.envName,
});
  • description: CloudFormation StackのDescriptionとなります。
  • env: deploy先のaccoun, regionを設定します。devParameterで定義していなければdeployコマンド実行環境のデフォルト値を設定します。
  • tag: 作成されるリソースに対してのタグを設定します。
  • projectName, envName: 処理の中でリソース名の一部として使用します。

lib/constructs/eventbridge.ts

// import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import * as events from "aws-cdk-lib/aws-events";
import * as eventsTargets from "aws-cdk-lib/aws-events-targets";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as iam from "aws-cdk-lib/aws-iam";

export interface EventBridgeConstructProps {
  envName: string;
  projectName: string;
  stateMachineArn: string;
}

export class EventBridgeConstruct extends Construct {
  constructor(scope: Construct, id: string, props: EventBridgeConstructProps) {
    super(scope, id);

    // 日次で20:00 (JST) に発火するEventBridgeのルールを追加
    const dailyRule = new events.Rule(this, `DailyRule`, {
      schedule: events.Schedule.cron({
        minute: "0",
        hour: "11",
        day: "*",
        month: "*",
        year: "*",
      }),
      ruleName: `${props.projectName}-${props.envName}-daily-rule`,
    });
    // EventBridge が Step Functions を起動するための IAM ロールを作成
    const eventBridgeExecutionRole = new iam.Role(
      this,
      `EventBridgeExecutionRole`,
      {
        assumedBy: new iam.ServicePrincipal("events.amazonaws.com"), // 信頼ポリシー設定
        description:
          "An IAM role for EventBridge to Start Step Functions Execution",
        roleName: `EventBridgeExecutionRoleForStepFunctions-${props.envName}`,
      }
    );

    eventBridgeExecutionRole.addToPolicy(
      new iam.PolicyStatement({
        actions: ["states:StartExecution"], // 許可するアクション
        resources: [props.stateMachineArn], // ステートマシンのARN
      })
    );
    // ステップ関数をS3 Put Eventのターゲットとして設定
    const stateMachine = sfn.StateMachine.fromStateMachineArn(
      this,
      "ImportedStateMachine",
      props.stateMachineArn
    );
    dailyRule.addTarget(
      new eventsTargets.SfnStateMachine(stateMachine, {
        role: eventBridgeExecutionRole,
      })
    );
  }
}

EventBridge, EventBridge用のIAM Role, IAM RoleのPolicyの定義をしています。

lib/constructs/s3.ts

import * as cdk from "aws-cdk-lib";
import {
  Bucket,
  BlockPublicAccess,
  BucketEncryption,
} from "aws-cdk-lib/aws-s3";
import { Construct } from "constructs";

export interface S3ConstructProps {
  envName: string;
  projectName: string;
}

export class S3Construct extends Construct {
  public readonly dataSourceBucket: Bucket;
  public readonly dataStoreBucket: Bucket;
  constructor(scope: Construct, id: string, props: S3ConstructProps) {
    super(scope, id);

    this.dataSourceBucket = new Bucket(this, "DataSourceBucket", {
      bucketName: `${props.projectName}-${props.envName}-data-source`,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
      encryption: BucketEncryption.KMS_MANAGED,
      versioned: true,
      eventBridgeEnabled: true,
    });
    this.dataStoreBucket = new Bucket(this, "DataStoreBucket", {
      bucketName: `${props.projectName}-${props.envName}-data-store`,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
      encryption: BucketEncryption.KMS_MANAGED,
      versioned: true,
    });
  }
}

ソースデータを格納するBucketとアウトプットファイルを格納するBucketを定義しています。Bucketを他のConstructで参照するためpublic readonlyで参照できる定義としています。

lib/constructs/step-functions.ts

// import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as sfn_tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
import * as iam from "aws-cdk-lib/aws-iam";

export interface StepFunctionsConstructProps {
  envName: string;
  projectName: string;
  dataSourceBucketName: string;
  dataStoreBucketName: string;
}

export class StepFunctionsConstruct extends Construct {
  public readonly stateMachine: sfn.StateMachine;
  constructor(
    scope: Construct,
    id: string,
    props: StepFunctionsConstructProps
  ) {
    super(scope, id);

    const stepFunctionsRole = new iam.Role(this, `StepFunctionsRole`, {
      assumedBy: new iam.ServicePrincipal("states.amazonaws.com"),
      description: "An IAM role for Step Functions to access AWS services",
      roleName: `StepFunctionsExecutionRoleForS3Copy-${props.envName}`,
    });

    stepFunctionsRole.addToPolicy(
      new iam.PolicyStatement({
        actions: [
          "s3:ListBucket",
          "s3:GetObject",
          "s3:PutObject",
          "s3:CopyObject",
        ],
        resources: [
          `arn:aws:s3:::${props.dataSourceBucketName}`,
          `arn:aws:s3:::${props.dataSourceBucketName}/*`,
          `arn:aws:s3:::${props.dataStoreBucketName}`,
          `arn:aws:s3:::${props.dataStoreBucketName}/*`,
        ],
      })
    );

    const listObjectsV2 = new sfn_tasks.CallAwsService(this, "ListObjectsV2", {
      service: "s3",
      action: "listObjectsV2",
      parameters: { Bucket: props.dataSourceBucketName, Prefix: "src/" },
      iamResources: [`arn:aws:s3:::${props.dataSourceBucketName}/src/*`],
      resultPath: "$.listResult",
    });

    const filterObjects = new sfn.Pass(this, "FilterObjects", {
      parameters: {
        "FilteredContents.$": "$.listResult.Contents[?(@.Size > 0)]",
      },
      resultPath: "$.listFilteredResult",
    });

    const copyObjectMap = new sfn.Map(this, "CopyObjectsMap", {
      maxConcurrency: 1000,
      itemsPath: "$.listFilteredResult.FilteredContents",
      itemSelector: {
        "FileName.$":
          "States.ArrayGetItem(States.StringSplit($$.Map.Item.Value.Key, '/'), States.MathAdd(States.ArrayLength(States.StringSplit($$.Map.Item.Value.Key, '/')), -1))",
        "Key.$": "$$.Map.Item.Value.Key",
      },
    });
    const copyObject = new sfn_tasks.CallAwsService(this, "CopyObject", {
      service: "s3",
      action: "copyObject",
      parameters: {
        Bucket: props.dataStoreBucketName,
        CopySource: sfn.JsonPath.stringAt(
          `States.Format('${props.dataSourceBucketName}/{}', $.Key)`
        ),
        Key: sfn.JsonPath.stringAt("States.Format('target/{}', $.FileName)"),
      },
      iamResources: [`arn:aws:s3:::${props.dataStoreBucketName}/target/*`],
      resultPath: sfn.JsonPath.DISCARD,
    });
    copyObjectMap.itemProcessor(copyObject);

    const definitionBody = sfn.DefinitionBody.fromChainable(
      listObjectsV2.next(filterObjects).next(copyObjectMap)
    );
    this.stateMachine = new sfn.StateMachine(this, "StateMachine", {
      stateMachineName: `${props.projectName}-${props.envName}-CSVProcessorStateMachine`,
      definitionBody: definitionBody,
      role: stepFunctionsRole,
    });
  }
}
  • stepFunctionsRole: Step Functionsから2種類のBucketを操作するためのポリシー付きで作成しています。
  • sfn.DefinitionBody.fromChainable(): Step Functionsでの実行順を定義しています。
  • copyObjectMap.itemProcessor(copyObject): マップ構造で、各アイテムに対してcopyObjectを適用する定義になります。
  • listObjectsV2: S3バケットからオブジェクトのリストを取得します。指定されたバケット内のsrc/プレフィックス以下のオブジェクトを取得し、結果を$.listResultに保存します。
  • filterObjects: Step FunctionsのPassステートを使用して、サイズが0より大きいオブジェクトだけをフィルタリングし、その結果を$.listFilteredResultに保存します。
  • copyObjectMap: フィルタリングされたオブジェクトの各アイテムに対して並行して実行されるMapステートを定義します。各アイテムのファイル名とキーを指定します。
  • copyObject: 指定されたS3バケットから別のバケットへオブジェクトをコピーするStep Functionsのタスクを作成します。バケット名、コピー元、およびコピー先のキーはパラメータで指定します。

lib/stack/s3-copy-flow-stack.ts

import { Construct } from "constructs";
import * as cdk from "aws-cdk-lib";
import { S3Construct } from "../constructs/s3";
import {
  StepFunctionsConstruct,
  StepFunctionsConstructProps,
} from "../constructs/step-functions";
import {
  EventBridgeConstruct,
  EventBridgeConstructProps,
} from "../constructs/eventbridge";

export interface S3CopyFlowStackProps extends cdk.StackProps {
  envName: string;
  projectName: string;
}

export class S3CopyFlowStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: S3CopyFlowStackProps) {
    super(scope, id, props);
    const s3Construct = new S3Construct(this, "S3", {
      envName: props.envName,
      projectName: props.projectName,
    });
    const stepFunctionsConstruct = new StepFunctionsConstruct(
      this,
      "StepFunctions",
      {
        envName: props.envName,
        projectName: props.projectName,
        dataSourceBucketName: s3Construct.dataSourceBucket.bucketName,
        dataStoreBucketName: s3Construct.dataStoreBucket.bucketName,
      } as StepFunctionsConstructProps
    );
    new EventBridgeConstruct(this, "EventBridge", {
      envName: props.envName,
      projectName: props.projectName,
      stateMachineArn: stepFunctionsConstruct.stateMachine.stateMachineArn,
    } as EventBridgeConstructProps);
  }
}

上記ファイルではStackを定義しその中でConstructとしてリソースを定義しています。S3 Bucket, Step Functions, EventBridgeで依存関係があるため、上記の順となっています。bin/app.tsから取得したenvName, projectNameは全てのConstructで活用するため、引数として指定しています。

parameter.ts

import { Environment } from "aws-cdk-lib";

// Parameters for Application
export interface AppParameter {
  env: Environment;
  envName: string;
  projectName: string;
}

// Example
export const devParameter: AppParameter = {
  envName: "dev",
  projectName: "kasama",
  env: { account: "xxxxxx", region: "ap-northeast-1" },
};

環境変数を定義するためのファイルとなります。accountには実際のAWS_ACCOUNT_IDを記載します。そのほかには、evnNameやprojectNameなどは一括で修正できるように変数として定義しています。

デプロイ

package.jsonがあるディレクトリで依存関係をインストールします。

npm install

次にcdk.jsonがあるディレクトリで、CDKで定義されたリソースのコードをAWS CloudFormationテンプレートに合成(変換)するプロセスを実行します。

npx cdk synth --profile <YOUR_AWS_PROFILE>

同じくcdk.jsonがあるディレクトリでデプロイコマンドを実行します。--allはCDKアプリケーションに含まれる全てのスタックをデプロイするためのオプション、--require-approval neverはセキュリティ的に敏感な変更やIAMリソースの変更を含むデプロイメント時の承認を求めるダイアログ表示を完全にスキップします。neverは、どんな変更でも事前確認なしにデプロイすることを意味します。今回は検証用なので指定していますが、慎重にデプロイする場合は必要のないオプションになるかもしれません。

npx cdk deploy --all --require-approval never --profile <YOUR_AWS_PROFILE>

実行結果

それではStep Functionsのスケジュール起動を確認したいと思います。事前準備として指定のpathに2つのcsvファイルを格納します。

JSTの20:00にEventBridgeが実行されていることを確認。

Step Functionsも正常終了。

データストア Bucketに2ファイルが連携されていることを確認。

CDK実装にあたっての参考情報

今回の実装にあたって初めてCDKを学習したのでその際の資料を参考までに残しておきたいと思います。 (CDK初心者なのでそのほかにも参考資料あるとぜひコメントいただけると学びになります!)

Step Functions内のS3操作参考

CDK実装

最後に

テスト実装やCI/CDについてもまだまだ自分にとっては未知の領域なので引き続き学んでいきたいと思います。