ARNを入力するだけ!StepFunctionsで引数違いのLambda実行を大量にばら撒くCloudFormationテンプレート作ってみた

Lambda関数を複数実行するMapステートのステートマシンを作るCloudFormationテンプレートを作成しました。
2020.10.05

日次で出力されるファイルなどをLambdaで処理する場合、 1日分のファイルだけを対象として処理を行うLambda関数をよく書きます。 Lambdaは大量に並列起動させることができるので、小さめの関数を1つ書いて、 それを並列で動かすのが、仕組みに適した使い方です(各処理に依存関係がないことが前提)。

一方、処理対象ファイル群の出力が、過去から結構長く行われている場合、 この過去分のファイルに対しても同様の処理が必要になることが多いです。 過去分の取り込み用に複数の日付分を一気に行う処理を実装するのも手ですが、 ここは1つ、StepFunctionsで並列実行させることで過去分の取り込みをしてみたいと思います。

StepFunctionsを使う利点

並列で複数処理を行うだけであれば、 AWS CLIでinvoke-functionを繰り返しても実現できます。 しかしこのやり方は総数が多くなるといくつか問題が出てきます。

  • Lambda関数の同時呼び出し上限を気にしないといけない
    • アカウント当たりのデフォルト同時実行数は1000
  • MFAを使っている場合、それが切れたタイミングでAPIが叩けない
    • 同時実行数上限を気にしてループさせていると引っ掛かりやすい

一方StepFunctionsのMapステートを使うと、同時実行数も制御できるので安心感が増します。

CloudFormationテンプレート

ステートマシン自体は定義のJSONを1つ書いてやるだけでいいのですが、 使い勝手を良くするために、CloudFormationテンプレートを作ってみました。 以下のコードをファイル保存して、CloudFormationスタックを作成してください。

scatter_lambda.yaml

AWSTemplateFormatVersion: 2010-09-09
Parameters:
  LambdaARN:
    Type: String
  MaxConcurrency:
    Type: Number
    Default: 10
Resources:
  ScatterLambdaStepFunctionsStateMachineRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: states.ap-northeast-1.amazonaws.com
            Action: 'sts:AssumeRole'
      Policies:
        - PolicyName: ScatterLambdaStepFunctionsStateMachineRole
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'lambda:InvokeFunction'
                Resource:
                  - !Ref LambdaARN
  ScatterLambdaStepFunctionsStateMachine:
    Type: 'AWS::StepFunctions::StateMachine'
    Properties:
      StateMachineName: !Sub '${AWS::StackName}-ScatterLambda'
      DefinitionString: !Sub |-
        {
          "StartAt": "MapState",
          "States": {
            "MapState": {
              "Type": "Map",
              "MaxConcurrency": ${MaxConcurrency},
              "Iterator": {
                "StartAt": "ConcatenateState",
                "States": {
                  "ConcatenateState": {
                    "Type": "Task",
                    "Resource": "${LambdaARN}",
                    "End": true
                  }
                }
              },
              "End": true
            }
          }
        }
      RoleArn: !GetAtt 
        - ScatterLambdaStepFunctionsStateMachineRole
        - Arn
    DependsOn:
      - ScatterLambdaStepFunctionsStateMachineRole

余談ですが、このコードはServerlessFrameworkで作成されたスタックを元に手直しして作成しました。

使い方

初心者向けということで、CloudFormationでスタックを作る手順を見て行きます。 CloudFormationを使ったことのある方はもう特に読む必要はありません。

まずは上記のコードをそのままローカルのファイルとして適当な名前で保存します。 次にマネジメントコンソールでCloudFormationのページを開き、 「スタックの作成」→「新しいリソースを使用(標準)」を選択します

「テンプレートファイルのアップロード」→「ファイルの選択」から、 上記のコードを保存したファイルパスを指定します。

「スタックの名前」を適当に入力し、「LambdaARN」を指定します。 「MaxConcurrency」は後述しますが、わからなければデフォルトの10を指定しておけば良いと思います。

LambdaARNは、目的のLambda関数のページの右上に書かれている文字列になります。

特に変更は何も必要ありませんので、先に進みます。

「AWS CloudFormationによってIAMリソースが作成される場合があることを承認します。」にチェックをします。 今回はCloudFormationがステートマシンのIAMロールを新規作成するため、この承認が必須となります。 「スタックの作成」をクリックするとスタック作成が開始されます。

「CREATE_COMPLETE」の表示になればスタック作成完了です。

以上で準備はできたので、早速ステートマシンを実行します。

StepFunctionsの画面に行き、作成されたステートマシンを選択します。

「実行の開始」をクリックします。

「入力」の部分に、各Lambda関数に処理させたい入力の配列を書きます。 ここに配列を書くことで、そのうちの1つの要素が1つのLambda関数に渡されるという仕組みです。 下の画像の場合、1つのLambda関数のeventに送られるものは

{"date_hour": "20200508-00"}

のようなオブジェクトとなります。 「実行の開始」をクリックすると、並列分散処理が開始されます。

実行対象のLambda関数のログストリームをCloudWatch Logsから見てみると、 10個のログストリームがほぼ同時に作成されたことがわかります。

この10個という数は指定したMaxConcurrencyと等しく、 10個のプロセスが次々とLambda関数を実行していくという流れになります(10個以上のプロセスが同時に動くことはない)。

最後にちょっと先取りですが、実行が終わった後の後片付けとして、一応スタックの削除の方法も記載します。 スタックを指定して「削除」を選択し、その後の指示に従えばOKです。 ステートマシンだけでなく、そこで必要になったIAMロールなども合わせて削除されます。 もちろんLambda関数はそのまま残ります。

MaxConcurrencyについて補足

スタックを作成する際にMaxConcurrencyを指定できます。 これは同時に並行して起動させるLambda関数の数を指定します。 もちろん大きい数の方が処理の並列数が大きくなりますが、 Lambda関数は1アカウント辺り通常1000個までしか同時起動ができません。 この上限を超えてLambda関数を起動しようとすると失敗してしまうので、 もしアカウント内で他のLambda関数が動いている(動く予定になっている)ような状態で MaxConcurrency1000などと指定すると、他のLambda関数が起動できない可能性が出てしまいます。 ですので、そのアカウント内でのLambda起動のスケジュールなどを確認した上で数値を決めるようにする必要があります。

この1点さえちゃんと気をつけて指定をしてあげれば、 大量な並列処理について何も考えずに実行ができるというのがこのやり方の最大のメリットであると思います。

実際に動かす際の注意点

配列の上限

以前の記事でも書いたのですが、並列数が1000を超えた辺りから、 個々のLambda関数は特に問題がないのにステートマシンが失敗になってしまう例が見受けられました。

[Step Functions]動的並列処理(Map)を使って60分×24時間=1440ファイルのバッチ処理を楽々実装してみた

一方先日、 ペイロードサイズが増加されたという発表があった ので、この制限もだいぶ緩くなったのでは!?と期待したのですが、 今回3600並列ほどで試してみたのですが、やはりステートマシンは失敗という風にでてしまいました。 とりあえずはまだ並列数は1000程度を目安に、何回かに分けて実行した方が良さそうです。

Lambdaが失敗する場合にも対応したい

この方法では大量のLambda関数を実行していますが、この構成は全てのLambda関数が問題なく終了することを前提としています。 しかし、とりあえず大量に走らせて見て、失敗したものはあとでチェックしたいという需要もあるかと思います。 (私が今回使用したケースでもそうでした)

StepFunctionsのステートマシンは、そもそもの思想として、 全ての処理がうまくいくかどうか、ALL or NOTHINGで判断したいという場合に適したサービスなので、 上記のような要望は基本的には対象外だと思った方が良いと思います。

ただ現実にはこのようなやり方をしたいことは多いかと思いますので、 今回は「Lambda関数はそのままで、外側のステートマシンだけで大量ジョブを行う」という考えからは少し外れてしまいますが、 Lambda関数のほぼ全体をtryで囲い、失敗したらS3バケットにエラーの内容を記録するような形に修正し、 エラーを握り潰すことで全件を走らせるという方法を取りました。 例としては以下のようなスクリプトに改変しました。

sample.py

s3_client = None

def lambda_handler(event, context):
    global s3_client
    if not s3_client:
        s3_client = boto3.client('s3')
    try:
        # 何かしらの処理
    except Exception as e:
        bucket = 'output_bucket'
        key = 'errors/{}.txt'.format(event['date_hour'])
        body = str(e).encode('utf-8')
        s3_client.put_object(Bucket=bucket, Key=key, Body=body)

これで、失敗したファイルについてExceptionの内容のファイルが出力されます。

まとめ

StepFunctionsを使って、引数違いの大量のLambda関数を実行するやり方をCloudFormationテンプレートにしてみました。 汎用的に使えると思うので、このような場面に出会した場合には使ってみてください。