cdkを使ったStepFunctionsでのループ込実装手続きを振り返りつつ書いてみた
はじめに
業務効率向上を目的にLambda関数を実装しましたが制限時間内に全然収まらず、代替案としてプロセス毎にLambda関数として切り分けた上でStepFunctionsにて繋いでみました。
StepFunctionsを使った実装例はDevIOにも記事として公開されていますが、ループとcdkを用いた例については見当たらず、どう取り掛かったものか悩みどころでした。出来上がって実際に現在も動作しているケースを元に、ループと例外を盛り込みつつ一つ一つやってみたことを書いてみました。
プロセスの整理
大まかな段階毎に切り分けていくのは勿論ながら、それ以外にも検討するポイントがあります。
繰り返し処理の長時間化
繰り返す処理の時間長期化が想定できる場合、繰り返し回数が多いと制限に掛かりやすくなります。繰り返し判定をStepFunctionsに任せて、Lambda関数は処理のみの実施を行う形で検討します。
例外の発生
途中で例外が発生した場合にそのまま通して実施するか途中で止めるかの二択になります。途中で止める場合、各関数個別で対処するのは処理の重複が発生しやすいため、例外処理専用のプロセスの作成も検討します。
既存リソースへの依存
依存が不可避の場合は、該当のリソース名やリソースARN等、cdkがリソースを取得する際に必要なパラメータを事前に把握しておきます。今回は詳細には触れていません。
ロジックの組み立て
まずはcdkで以下の順に組み立てていきます。
- Lambda関数定義
- コードの空実装
- Task定義
- Task Chain定義
- コード実装
Lambda関数定義
import iam = require('@aws-cdk/aws-iam'); import lambda = require('@aws-cdk/aws-lambda'); const executionLambdaRole = new iam.Role(this, `LambdaRole`, { assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'), managedPolicies: [ iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaBasicExecutionRole') ] }); const function_1 = new lambda.Function(this, `function_1`, { runtime: lambda.Runtime.PYTHON_3_8, code: lambda.AssetCode.fromAsset('/path/to/asset'), handler: 'function_1.lambda_handler', timeout: cdk.Duration.seconds(900), role: executionLambdaRole, });
処理の段階毎に作成します。途中で中断されることを防ぐため、timeoutには最大時間を指定しました。
ロール指定の省略時にはcdkが必要なロールを自動作成しますが、複数のLambda関数を用いる場合、指定しなければ関数の数だけロールが作成されます。ロールの共有が問題なければ、事前に必要な分だけ作成しておきましょう。
Lambda関数分だけ夫々行いますが、必要に応じて共通の例外用Lambda関数定義も行っておきます。(例外時用の専用定義はなく、const errorCatch
等にするくらいです。)
コード実装ファイルの作成
handlerに指定した数だけ実装ファイルを用意します。今回の場合は/path/to/asset/function_1.py
になります。実装内容については、現時点では各Lambda関数共に「Hello, World」的な内容にしてしまっても問題ありません。
Task定義
import sfn = require('@aws-cdk/aws-stepfunctions'); const createDB = new sfn.Task(this, 'Task_1', { task: new class implements sfn.IStepFunctionsTask { bind(): sfn.StepFunctionsTaskConfig { return { resourceArn: function_1.functionArn, parameters: new Map<string, any>() .set("params", '$.params'), } } }, resultPath: '$', });
定義したLambda関数のarnをresourceArn
指定に用いて、StepFunctionsのタスク定義を行います。これも定義したLambda関数分だけ実施します。
Task Chain定義
なれないうちは手の付け所に悩みがちです。とりあえず、判っている範囲から設定していきます。
共通の例外Taskが存在する場合は、例外を優先して設定してしまいましょう。例外用タスクcatchError
を作成済みとします。
function_1.addCatch(catchError) function_2.addCatch(catchError) .. function_x.addCatch(catchError)
これだけで完了です。
次にChainの開始部分を作成します。
sfn.Chain.start(function_1).next(function_2)
必要な数だけnext
で繋いでいきます。次に問題のループです。
import sfn = require('@aws-cdk/aws-stepfunctions'); const createTableCondition = sfn.Condition.numberGreaterThan('$.params.remain_table_count', -1); ...next(function_2).next(loop_task). .when(createTableCondition, loop_task) .otherwise(breaked_loop).afterwards().next(after_loop)
先ずは、ループ条件の設定を行います。Condition
からループ条件を指定します。今回は該当するパラメータが閾値より大きいかどうかという判定にしています。
次にループ対象の処理につなげます。.next
で繋ぎ、次に.when
でループさせる条件と条件一致時の処理を入れます。その後、.otherwise
で条件に一致しない場合、つまりループを抜けたときの処理を指定します。
ループ後に再度共通してループを行いたい場合は、.otherwise
の後の.next
にて再度.when
を開始します。共通のルートが必要であれば.when
と.otherwise
でつなげた後に.afterwards
を追加し、必要に応じて.next
でつなげていきます。
...next(function_2).next(loop_task). .when(createTableCondition, loop_task) .otherwise(breaked_loop .next(again_loop .when(nextLoopCondition, again_loop) .otherwise(again_breaked_loop).afterwards()))
.afterwards
でのルートを各ループ毎に用意したケースは、下記のように可視性がかなりひどくなるため、あまりおすすめはしません。
.start(createDB) .next(createTable) .next(choiseContinueCreateTable .when(createTableCondition, createTable) .otherwise(gatherAccountIDFromCUR .next(choiseGatherAccountID .when(gatherAccountIDCondition, gatherAccountIDFromCUR) .otherwise(matchingAccountID) .afterwards())) .afterwards())
このフローは以下のような構成になります。
コード実装
最後に、HelloWorldで終わらせていたコードを実装します。気をつけるべき点は、ハンドラに渡されるevent
と関数の返す値の2つです。
Lmbda関数に渡す値については定義したTaskのパラメータのうち、key(params)とpath($.params)を一致させておくことで明確になります。
parameters: new Map<string, any>() .set("params", '$.params'),
def lambda_handler(event, context): params = event['params']
Lambda関数の返す値を全て参照するのであれば、TaskのresultPath
には'$'
だけ指定しておきます。Lambda関数から返す値のkeyにparam
と入れておくと、上記keyとpath指定がそのまま使えるようになります。
return { 'params': params }
あとがき
ループ処理が正常にループするまでotherwiseとafterwardsの使い方に苦労しましたが、一度判ってしまえばChainの繋ぎ方については苦労しない感じです。
cdkを使ったStepFunctionの実装は、jsonでの指定を極力さけつつ細かい実装をcdkに委譲できるため、適切なフローの組み立てに集中できることが大きなメリットです。最初は戸惑うと思いますが、興味のある方は一度おすすめします。