はじめに
データアナリティクス事業本部ビッグデータチームのyosh-kです。
今回はStep Functions内でLambdaを実行する実装をCDKで行いたいと思います。
前提
今回実現したい構成は以下になります。
- EventBridgeでdata-source Bucketへの
Object Created
イベントを検知し、Step Functionsを起動します。 - Step Functions内Lambdaを実行します。
- Lambdaでデータ加工した出力先としてdata-store Bucketにcsvファイルを生成します。
加工処理前のcsvファイルの中身になります。
animal_info.csv
"animal_id","animal_name","species","weight_kg","age_years","habitat","arrival_year"
"001","Luna","Tiger","120","5","Savanna","2018"
"002","Bella","Elephant","2500","8","Jungle","2015"
"003","Max","Lion","190","6","Savanna","2017"
"004","Daisy","Giraffe","800","4","Savanna","2019"
"005","Charlie","Rhino","2300","7","Jungle","2016"
lambda内の処理は以下ブログのGlue Python ScriptをLambda用に修正した形になります。
【CDK】Step FuctionsでGlue Python Shellを実行
実装
それでは実装になります。実装コードはリンクに格納しています。
@36_step_functions_lambda_with_cdk % tree
.
├── README.md
├── cdk
│ ├── bin
│ │ └── app.ts
│ ├── cdk.json
│ ├── jest.config.js
│ ├── lib
│ │ ├── constructs
│ │ │ ├── eventbridge.ts
│ │ │ ├── lambda.ts
│ │ │ ├── s3.ts
│ │ │ └── step-functions.ts
│ │ └── stack
│ │ └── etl-stack.ts
│ ├── package.json
│ ├── parameter.ts
│ ├── test
│ │ └── app.test.ts
│ └── tsconfig.json
└── resources
└── lambda
├── etl_handler.py
└── lib
└── get_logger.py
10 directories, 15 files
@36_step_functions_lambda_with_cdk %
bin/app.ts
#!/usr/bin/env node
import * as cdk from "aws-cdk-lib";
import { ETLStack } from "../lib/stack/etl-stack";
import { devParameter, prodParameter } from "../parameter";
const app = new cdk.App();
// This context need to be specified in args
const argContext = "environment";
let envKey = app.node.tryGetContext(argContext);
if (envKey == undefined) {
console.warn(
`Warning: Environment key not specified, defaulting to 'dev'. For specifying environment, use context option. ex) cdk deploy -c ${argContext}=dev/prod`
);
// throw new Error(
// `Please specify environment with context option. ex) cdk deploy -c ${argContext}=dev`
// );
envKey = "dev";
}
let parameter;
if (envKey === "dev") {
parameter = devParameter;
} else {
parameter = prodParameter;
}
new ETLStack(app, `CMKasamaETL${envKey.toUpperCase()}`, {
description: `${parameter.projectName}-${parameter.envName}-test-tag`,
env: {
account: parameter.env?.account || process.env.CDK_DEFAULT_ACCOUNT,
region: parameter.env?.region || process.env.CDK_DEFAULT_REGION,
},
tags: {
Repository: `${parameter.projectName}-${parameter.envName}-test-tag`,
Environment: parameter.envName,
},
projectName: parameter.projectName,
envName: parameter.envName,
});
envKey
:cdk deploy
コマンド実行時に引数としてenvironmentを受け取ります。environmentの値に応じてパラメータを設定することで環境に応じた設定を実現するようにしています。envKeyコマンドライン引数がなければデフォルトでdevを設定するようにしていますが、プロジェクトに応じては、引数がなければエラーで返す実装でも良いと思います。description
: CloudFormation StackのDescriptionとなります。env
: deploy先のaccoun, regionを設定します。parameterで定義していなければdeployコマンド実行環境のデフォルト値を設定します。tag
: 作成されるリソースに対してのタグを設定します。projectName, envName
: 処理の中でリソース名の一部として使用します。
lib/constructs/eventbridge.ts
import { Construct } from "constructs";
import * as events from "aws-cdk-lib/aws-events";
import * as eventsTargets from "aws-cdk-lib/aws-events-targets";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as iam from "aws-cdk-lib/aws-iam";
export interface EventBridgeConstructProps {
envName: string;
projectName: string;
dataSourceBucketName: string;
stateMachineArn: string;
}
export class EventBridgeConstruct extends Construct {
constructor(scope: Construct, id: string, props: EventBridgeConstructProps) {
super(scope, id);
const s3PutRule = new events.Rule(this, "S3PutRule", {
eventPattern: {
source: ["aws.s3"],
detailType: ["Object Created"],
detail: {
bucket: {
name: [props.dataSourceBucketName],
},
object: {
key: [{ prefix: "input/" }],
size: [{ numeric: [">", 0] }],
},
},
},
ruleName: `${props.projectName}-${props.envName}-etl-s3-put-rule`,
});
// EventBridge が Step Functions を起動するための IAM ロールを作成
const eventBridgeExecutionRole = new iam.Role(
this,
`EventBridgeExecutionRole`,
{
assumedBy: new iam.ServicePrincipal("events.amazonaws.com"), // 信頼ポリシー設定
description:
"An IAM role for EventBridge to Start Step Functions Execution",
roleName: `EventBridgeExecutionRoleForStepFunctions-${props.envName}`,
}
);
eventBridgeExecutionRole.addToPolicy(
new iam.PolicyStatement({
actions: ["states:StartExecution"], // 許可するアクション
resources: [props.stateMachineArn], // ステートマシンのARN
})
);
// ステップ関数をS3 Put Eventのターゲットとして設定
const stateMachine = sfn.StateMachine.fromStateMachineArn(
this,
"ImportedStateMachine",
props.stateMachineArn
);
s3PutRule.addTarget(
new eventsTargets.SfnStateMachine(stateMachine, {
role: eventBridgeExecutionRole,
})
);
}
}
EventBridge, EventBridge用のIAM Role, IAM RoleのPolicyの定義をしています。フォルダが作成された際には起動してほしくないため、ファイルサイズが0以上という条件をつけています。
lib/constructs/lambda.ts
import * as cdk from "aws-cdk-lib";
import * as lambda from "aws-cdk-lib/aws-lambda";
import { Construct } from "constructs";
import * as iam from "aws-cdk-lib/aws-iam";
export interface LambdaConstructProps {
envName: string;
projectName: string;
dataSourceBucketName: string;
dataStoreBucketName: string;
}
export class LambdaConstruct extends Construct {
public readonly LambdaName: string;
constructor(scope: Construct, id: string, props: LambdaConstructProps) {
super(scope, id);
const lambdaRole = new iam.Role(this, "LambdaExecutionRole", {
assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName(
"service-role/AWSLambdaBasicExecutionRole"
),
],
});
const pandasLayer = lambda.LayerVersion.fromLayerVersionArn(
this,
"AwsSdkPandasLayer",
"arn:aws:lambda:ap-northeast-1:336392948345:layer:AWSSDKPandas-Python312-Arm64:8"
);
this.LambdaName = `${props.projectName}-${props.envName}-etl-handler`;
new lambda.Function(this, "EtlHandler", {
functionName: this.LambdaName,
runtime: lambda.Runtime.PYTHON_3_12,
code: lambda.Code.fromAsset("../resources/lambda"),
handler: "etl_handler.main",
layers: [pandasLayer],
memorySize: 512,
timeout: cdk.Duration.seconds(900),
role: lambdaRole,
environment: {
S3_OUTPUT_BUCKET: props.dataStoreBucketName,
S3_OUTPUT_KEY_PREFIX: "output/",
},
architecture: lambda.Architecture.ARM_64,
});
// 任意の追加ポリシーをLambdaのIAMロールにアタッチ
lambdaRole.addToPolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ["s3:ListBucket", "s3:GetObject", "s3:PutObject"],
resources: [
`arn:aws:s3:::${props.dataStoreBucketName}`,
`arn:aws:s3:::${props.dataStoreBucketName}/*`,
`arn:aws:s3:::${props.dataSourceBucketName}`,
`arn:aws:s3:::${props.dataSourceBucketName}/*`,
],
})
);
}
}
Lambda、Lambda用IAM Roleを定義しています。lambda上でpandasを使用するため、AWS Lambda Managed Layers
のARNを指定しています。
lib/constructs/s3.ts
import * as cdk from "aws-cdk-lib";
import {
Bucket,
BlockPublicAccess,
BucketEncryption,
} from "aws-cdk-lib/aws-s3";
import { Construct } from "constructs";
export interface S3ConstructProps {
envName: string;
projectName: string;
}
export class S3Construct extends Construct {
public readonly dataSourceBucket: Bucket;
public readonly dataStoreBucket: Bucket;
constructor(scope: Construct, id: string, props: S3ConstructProps) {
super(scope, id);
this.dataSourceBucket = new Bucket(this, "DataSourceBucket", {
bucketName: `${props.projectName}-${props.envName}-data-source`,
removalPolicy: cdk.RemovalPolicy.DESTROY,
blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
encryption: BucketEncryption.KMS_MANAGED,
versioned: true,
eventBridgeEnabled: true,
});
this.dataStoreBucket = new Bucket(this, "DataStoreBucket", {
bucketName: `${props.projectName}-${props.envName}-data-store`,
removalPolicy: cdk.RemovalPolicy.DESTROY,
blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
encryption: BucketEncryption.KMS_MANAGED,
versioned: true,
});
}
}
インプットファイルを格納するBucket、アウトプットファイルを格納するBucketを定義しています。Bucketを他のConstructで参照するためpublic readonlyで参照できる定義としています。
lib/constructs/step-functions.ts
import { Construct } from "constructs";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as sfn_tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
import * as iam from "aws-cdk-lib/aws-iam";
import * as lambda from "aws-cdk-lib/aws-lambda";
export interface StepFunctionsConstructProps {
envName: string;
projectName: string;
dataSourceBucketName: string;
dataStoreBucketName: string;
lambdaName: string;
}
export class StepFunctionsConstruct extends Construct {
public readonly stateMachine: sfn.StateMachine;
constructor(
scope: Construct,
id: string,
props: StepFunctionsConstructProps
) {
super(scope, id);
const stepFunctionsRole = new iam.Role(this, `StepFunctionsRole`, {
assumedBy: new iam.ServicePrincipal("states.amazonaws.com"),
description: "An IAM role for Step Functions to access AWS services",
roleName: `${props.projectName}-${props.envName}-stepfunctions-role`,
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName(
"service-role/AWSLambdaBasicExecutionRole"
),
iam.ManagedPolicy.fromAwsManagedPolicyName(
"service-role/AWSLambdaRole"
),
],
});
stepFunctionsRole.addToPolicy(
new iam.PolicyStatement({
actions: [
"s3:ListBucket",
"s3:GetObject",
"s3:PutObject",
"s3:CopyObject",
],
resources: [
`arn:aws:s3:::${props.dataSourceBucketName}`,
`arn:aws:s3:::${props.dataSourceBucketName}/*`,
`arn:aws:s3:::${props.dataStoreBucketName}`,
`arn:aws:s3:::${props.dataStoreBucketName}/*`,
],
})
);
const lambdaFunction = lambda.Function.fromFunctionName(
this,
"LambdaFunctionByName",
props.lambdaName
);
// Lambda Invoke Task
const lambdaInvoke = new sfn_tasks.LambdaInvoke(
this,
"InvokeLambdaFunction",
{
lambdaFunction: lambdaFunction,
inputPath: "$",
outputPath: "$.Payload",
}
);
/// ジョブ失敗時の振る舞いを定義
const jobFailed = new sfn.Fail(this, "JobFailed", {
errorPath: "$.Error",
causePath: "$.Cause",
});
lambdaInvoke.addCatch(jobFailed, {
errors: ["States.ALL"],
});
// Define the State Machine
const definitionBody = sfn.DefinitionBody.fromChainable(lambdaInvoke);
this.stateMachine = new sfn.StateMachine(this, "StateMachine", {
definitionBody: definitionBody,
stateMachineName: `${props.projectName}-${props.envName}-statemachine`,
role: stepFunctionsRole,
});
}
}
lambdaFunction
: AWS CDKを使用して、指定された名前の既存のLambda関数を参照しています。LambdaInvoke
: このタスクは、ステートマシンの一部として定義され、指定されたLambda関数を呼び出します。入力パスと出力パスを通じて、Lambda関数の入力と結果の処理方法を制御します。jobFailed
: ジョブが失敗した場合の動作を定義する Fail ステートです。ジョブ失敗時のエラーメッセージや原因をカスタマイズできます。addCatch(jobFailed, {errors: ["States.ALL"]})
: Lambda呼び出しタスクが失敗した場合に、あらゆる例外をキャッチして jobFailed ステートに遷移させるように設定します。今回はStates.ALLであらゆる例外をキャッチしていますが、実際の運用ではそれぞれの例外に沿った処理を検討してみる方が良いかもしれません。
lib/stack/etl-stack.ts
import { Construct } from "constructs";
import * as cdk from "aws-cdk-lib";
import { S3Construct } from "../constructs/s3";
import { LambdaConstruct, LambdaConstructProps } from "../constructs/lambda";
import {
StepFunctionsConstruct,
StepFunctionsConstructProps,
} from "../constructs/step-functions";
import {
EventBridgeConstruct,
EventBridgeConstructProps,
} from "../constructs/eventbridge";
export interface ETLStackProps extends cdk.StackProps {
envName: string;
projectName: string;
}
export class ETLStack extends cdk.Stack {
constructor(scope: Construct, id: string, props: ETLStackProps) {
super(scope, id, props);
const s3Construct = new S3Construct(this, "S3", {
envName: props.envName,
projectName: props.projectName,
});
const lambdaConstruct = new LambdaConstruct(this, "Lambda", {
envName: props.envName,
projectName: props.projectName,
dataSourceBucketName: s3Construct.dataSourceBucket.bucketName,
dataStoreBucketName: s3Construct.dataStoreBucket.bucketName,
} as LambdaConstructProps);
const stepFunctionsConstruct = new StepFunctionsConstruct(
this,
"StepFunctions",
{
envName: props.envName,
projectName: props.projectName,
dataSourceBucketName: s3Construct.dataSourceBucket.bucketName,
dataStoreBucketName: s3Construct.dataStoreBucket.bucketName,
lambdaName: lambdaConstruct.LambdaName,
} as StepFunctionsConstructProps
);
new EventBridgeConstruct(this, "EventBridge", {
envName: props.envName,
projectName: props.projectName,
dataSourceBucketName: s3Construct.dataSourceBucket.bucketName,
stateMachineArn: stepFunctionsConstruct.stateMachine.stateMachineArn,
} as EventBridgeConstructProps);
}
}
上記ファイルではStackを定義しその中でConstructとしてリソースを定義しています。S3 Bucket、Lambda、Step Functions, EventBridgeで依存関係があるため、上記の順となっています。bin/app.ts
から取得したenvName, projectNameは全てのConstructで活用するため、引数として指定しています。
parameter.ts
import { Environment } from "aws-cdk-lib";
// Parameters for Application
export interface AppParameter {
env: Environment;
envName: string;
projectName: string;
}
// Example
export const devParameter: AppParameter = {
envName: "dev",
projectName: "cm-kasama",
env: {},
// env: { account: "xxxxxx", region: "ap-northeast-1" },
};
export const prodParameter: AppParameter = {
envName: "prod",
projectName: "cm-kasama",
env: {},
// env: { account: "xxxxxx", region: "ap-northeast-1" },
};
環境変数を定義するためのファイルとなります。accountには実際のAWS_ACCOUNT_IDを記載します。そのほかには、evnNameやprojectNameなどは一括で修正できるように変数として定義しています。私の場合はprofileで指定したIAM Roleに紐づくAWS Account、Regionへデプロイするためenvを空で設定しています。
resources/lambda/etl_handler.py
import os
import boto3
import traceback
import pandas as pd
from io import StringIO
from datetime import datetime
from zoneinfo import ZoneInfo
from lib.get_logger import GetLogger
get_logger = GetLogger(__name__)
logger = get_logger.logger
def main(event, context):
try:
# ジョブから渡されたパラメータを使用
s3_input_bucket = event["detail"]["bucket"]["name"]
s3_input_key = event["detail"]["object"]["key"]
s3_output_bucket = os.environ["S3_OUTPUT_BUCKET"]
s3_output_key = os.environ["S3_OUTPUT_KEY_PREFIX"]
logger.info(f"s3_input_bucket:{s3_input_bucket}")
logger.info(f"s3_input_key:{s3_input_key}")
# Boto3クライアントの初期化
s3_client = boto3.client("s3")
# S3からCSVファイルを読み込む
response = s3_client.get_object(Bucket=s3_input_bucket, Key=s3_input_key)
input_data = pd.read_csv(response["Body"])
# 動物園に来てからの年数を計算して新しい列に追加
current_year = datetime.now(ZoneInfo("Asia/Tokyo")).year
input_data["years_in_zoo"] = current_year - input_data["arrival_year"]
# Pandas DataFrameをCSV文字列に変換
output_csv = input_data.to_csv(index=False)
output_file = StringIO(output_csv)
current_time = datetime.now(ZoneInfo("Asia/Tokyo")).strftime("%Y-%m-%d-%H-%M-%S")
# 現在の日時を取得し、ファイル名を生成
output_filename = f"{s3_output_key}output_{current_time}.csv"
# 加工後のCSVをS3に保存
s3_client.put_object(
Bucket=s3_output_bucket,
Key=output_filename,
Body=output_file.getvalue(),
)
logger.info(f"Successful put {s3_output_bucket}/{output_filename}")
return {"statusCode": 200, "body": f"Successful put {s3_output_bucket}/{output_filename}"}
except Exception as e:
tb = traceback.format_exc()
logger.error(f"Unexpected error: {str(e)}\nTraceback: {tb}")
raise e
inputとなるcsvファイルのarrival_yearカラムを元に動物園に来てからの年数を計算して、years_in_zooカラムを生成し、timestamp型のsuffixを付与した新規csvファイルをoutputとなる格納先へputする処理となっています。
resources/lambda/lib/get_logger.py
import sys
import logging
class GetLogger:
def __init__(self, name):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(name)s - %(funcName)s - line:%(lineno)d - %(message)s"
)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
warning_handler = logging.StreamHandler(sys.stderr)
warning_handler.setLevel(logging.WARNING)
warning_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
self.logger.addHandler(warning_handler)
loggerの設定を定義しています。詳細は以下のブログをご確認いただければと思います。
【Python】 loggingモジュールでLEVEL毎にファイルの出力先を変更しrotateさせる方法
デプロイ
package.jsonがあるディレクトリで依存関係をインストールします。
npm install
次にcdk.jsonがあるディレクトリで、CDKで定義されたリソースのコードをAWS CloudFormationテンプレートに合成(変換)するプロセスを実行します。
npx cdk synth --profile <YOUR_AWS_PROFILE>
同じくcdk.jsonがあるディレクトリでデプロイコマンドを実行します。--all
はCDKアプリケーションに含まれる全てのスタックをデプロイするためのオプション、--require-approval never
はセキュリティ的に敏感な変更やIAMリソースの変更を含むデプロイメント時の承認を求めるダイアログ表示を完全にスキップします。never
は、どんな変更でも事前確認なしにデプロイすることを意味します。今回は検証用なので指定していますが、慎重にデプロイする場合は必要のないオプションになるかもしれません。-c
でenvironmentを指定し、環境に合わせたデプロイを行います。
npx cdk deploy --all --require-approval never -c environment=dev --profile <YOUR_AWS_PROFILE>
実行結果
正常系
それではS3にファイルをPutし、Step Functionsの起動を確認したいと思います。
S3Put時間帯にEventBridgeが実行されていることを確認。(本ジョブ実行前に2回ほど直前で実行しているログがありますが無視していただければと思います。)
Step Functions正常終了。
Lambda正常終了。
data store Bucketにファイルが生成されていること、データ加工が想定通りであることを確認。
異常系
データ加工対象のカラムを保持していないcsvファイルをPut。
JobFailedとなり、Step Functionsが異常終了。
Lambdaも同様。
最後に
フォルダやファイル構成、例外処理、CI/CDなどプロジェクトに落とし込むための要件を全ては満たしていないので、引き続き学習したいと思います。