はじめに
先日、複数のデータに対して同じ処理を実行させたいという場面があり、Step FunctionsのMapステートを使って実装しました。とても便利だったので、復習を兼ねて記事に残しておこうと思います。
やりたいこと
下図のようなフローをCDKで作成します。
まずGetListで処理を実行させたいデータの一覧を取得します。続いてその一覧をMapステートに渡して、各データに対してSomeProcessを並列に実行させるようにします。
環境
本記事を執筆したときの環境は以下になります。
- AWS CDK 2.84.0
- Node.js v18.16.0
また、CDKのインストールやブートストラップ等は実施済みであるとします。もしまだ実施していないという場合は、公式サイトを参考にしてみてください。
AWS CDK の開始方法 - AWS Cloud Development Kit (AWS CDK) v2
やってみる
CDKプロジェクトの作成
プロジェクトフォルダを作成し、下記コマンドを実行します。ここでは言語としてTypeScriptを選択します。
cdk init --language typescript
プロジェクトフォルダ内に必要なファイルが作成されます。
Lambda関数用コードの作成
まずはGetList
とSomeProcess
の2つのLambda関数用コードを作成します。
プロジェクトフォルダ直下にlambda
というフォルダを作成し、get-list.ts
とsome-process.ts
を作成します。
get-list.ts
のコードは以下のようにしてみました。
interface Person {
name: string
age: number
address: string
}
export const handler = async (): Promise<Person[]> => {
try {
console.log("start get-list.ts")
// 並列に処理したい各データ
const data = [
{
name: 'Tanaka',
age: 20,
address: 'Tokyo',
},
{
name: 'Mizuno',
age: 30,
address: 'Osaka',
},
{
name: 'Yamada',
age: 25,
address: 'Nagoya',
},
{
name: 'Sato',
age: 28,
address: 'Fukuoka',
},
]
return data
} catch (error) {
throw error
}
}
DBか何かから人物のデータ一覧を取得して、それぞれのデータに対して何か並列に処理を行うというイメージです。Mapステートに渡すために、データを配列形式で返します。
some-process.ts
のコードは次のようにしてみました。
interface ProcessEvent {
param: {
name: string
age: number
}
}
export const handler = async (event: ProcessEvent): Promise<void> => {
try {
console.log(`start some-process.ts name: ${event.param.name} age: ${event.param.age}`)
// 時間がかかる処理
await sleep(5000)
console.log(`end some-process.ts name: ${event.param.name} age: ${event.param.age}`)
} catch (error) {
throw error
}
}
const sleep = (msec: number) =>
new Promise((resolve) => setTimeout(resolve, msec))
このLambda関数は人物のデータを受け取って、何か時間のかかる処理を行うというイメージです。引数としてProcessEvent
型の値を受け取るように定義しています。
Step Functionsの作成
続いてStep Functionsのリソースを作っていきます。
lib
フォルダ配下に自動的に作成されたts
ファイルがあると思いますので、そのファイルを次のように編集します。
import * as cdk from 'aws-cdk-lib';
import { Duration } from 'aws-cdk-lib';
import { Runtime } from 'aws-cdk-lib/aws-lambda';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
import { LambdaInvoke } from 'aws-cdk-lib/aws-stepfunctions-tasks';
import { LogLevel, StateMachine, Map } from 'aws-cdk-lib/aws-stepfunctions'
import { Construct } from 'constructs';
import { LogGroup, RetentionDays } from 'aws-cdk-lib/aws-logs';
// import * as sqs from 'aws-cdk-lib/aws-sqs';
export class CdkStepFunctionsMapStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// データ一覧を取得するLambda関数
const getListLambda = new NodejsFunction(
this,
'GetListLambda',
{
entry: './lambda/get-list.ts',
handler: 'handler',
timeout: Duration.minutes(10),
memorySize: 256,
runtime: Runtime.NODEJS_18_X,
}
)
// 各データに対して処理するLambda関数
const someProcessLambda = new NodejsFunction(
this,
'SomeProcessLambda',
{
entry: './lambda/some-process.ts',
handler: 'handler',
timeout: Duration.minutes(10),
memorySize: 256,
runtime: Runtime.NODEJS_18_X,
}
)
// データ一覧を取得するLambda関数を実行するプロセス
const getListProcessState = new LambdaInvoke(
this,
'getListProcessState',
{
lambdaFunction: getListLambda,
}
)
// 各データに対して処理するLambda関数を実行するプロセス
const someProcessProcessState = new LambdaInvoke(
this,
'someProcessProcessState',
{
lambdaFunction: someProcessLambda,
}
)
// 人物一覧を分割するためのMapステート
const mapPersonsProcessState = new Map(this, 'mapPersons', {
itemsPath: '$.Payload',
maxConcurrency: 5,
resultPath: '$.mapOutput',
parameters: {
param: {
'name.$': '$$.Map.Item.Value.name',
'age.$': '$$.Map.Item.Value.age',
},
},
})
// Map関数のイテレータを指定
mapPersonsProcessState.iterator(someProcessProcessState)
// リトライ設定
getListProcessState.addRetry({
interval: Duration.seconds(10),
maxAttempts: 2,
backoffRate: 2,
})
someProcessProcessState.addRetry({
interval: Duration.seconds(10),
maxAttempts: 2,
backoffRate: 2,
})
// Step Functionsを作成
const sampleStateMachine = new StateMachine(
this,
'sampleStateMachine',
{
stateMachineName: `sampleStateMachine`,
definition:
getListProcessState.next(mapPersonsProcessState),
logs: {
level: LogLevel.ALL,
destination: new LogGroup(
this,
'SampleStateMachineLogGroup',
{
retention: RetentionDays.ONE_WEEK,
}
),
},
}
)
}
}
まず、NodejsFunction
で作成したget-list.ts
とsome-process.ts
をLambda関数化します。そして、LambdaInvoke
でStep FunctionsからLambda関数を呼び出すためのアクションを作成します。
MapステートのitemPath
は、処理したい配列が格納されているJSONパスを指定します。今回の場合、getList
からの出力は以下のようなJSONになります。
{
"ExecutedVersion": "$LATEST",
"Payload": [
{
"name": "Tanaka",
"age": 20,
"address": "Tokyo"
},
{
"name": "Mizuno",
"age": 30,
"address": "Osaka"
},
{
"name": "Yamada",
"age": 25,
"address": "Nagoya"
},
{
"name": "Sato",
"age": 28,
"address": "Fukuoka"
}
],
(以下略)
$
はオブジェクトそのものを指すので、処理したい配列が格納されているパスは$.Payload
となります。
また、someProcessは引数として以下のオブジェクト型を要求していました。
param: {
name: string
age: number
}
そこで、Mapステートのparameters
を指定して、この型に合致する値が渡されるようにしています。$$.Map.Item.Value
にはMapステートで分割された配列の各要素、つまり
{
"name": "Tanaka",
"age": 20,
"address": "Tokyo"
}
のような値が入っています。そこで、そのうちのname
とage
を抜き出して指定しています。
Mapステートの同時実行数を5に設定しているので、配列のうち5つずつ並列に処理されることになります。
そして、Mapステートで分割された各要素に対して実行させる処理として、someProcess
を設定しています。
Step Functionsを作成する際には、まずgetList
を呼び出し、その次にMapステートが実行されるように定義しています。MapステートのイテレータとしてsomeProcess
を呼ぶように既に指定しているので、これだけの定義で大丈夫です。
動作確認
CDKをデプロイします。
cdk deploy
デプロイの途中でこのようなメッセージが表示された場合、yを入力します。
スタックのARNとTotal timeが表示されればデプロイ完了です。
AWSマネジメントコンソールでStep Functionsのページを開きます。sampleStateMachine
が作成されています。
sampleStateMachineをクリックして、ページ下部の「実行を開始」をクリックします。
このようなウインドウが表示されますが、最初に実行されるgetList
は特に入力はあってもなくても関係がないので、このままでOKです。
実行するとこのように進行状況が図で表示されます。
数秒待つと緑色に変わり、処理が成功します。
どのような処理が行われたかは「イベント」を見るとわかります。
上記のイベントのうち、MapStateEntered
イベントを開いてみると、以下のようにイベントが定義されています。getList
で取得した人物のデータが配列で渡されています。
{
"name": "mapPersons",
"input": {
"ExecutedVersion": "$LATEST",
"Payload": [
{
"name": "Tanaka",
"age": 20,
"address": "Tokyo"
},
{
"name": "Mizuno",
"age": 30,
"address": "Osaka"
},
{
"name": "Yamada",
"age": 25,
"address": "Nagoya"
},
{
"name": "Sato",
"age": 28,
"address": "Fukuoka"
}
],
(以下略)
そして、someProcessProcessState
イベントを見てみると、配列の要素の一つがインプットになっていることがわかります。
開始から終了までの時間を見てみると、6秒ほどでした。someProcess
では5秒待つ処理を入れており、順次実行されるとすると20秒かかるはずなので、並列に実行されたことがわかります。
おわりに
同じ処理を複数のデータに対して並列に実行させたいというケースはよくあると思います。そういうときにStep Functionsを使って簡単に実装できるので便利だと思いました。
この記事がどなたかの参考になれば幸いです。