[Step Functions]動的並列処理(Map)を使って60分×24時間=1440ファイルのバッチ処理を楽々実装してみた

2019.10.07

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、平野です。

下記のブログで紹介されているように、 Step Functionsにて、配列の入力を個別のLambda等にバラまいて処理させるMapステートがサポートされました!!

[アップデート]Step Functionsで動的並列処理がサポートされました!

担当していた案件で、S3上にある直近24時間分ファイル群(各ファイル名に秒までの時刻が入っている)を、 1分毎にまとめて別のバケットに移すような処理があり、 これはまさにMapステートに最適な素材でしたので、Mapステートを使ったリファクタリングをしてみました!

この記事では、ServerlessFrameworkのStep Functionsプラグインを用いています。 (対応早くて助かる!!)

検証バージョン

ServerlessFramework: 1.40.0
Step Functionsプラグイン: 2.8.0

Mapステートの簡単なおさらい

Mapステートはちょっと使うだけなら仕組みはとっても簡単なので、 まずはビジュアルワークフローで挙動を確認しました。

MakeArrayState[1, 2, 3, 4, 5]という配列を出力して CalcSquareState(Mapステート)に配列が入力されて その中にある個々のCalcSquareに配列の個々の値が入力され、2乗した値を出力 配列内の各々の値が2乗された配列が、CalcSquareStateの出力になっています。

ちなみに書くまでもないくらいですが、それぞれLambda関数は至極簡単です。

make_array.py

def lambda_handler(event, context):
    return [1, 2, 3, 4, 5]

calc_square.py

def lambda_handler(event, context):
    return event*event

つまり、Pythonで言えば次のようなmap関数などと同じ感じで実装が可能になります。

array = [1, 2, 3, 4, 5]
squared = list(map(lambda x: x*x, array))
# ==> [1, 4, 9, 16, 25]

serverless.yml

上記を行うためのserverless.ymlとPythonスクリプトの例です。

service: hirano-stepfunctions-map-test-01 # NOTE: update this with your service name

provider:
  name: aws
  runtime: python3.7
  stage: ${env:STAGE, 'dev'}
  region: ap-northeast-1

functions:
  MakeArrayLambda:
    handler: handler/make_array.lambda_handler
    timeout: 30
  CalcSquareLambda:
    handler: handler/calc_square.lambda_handler
    timeout: 30

stepFunctions:
  stateMachines:
    StateMachine01:
      definition:
        StartAt: MakeArrayState
        States:
          MakeArrayState:
            Type: Task
            Resource:
              Fn::GetAtt: [MakeArrayLambda, Arn]
            Next: CalcSquareState
          CalcSquareState:
            Type: Map
            Iterator:
              StartAt: CalcSquare
              States:
                CalcSquare:
                  Type: Task
                  Resource:
                    Fn::GetAtt: [CalcSquareLambda, Arn]
                  End: true
            End: true

plugins:
  - serverless-step-functions

package:
  exclude:
    - .git/**
    - node_modules
    - node_modules/**
    - __pycache__

大量のファイルに同一処理させる例

本題。
毎分ごとのファイルに統合するので、60分 * 24時間 = 1440個のファイルを処理する仕組みを作りました。 CloudWatch Eventsからスケジュール起動され、その日付に対応した(前日の)24時間を対象とするような想定です。

挙動の確認のところで、Mapステートの(一番安直な)使い方はわかっているので、 特に解説するところもなく、それにしたがって実装するだけです。

serverless.yml

並列数が1440であり、全く別の処理のためのLambda関数も動いているので、 同時実行数(MaxConcurrency)は100に制限しています。

service: cm-hirano-stepfunctions-map-test # NOTE: update this with your service name

provider:
  name: aws
  runtime: python3.7
  stage: ${env:STAGE, 'dev'}
  region: ap-northeast-1
  memorySize: 256
  iamRoleStatements:
    - Effect: "Allow"
      Action:
        - "s3:*"
      Resource:
        - "*"
  environment:
    STAGE: ${self:provider.stage}
  tracing:
    lambda: true

functions:
  MakeEveryMinute:
    handler: handler/make_every_minute.lambda_handler
    timeout: 30
  Concatenate:
    handler: handler/concatenate.lambda_handler
    timeout: 30

stepFunctions:
  stateMachines:
    StateMachine01:
      definition:
        StartAt: MakeEveryMinuteState
        States:
          MakeEveryMinuteState:
            Type: Task
            Resource:
              Fn::GetAtt: [MakeEveryMinuteLambdaFunction, Arn]
            Next: ConcatenateMapState
          ConcatenateMapState:
            Type: Map
            MaxConcurrency: 100
            Iterator:
              StartAt: ConcatenateState
              States:
                ConcatenateState:
                  Type: Task
                  Resource:
                    Fn::GetAtt: [ConcatenateLambdaFunction, Arn]
                  End: true
            End: true

plugins:
  - serverless-step-functions

package:
  exclude:
    - .git/**
    - node_modules
    - node_modules/**
    - __pycache__

ビジュアルワークフローとしては以下のような感じです。

なお、Fn::GetAtt: [MakeEveryMinuteLambdaFunction, Arn]という表記ですが、 普通はFn::GetAtt: [MakeEveryMinute, Arn]でも可能なはずですが、うまくデプロイできませんでした。 Step Functionsプラグインのマニュアルに下記の記載があったのでそれに従ったところ、 問題なくデプロイができました。

You can also express the above Fn::GetAtt function as Fn::GetAtt: [HelloLambdaFunction, Arn]

make_every_minute.py

def lambda_handler(event, context):
    date = get_date(event['time'])
    minutes = []
    for hour in range(0, 24):
        for minute in range(0, 60):
            minutes.append("{}-{}".format(str(hour).zfill(2), str(minute).zfill(2)))
    minutes = map(lambda x: "{}-{}".format(date, x), minutes)
    return minutes

def get_date(time):
    # 具体的な実装は省略(実際はここがかなり案件依存なので)
    return "2019-09-30"

concatenate.py

import os
import boto3

SRC_BUCKET = os.environ.get("SOURCE_BUCKET")

s3_client = None

def lambda_handler(event, context):
    global s3_client
    if not s3_client:
        s3_client = boto3.client('s3')
    # event = "2019-09-30-00-00"
    prefix = event
    print("PREFIX: {}".format(prefix))
    obj_list = s3_client.list_objects_v2(Bucket=SRC_BUCKET, Prefix=prefix)
    if 'Contents' not in obj_list:
        raise ValueError("Could not found any files.")
    for obj in obj_list['Contents']:
        key = obj['Key']
        # 結合して云々な処理
        concatnate_process(key)

実行

CloudWatch Eventsからのスケジュール実行を想定しているので、 入力jsonは最低限以下のような形が入っていればOKです。

{ "time": "2019-03-01T01:23:45Z" }

実際に実行してみたところ、約1時間ほどで処理が完了しました (実際の結合処理の内容などを何も示していないので、絶対値には意味がありませんが)。 1440個という、1アカウント内のLambda関数の同時実行上限である1000を超える数のLambda関数を起動させましたが、 MaxConcurrencyを指定した以外は特に並列起動のことを考えている箇所はありません。

つまり、「引数を受け取った後の処理内容だけ実装すれば良い」 という理想的な実装スタイルで処理を書くことができています! 規模が大きくなると、こういう単純さは非常に重要です。

Mapステートへ渡すデータについての注意点

上記の例では配列に2019-09-30-00-00のような文字列を渡しています。 Step FunctionsのParameters機能を使えば

{'date': '2019-09-30', 'minute': '00-00'}

のようにして、minuteだけを変動させることも可能なのですが、 どうやらMapステートとParametersの相性は良くないようです。 上記のようにしたところ、並列数が少ないうちは問題ないのですが、 並列数が大きくなった場合に処理に失敗してしまいました。

原因はまだはっきりしないのですが、 ひとまず単純な配列を受け渡しに使うと正常に動作するようなので、 Parametersを使ってうまくいかない場合には、 必要なデータを全て入れた配列を渡してあげるのが一番良さそうです。

ただ、このように配列の中身を大きくすると、 今度は、 受け渡しデータのサイズ上限 に引っかかってしまうという問題点もあります。 ですので、大きすぎるデータの場合にはまだ別のアーキテクチャを考える必要がありそうです。

この辺についてはキムさんのブログが非常に参考になりましたので、 そちらも是非ご参照ください。

【サーバーレス】18,570本のブログの形態素分析をやってみた

まとめ

S3上の大量ファイルに対して、ある程度の個数でグルーピングするような処理を Step FunctionsのMapステートを使って実装してみました。 Mapステートの使い方は非常に直感的なので、 すでにStep Functionsを使っている人であればほぼ瞬時に使いこなせるのではないでしょうか?

1000を超えるような処理だとまだ課題もありそうなMapステートですが、 そこまで大きくない規模のファイル群に対しては、 直感的に、複雑なことを考えずにLambda処理を並列化させられるので非常にオススメです。 異なる引数についてLambda関数をばら撒きたいという場合にはまず最初に検討すべきパターンになるかと思います。

以上、誰かの参考になれば幸いです。

参照リンク

Amazon Web Services ブログ 新機能 – Step Functions が動的並列処理をサポート AWS Step Functions - Map