データアナリティクス事業本部のueharaです。
今回は、Lambda+Glue+Step Functionsの構成をAWS CDKでデプロイしてみたいと思います。
はじめに
以前、Lambda+Glue+Step Functionsといった比較的軽量なETLでありがちな構成をServerless FrameworkとAWS SAMでそれぞれデプロイしてみるというブログを公開しました。
今回は上記のブログでもデプロイを実施した以下の構成をAWS CDKを用いてデプロイしたいと思います。
フォルダ構成
今回のフォルダ構成は以下の通りです。
※node_modules
フォルダと、cdk.out
フォルダについては省略しています。
$ tree -I "node_modules|cdk.out" --dirsfirst
.
├── bin
│ └── cdk-etl-sample.ts
├── lib
│ ├── common
│ │ └── s3.ts
│ ├── etl
│ │ └── etl-sample.ts
│ └── cdk-etl-sample-stack.ts
├── resources
│ ├── glue
│ │ └── sample_glue.py
│ └── lambda
│ └── sample_func.py
├── test
│ └── cdk-etl-sample.test.ts
├── README.md
├── cdk.json
├── jest.config.js
├── package-lock.json
├── package.json
├── parameter.ts
└── tsconfig.json
各ファイルの実装
bin/cdk-etl-sample.ts
cdk-etl-sample.ts
#!/usr/bin/env node
import 'source-map-support/register';
import * as cdk from 'aws-cdk-lib';
import { CdkEtlSampleStack } from '../lib/cdk-etl-sample-stack';
import { devParameter } from '../parameter'
const app = new cdk.App();
new CdkEtlSampleStack(app, "DevStack", {
env: {
account: devParameter.env?.account || process.env.CDK_DEFAULT_ACCOUNT,
region: devParameter.env?.region || process.env.CDK_DEFAULT_REGION,
},
envName: devParameter.envName,
projectName: devParameter.projectName,
});
ここでは、後述する parameter.ts
に記載されている devParameter
に記載のパラメーターを用いて DevStack
という名前のスタックを定義しています。
これにより、スタック作成時にそれぞれの環境に適したパラメータが適用できるようになります。
仮に prod
環境のパラメータを記載した prodParameter
を利用したスタックも定義したい場合は以下のように記載することができます。
// 前略
new CdkEtlSampleStack(app, "DevStack", {
env: {
account: devParameter.env?.account || process.env.CDK_DEFAULT_ACCOUNT,
region: devParameter.env?.region || process.env.CDK_DEFAULT_REGION,
},
envName: devParameter.envName,
projectName: devParameter.projectName,
});
new CdkEtlSampleStack(app, "ProdStack", {
env: {
account: prodParameter.env?.account || process.env.CDK_DEFAULT_ACCOUNT,
region: prodParameter.env?.region || process.env.CDK_DEFAULT_REGION,
},
envName: prodParameter.envName,
projectName: prodParameter.projectName,
});
lib/common/s3.ts
import * as cdk from "aws-cdk-lib";
import { Bucket, BlockPublicAccess, BucketEncryption } from "aws-cdk-lib/aws-s3";
import { Construct } from "constructs";
export interface S3ConstructProps {
envName: string;
projectName: string;
}
export class S3Construct extends Construct {
public readonly dataSourceBucket: Bucket;
public readonly glueScriptBucket: Bucket;
constructor(scope: Construct, id: string, props: S3ConstructProps) {
super(scope, id);
// データソース用のS3バケット
this.dataSourceBucket = new Bucket(this, "DataSourceBucket", {
bucketName: `${props.projectName}-${props.envName}-data-source-bucket`,
removalPolicy: cdk.RemovalPolicy.DESTROY,
blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
encryption: BucketEncryption.KMS_MANAGED,
versioned: true,
eventBridgeEnabled: true,
});
// Glueのスクリプトを配置する用のS3バケット
this.glueScriptBucket = new Bucket(this, "GlueScriptBucket", {
bucketName: `${props.projectName}-${props.envName}-glue-script-bucket`,
removalPolicy: cdk.RemovalPolicy.DESTROY,
blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
encryption: BucketEncryption.KMS_MANAGED,
versioned: true,
});
// Glue用のスクリプトをS3にアップロード
new cdk.aws_s3_deployment.BucketDeployment(this, "DeployGlueScript", {
sources: [cdk.aws_s3_deployment.Source.asset("resources/glue")],
destinationBucket: this.glueScriptBucket,
});
}
}
ここではデータソース用のS3バケットと、Glueの実行スクリプトを配置するためのS3バケットを定義しています。
後述しますが、今回は resources/glue
フォルダ配下にGlueで実行するためのPythonファイルを配置するので、それをS3バケットにアップロードする内容も記載しています。
lib/etl/etl-sample.ts
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import * as iam from "aws-cdk-lib/aws-iam";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as glue from "aws-cdk-lib/aws-glue";
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import * as events from "aws-cdk-lib/aws-events";
import * as targets from "aws-cdk-lib/aws-events-targets";
export interface ETLSampleConstructProps {
envName: string;
projectName: string;
}
export class ETLSampleConstruct extends Construct {
constructor(scope: Construct, id: string, props: ETLSampleConstructProps) {
super(scope, id);
//// Lambda
// Lambda Role
const lambdaRole = new iam.Role (this, "LambdaRole", {
assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
description: "IAM role for Lambda",
roleName: `${props.projectName}-${props.envName}-lambda-role`,
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole"),
iam.ManagedPolicy.fromAwsManagedPolicyName("AmazonS3FullAccess"),
],
});
// Lambda Function
const lambdaFunc = new lambda.Function(this, "LambdaFunction", {
functionName: `${props.projectName}-${props.envName}-lambda`,
runtime: lambda.Runtime.PYTHON_3_9,
code: lambda.Code.fromAsset("resources/lambda"),
handler: "sample_func.lambda_handler",
memorySize: 128,
timeout: cdk.Duration.seconds(900),
role: lambdaRole,
architecture: lambda.Architecture.X86_64,
});
//// Glue
// Glue Job Role
const glueJobRole = new iam.Role(this, "GlueJobRole", {
assumedBy: new iam.ServicePrincipal("glue.amazonaws.com"),
description: "IAM Role for Glue Job",
roleName: `${props.projectName}-${props.envName}-glue-job-role`,
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSGlueServiceRole"),
iam.ManagedPolicy.fromAwsManagedPolicyName("AmazonS3FullAccess"),
],
});
// Glue Job
const glueJob = new glue.CfnJob(this, "GlueJob", {
name: `${props.projectName}-${props.envName}-glue-job`,
role: glueJobRole.roleArn,
command: {
name: "pythonshell",
pythonVersion: "3.9",
scriptLocation: `s3://${props.projectName}-${props.envName}-glue-script-bucket/sample_glue.py`,
},
executionProperty: {
maxConcurrentRuns: 3,
},
maxCapacity: 0.0625,
maxRetries: 0,
defaultArguments: {
"--job-language": "python",
"library-set": "analytics",
},
});
//// Step Functions
// Step Functions Role
const stepFunctionsRole = new iam.Role(this, "StepFunctionsRole", {
assumedBy: new iam.ServicePrincipal("states.amazonaws.com"),
description: "IAM role for Step Functions",
roleName: `${props.projectName}-${props.envName}-stepfunctions-role`,
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSLambdaRole"),
iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSGlueServiceRole"),
],
});
// State Machine
const stateMachine = new sfn.StateMachine(this, "StepFunctions", {
stateMachineName: `${props.projectName}-${props.envName}-sf`,
definition: sfn.Chain.start(
new tasks.LambdaInvoke(this, "InvokeLambda", {
lambdaFunction: lambdaFunc,
})
).next(
new tasks.GlueStartJobRun(this, "InvokeGlueJob", {
glueJobName: glueJob.name!,
integrationPattern: sfn.IntegrationPattern.RUN_JOB,
})
),
role: stepFunctionsRole,
});
//// EventBridge
// EventBridge Role
const eventBridgeRole = new iam.Role(this, "EventBridgeRole", {
assumedBy: new iam.ServicePrincipal("events.amazonaws.com"),
description: "IAM role for EventBridge",
roleName: `${props.projectName}-${props.envName}-eventbridge-role`,
});
eventBridgeRole.addToPolicy(new iam.PolicyStatement({
actions: ["states:StartExecution"],
resources: [stateMachine.stateMachineArn],
}));
// EventBridge Rule
const s3PutEventRule = new events.Rule(this, 'S3PutEventRule', {
ruleName: `${props.projectName}-${props.envName}-sf-event`,
eventPattern: {
source: ["aws.s3"],
detailType: ["Object Created"],
detail: {
bucket: {
name: [`${props.projectName}-${props.envName}-data-source-bucket`],
},
object: {
key: [{ prefix: "tmp/" }],
size: [{ numeric: [">", 0] }],
},
},
},
});
s3PutEventRule.addTarget(new targets.SfnStateMachine(stateMachine, {
role: eventBridgeRole,
}));
}
}
こちらが今回メインとなるLambda+Glue+Step Functionsの構成を記載したファイルです。
簡単化のため1つのConstructとしてまとめています。
また、今回は検証用なのでS3のアクセス等は最小権限となっていないことにご注意ください。
冒頭で紹介したブログの内容と同じように、データストアバケットに /tmp
プレフィックスを持つオブジェクトが作成されるとEventBridge経由でStep Functionsが起動するというものになっています。
lib/cdk-etl-sample-stack.ts
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { S3Construct } from './common/s3';
import { ETLSampleConstruct } from './etl/etl-sample';
export interface StackProps extends cdk.StackProps {
envName: string;
projectName: string;
}
export class CdkEtlSampleStack extends cdk.Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
const s3Construct = new S3Construct(this, "S3", {
envName: props.envName,
projectName: props.projectName,
});
const etlSampleConstruct = new ETLSampleConstruct(this, "ETLSample", {
envName: props.envName,
projectName: props.projectName
});
}
}
スタックとして、先に定義したS3とETLアプリケーション(Lambda+Glue+Step Functions)を定義しています。
resources/glue/sample_glue.py
import sys
import time
def main(argv):
print("start")
# sleep 5 minutes
time.sleep(300)
print("end")
main(sys.argv)
Glueで動かすPythonスクリプトになります。検証用なのでwaitするだけで特に処理はしません。
resources/lambda/sample_func.py
import time
def lambda_handler(event, context):
print("start")
# sleep 1 minute
time.sleep(60)
print("end")
return {"message": "success"}
Lambdaで動かすPythonスクリプトになります。こちらも検証用なのでwaitするだけで特に処理はしません。
parameter.ts
import { Environment } from "aws-cdk-lib";
export interface MyParameter {
env?: Environment;
envName: string;
projectName: string;
}
export const devParameter: MyParameter = {
envName: "dev",
projectName: "etl-sample-app",
}
こちらが環境を分離するためのパラメーターを記載するファイルになります。
現状は envName
と projectName
しか記載をしていませんが、インスタンスタイプやノード数など、環境毎に定義を分けたいパラメーターを追記して管理することもできます。
デプロイ
デプロイは以下で実施できます。
$ cdk deploy DevStack
デプロイが完了すると、DevStack
という名前のスタックがデプロイされていることが確認できると思います。
実行確認
試しに今回作成したデータストアバケットに tmp
のprefixを持つオブジェクトキーでファイルをアップロードしたところ、Step Functionsが起動することが確認できました。
最後に
今回は、Lambda+Glue+Step Functionsの構成をAWS CDKでデプロイしてみました。
参考になりましたら幸いです。