cdkを使ったStepFunctionsでのループ込実装手続きを振り返りつつ書いてみた

StepFunctionsでのループ処理をcdkを使って書いてみました。具体的な実装例が余り見当たらず、試行錯誤の結果に色々と酷い状態になりましたが、それらを踏まえた上でキレイな実装になるような記事にしたつもりです。
2020.05.31

はじめに

業務効率向上を目的にLambda関数を実装しましたが制限時間内に全然収まらず、代替案としてプロセス毎にLambda関数として切り分けた上でStepFunctionsにて繋いでみました。

StepFunctionsを使った実装例はDevIOにも記事として公開されていますが、ループとcdkを用いた例については見当たらず、どう取り掛かったものか悩みどころでした。出来上がって実際に現在も動作しているケースを元に、ループと例外を盛り込みつつ一つ一つやってみたことを書いてみました。

プロセスの整理

大まかな段階毎に切り分けていくのは勿論ながら、それ以外にも検討するポイントがあります。

繰り返し処理の長時間化

繰り返す処理の時間長期化が想定できる場合、繰り返し回数が多いと制限に掛かりやすくなります。繰り返し判定をStepFunctionsに任せて、Lambda関数は処理のみの実施を行う形で検討します。

例外の発生

途中で例外が発生した場合にそのまま通して実施するか途中で止めるかの二択になります。途中で止める場合、各関数個別で対処するのは処理の重複が発生しやすいため、例外処理専用のプロセスの作成も検討します。

既存リソースへの依存

依存が不可避の場合は、該当のリソース名やリソースARN等、cdkがリソースを取得する際に必要なパラメータを事前に把握しておきます。今回は詳細には触れていません。

ロジックの組み立て

まずはcdkで以下の順に組み立てていきます。

  1. Lambda関数定義
  2. コードの空実装
  3. Task定義
  4. Task Chain定義
  5. コード実装

Lambda関数定義

import iam = require('@aws-cdk/aws-iam');
    import lambda = require('@aws-cdk/aws-lambda');
    
    const executionLambdaRole = new iam.Role(this, `LambdaRole`, {
      assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),
      managedPolicies: [
        iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaBasicExecutionRole')
      ]
    });
 
    const function_1 = new lambda.Function(this, `function_1`, {
      runtime: lambda.Runtime.PYTHON_3_8,
      code: lambda.AssetCode.fromAsset('/path/to/asset'),
      handler: 'function_1.lambda_handler',
      timeout: cdk.Duration.seconds(900),
      role: executionLambdaRole,
    });

処理の段階毎に作成します。途中で中断されることを防ぐため、timeoutには最大時間を指定しました。

ロール指定の省略時にはcdkが必要なロールを自動作成しますが、複数のLambda関数を用いる場合、指定しなければ関数の数だけロールが作成されます。ロールの共有が問題なければ、事前に必要な分だけ作成しておきましょう。

Lambda関数分だけ夫々行いますが、必要に応じて共通の例外用Lambda関数定義も行っておきます。(例外時用の専用定義はなく、const errorCatch等にするくらいです。)

コード実装ファイルの作成

handlerに指定した数だけ実装ファイルを用意します。今回の場合は/path/to/asset/function_1.pyになります。実装内容については、現時点では各Lambda関数共に「Hello, World」的な内容にしてしまっても問題ありません。

Task定義

import sfn = require('@aws-cdk/aws-stepfunctions');
    
    const createDB = new sfn.Task(this, 'Task_1', {
      task: new class implements sfn.IStepFunctionsTask {
        bind(): sfn.StepFunctionsTaskConfig {
          return {
            resourceArn: function_1.functionArn,
            parameters: new Map<string, any>()
              .set("params", '$.params'),
          } 
        }
      },
      resultPath: '$',
    });

定義したLambda関数のarnをresourceArn指定に用いて、StepFunctionsのタスク定義を行います。これも定義したLambda関数分だけ実施します。

Task Chain定義

なれないうちは手の付け所に悩みがちです。とりあえず、判っている範囲から設定していきます。

共通の例外Taskが存在する場合は、例外を優先して設定してしまいましょう。例外用タスクcatchErrorを作成済みとします。

function_1.addCatch(catchError)
    function_2.addCatch(catchError)
    ..
    function_x.addCatch(catchError)

これだけで完了です。

次にChainの開始部分を作成します。

sfn.Chain.start(function_1).next(function_2)

必要な数だけnextで繋いでいきます。次に問題のループです。

import sfn = require('@aws-cdk/aws-stepfunctions');

 const createTableCondition = sfn.Condition.numberGreaterThan('$.params.remain_table_count', -1);
 ...next(function_2).next(loop_task).
        .when(createTableCondition, loop_task)
          .otherwise(breaked_loop).afterwards().next(after_loop)

先ずは、ループ条件の設定を行います。Conditionからループ条件を指定します。今回は該当するパラメータが閾値より大きいかどうかという判定にしています。

次にループ対象の処理につなげます。.nextで繋ぎ、次に.whenでループさせる条件と条件一致時の処理を入れます。その後、.otherwiseで条件に一致しない場合、つまりループを抜けたときの処理を指定します。

ループ後に再度共通してループを行いたい場合は、.otherwiseの後の.nextにて再度.whenを開始します。共通のルートが必要であれば.when.otherwiseでつなげた後に.afterwardsを追加し、必要に応じて.nextでつなげていきます。

...next(function_2).next(loop_task).
        .when(createTableCondition, loop_task)
          .otherwise(breaked_loop
            .next(again_loop
              .when(nextLoopCondition, again_loop)
                 .otherwise(again_breaked_loop).afterwards()))

.afterwardsでのルートを各ループ毎に用意したケースは、下記のように可視性がかなりひどくなるため、あまりおすすめはしません。

.start(createDB)
      .next(createTable)
      .next(choiseContinueCreateTable
        .when(createTableCondition, createTable)
          .otherwise(gatherAccountIDFromCUR
            .next(choiseGatherAccountID
              .when(gatherAccountIDCondition, gatherAccountIDFromCUR)
                .otherwise(matchingAccountID)
                .afterwards()))
            .afterwards())

このフローは以下のような構成になります。

コード実装

最後に、HelloWorldで終わらせていたコードを実装します。気をつけるべき点は、ハンドラに渡されるeventと関数の返す値の2つです。

Lmbda関数に渡す値については定義したTaskのパラメータのうち、key(params)とpath($.params)を一致させておくことで明確になります。

parameters: new Map<string, any>()
              .set("params", '$.params'),
def lambda_handler(event, context):
    params = event['params']

Lambda関数の返す値を全て参照するのであれば、TaskのresultPathには'$'だけ指定しておきます。Lambda関数から返す値のkeyにparamと入れておくと、上記keyとpath指定がそのまま使えるようになります。

return {
        'params': params
    }

あとがき

ループ処理が正常にループするまでotherwiseとafterwardsの使い方に苦労しましたが、一度判ってしまえばChainの繋ぎ方については苦労しない感じです。

cdkを使ったStepFunctionの実装は、jsonでの指定を極力さけつつ細かい実装をcdkに委譲できるため、適切なフローの組み立てに集中できることが大きなメリットです。最初は戸惑うと思いますが、興味のある方は一度おすすめします。