Lambda+Glue+Step Functionsの構成をAWS CDKでデプロイしてみた

2024.05.29

データアナリティクス事業本部のueharaです。

今回は、Lambda+Glue+Step Functionsの構成をAWS CDKでデプロイしてみたいと思います。

はじめに

以前、Lambda+Glue+Step Functionsといった比較的軽量なETLでありがちな構成をServerless FrameworkとAWS SAMでそれぞれデプロイしてみるというブログを公開しました。

今回は上記のブログでもデプロイを実施した以下の構成をAWS CDKを用いてデプロイしたいと思います。

フォルダ構成

今回のフォルダ構成は以下の通りです。

node_modulesフォルダと、cdk.outフォルダについては省略しています。

$ tree -I "node_modules|cdk.out" --dirsfirst
.
├── bin
│   └── cdk-etl-sample.ts
├── lib
│   ├── common
│   │   └── s3.ts
│   ├── etl
│   │   └── etl-sample.ts
│   └── cdk-etl-sample-stack.ts
├── resources
│   ├── glue
│   │   └── sample_glue.py
│   └── lambda
│       └── sample_func.py
├── test
│   └── cdk-etl-sample.test.ts
├── README.md
├── cdk.json
├── jest.config.js
├── package-lock.json
├── package.json
├── parameter.ts
└── tsconfig.json

各ファイルの実装

bin/cdk-etl-sample.ts

cdk-etl-sample.ts

#!/usr/bin/env node
import 'source-map-support/register';
import * as cdk from 'aws-cdk-lib';
import { CdkEtlSampleStack } from '../lib/cdk-etl-sample-stack';
import { devParameter } from '../parameter'

const app = new cdk.App();

new CdkEtlSampleStack(app, "DevStack", {
  env: {
    account: devParameter.env?.account || process.env.CDK_DEFAULT_ACCOUNT,
    region: devParameter.env?.region || process.env.CDK_DEFAULT_REGION,
  },
  envName: devParameter.envName,
  projectName: devParameter.projectName,
});

ここでは、後述する parameter.ts に記載されている devParameter に記載のパラメーターを用いて DevStack という名前のスタックを定義しています。

これにより、スタック作成時にそれぞれの環境に適したパラメータが適用できるようになります。

仮に prod 環境のパラメータを記載した prodParameter を利用したスタックも定義したい場合は以下のように記載することができます。

// 前略
new CdkEtlSampleStack(app, "DevStack", {
  env: {
    account: devParameter.env?.account || process.env.CDK_DEFAULT_ACCOUNT,
    region: devParameter.env?.region || process.env.CDK_DEFAULT_REGION,
  },
  envName: devParameter.envName,
  projectName: devParameter.projectName,
});

new CdkEtlSampleStack(app, "ProdStack", {
  env: {
    account: prodParameter.env?.account || process.env.CDK_DEFAULT_ACCOUNT,
    region: prodParameter.env?.region || process.env.CDK_DEFAULT_REGION,
  },
  envName: prodParameter.envName,
  projectName: prodParameter.projectName,
});

lib/common/s3.ts

import * as cdk from "aws-cdk-lib";
import { Bucket, BlockPublicAccess, BucketEncryption } from "aws-cdk-lib/aws-s3";
import { Construct } from "constructs";

export interface S3ConstructProps {
  envName: string;
  projectName: string;
}

export class S3Construct extends Construct {
  public readonly dataSourceBucket: Bucket;
  public readonly glueScriptBucket: Bucket;

  constructor(scope: Construct, id: string, props: S3ConstructProps) {
    super(scope, id);

    // データソース用のS3バケット
    this.dataSourceBucket = new Bucket(this, "DataSourceBucket", {
      bucketName: `${props.projectName}-${props.envName}-data-source-bucket`,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
      encryption: BucketEncryption.KMS_MANAGED,
      versioned: true,
      eventBridgeEnabled: true,
    });

    // Glueのスクリプトを配置する用のS3バケット
    this.glueScriptBucket = new Bucket(this, "GlueScriptBucket", {
        bucketName: `${props.projectName}-${props.envName}-glue-script-bucket`,
        removalPolicy: cdk.RemovalPolicy.DESTROY,
        blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
        encryption: BucketEncryption.KMS_MANAGED,
        versioned: true,
    });
    // Glue用のスクリプトをS3にアップロード
    new cdk.aws_s3_deployment.BucketDeployment(this, "DeployGlueScript", {
        sources: [cdk.aws_s3_deployment.Source.asset("resources/glue")],
        destinationBucket: this.glueScriptBucket,
    });
  }
}

ここではデータソース用のS3バケットと、Glueの実行スクリプトを配置するためのS3バケットを定義しています。

後述しますが、今回は resources/glue フォルダ配下にGlueで実行するためのPythonファイルを配置するので、それをS3バケットにアップロードする内容も記載しています。

lib/etl/etl-sample.ts

import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import * as iam from "aws-cdk-lib/aws-iam";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as glue from "aws-cdk-lib/aws-glue";
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import * as events from "aws-cdk-lib/aws-events";
import * as targets from "aws-cdk-lib/aws-events-targets";

export interface ETLSampleConstructProps {
  envName: string;
  projectName: string;
}

export class ETLSampleConstruct extends Construct {
  constructor(scope: Construct, id: string, props: ETLSampleConstructProps) {
    super(scope, id);
    
    //// Lambda
    // Lambda Role
    const lambdaRole = new iam.Role (this, "LambdaRole", {
      assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
      description: "IAM role for Lambda",
      roleName: `${props.projectName}-${props.envName}-lambda-role`,
      managedPolicies: [
        iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole"),
        iam.ManagedPolicy.fromAwsManagedPolicyName("AmazonS3FullAccess"),
      ],
    });
    // Lambda Function
    const lambdaFunc = new lambda.Function(this, "LambdaFunction", {
      functionName: `${props.projectName}-${props.envName}-lambda`,
      runtime: lambda.Runtime.PYTHON_3_9,
      code: lambda.Code.fromAsset("resources/lambda"),
      handler: "sample_func.lambda_handler",
      memorySize: 128,
      timeout: cdk.Duration.seconds(900),
      role: lambdaRole,
      architecture: lambda.Architecture.X86_64,
    });
  
    //// Glue
    // Glue Job Role
    const glueJobRole = new iam.Role(this, "GlueJobRole", {
      assumedBy: new iam.ServicePrincipal("glue.amazonaws.com"),
      description: "IAM Role for Glue Job",
      roleName: `${props.projectName}-${props.envName}-glue-job-role`,
      managedPolicies: [
        iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSGlueServiceRole"),
        iam.ManagedPolicy.fromAwsManagedPolicyName("AmazonS3FullAccess"),
      ],
    });
    // Glue Job
    const glueJob = new glue.CfnJob(this, "GlueJob", {
      name: `${props.projectName}-${props.envName}-glue-job`,
      role: glueJobRole.roleArn,
      command: {
        name: "pythonshell",
        pythonVersion: "3.9",
        scriptLocation: `s3://${props.projectName}-${props.envName}-glue-script-bucket/sample_glue.py`,
      },
      executionProperty: {
        maxConcurrentRuns: 3,
      },
      maxCapacity: 0.0625,
      maxRetries: 0,
      defaultArguments: {
        "--job-language": "python",
        "library-set": "analytics",
      },
    });

    //// Step Functions
    // Step Functions Role
    const stepFunctionsRole = new iam.Role(this, "StepFunctionsRole", {
      assumedBy: new iam.ServicePrincipal("states.amazonaws.com"),
      description: "IAM role for Step Functions",
      roleName: `${props.projectName}-${props.envName}-stepfunctions-role`,
      managedPolicies: [
        iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSLambdaRole"),
        iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSGlueServiceRole"),
      ],
    });
    // State Machine
    const stateMachine = new sfn.StateMachine(this, "StepFunctions", {
      stateMachineName: `${props.projectName}-${props.envName}-sf`,
      definition: sfn.Chain.start(
        new tasks.LambdaInvoke(this, "InvokeLambda", {
          lambdaFunction: lambdaFunc,
        })
      ).next(
        new tasks.GlueStartJobRun(this, "InvokeGlueJob", {
          glueJobName: glueJob.name!,
          integrationPattern: sfn.IntegrationPattern.RUN_JOB,
        })
      ),
      role: stepFunctionsRole,
    });

    //// EventBridge
    // EventBridge Role
    const eventBridgeRole = new iam.Role(this, "EventBridgeRole", {
      assumedBy: new iam.ServicePrincipal("events.amazonaws.com"),
      description: "IAM role for EventBridge",
      roleName: `${props.projectName}-${props.envName}-eventbridge-role`,
    });
    eventBridgeRole.addToPolicy(new iam.PolicyStatement({
      actions: ["states:StartExecution"],
      resources: [stateMachine.stateMachineArn],
    }));
    // EventBridge Rule
    const s3PutEventRule = new events.Rule(this, 'S3PutEventRule', {
      ruleName: `${props.projectName}-${props.envName}-sf-event`,
      eventPattern: {
        source: ["aws.s3"],
        detailType: ["Object Created"],
        detail: {
          bucket: {
            name: [`${props.projectName}-${props.envName}-data-source-bucket`],
          },
          object: {
            key: [{ prefix: "tmp/" }],
            size: [{ numeric: [">", 0] }],
          },
        },
      },
    });
    s3PutEventRule.addTarget(new targets.SfnStateMachine(stateMachine, {
      role: eventBridgeRole,
    }));
  }
}

こちらが今回メインとなるLambda+Glue+Step Functionsの構成を記載したファイルです。

簡単化のため1つのConstructとしてまとめています。

また、今回は検証用なのでS3のアクセス等は最小権限となっていないことにご注意ください。

冒頭で紹介したブログの内容と同じように、データストアバケットに /tmp プレフィックスを持つオブジェクトが作成されるとEventBridge経由でStep Functionsが起動するというものになっています。

lib/cdk-etl-sample-stack.ts

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { S3Construct } from './common/s3';
import { ETLSampleConstruct } from './etl/etl-sample';

export interface StackProps extends cdk.StackProps {
  envName: string;
  projectName: string;
}

export class CdkEtlSampleStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    const s3Construct = new S3Construct(this, "S3", {
      envName: props.envName,
      projectName: props.projectName,
    });

    const etlSampleConstruct = new ETLSampleConstruct(this, "ETLSample", {
      envName: props.envName,
      projectName: props.projectName
    });
  }
}

スタックとして、先に定義したS3とETLアプリケーション(Lambda+Glue+Step Functions)を定義しています。

resources/glue/sample_glue.py

import sys
import time


def main(argv):
    print("start")
    # sleep 5 minutes
    time.sleep(300)
    print("end")


main(sys.argv)

Glueで動かすPythonスクリプトになります。検証用なのでwaitするだけで特に処理はしません。

resources/lambda/sample_func.py

import time


def lambda_handler(event, context):
    print("start")
    # sleep 1 minute
    time.sleep(60)
    print("end")

    return {"message": "success"}

Lambdaで動かすPythonスクリプトになります。こちらも検証用なのでwaitするだけで特に処理はしません。

parameter.ts

import { Environment } from "aws-cdk-lib";

export interface MyParameter {
    env?: Environment;
    envName: string;
    projectName: string;
}

export const devParameter: MyParameter = {
    envName: "dev",
    projectName: "etl-sample-app",
}

こちらが環境を分離するためのパラメーターを記載するファイルになります。

現状は envNameprojectName しか記載をしていませんが、インスタンスタイプやノード数など、環境毎に定義を分けたいパラメーターを追記して管理することもできます。

デプロイ

デプロイは以下で実施できます。

$ cdk deploy DevStack

デプロイが完了すると、DevStackという名前のスタックがデプロイされていることが確認できると思います。

実行確認

試しに今回作成したデータストアバケットに tmp のprefixを持つオブジェクトキーでファイルをアップロードしたところ、Step Functionsが起動することが確認できました。

最後に

今回は、Lambda+Glue+Step Functionsの構成をAWS CDKでデプロイしてみました。

参考になりましたら幸いです。

参考文献