【AWS Lambda】処理対象ファイルが大きい時に、メモリの大きな関数に代替処理をさせる際のTips

こんにちは、平野です。

毎日のログなど、定期的に出力されるファイルに処理をする場合、 基本的には同じくらいのサイズのファイルが対象になるかと思います。 これをLambdaで処理する場合、同じようなファイルサイズなので、 メモリサイズもそれに見合った量を設定することで、無駄な課金を防ぐことができます。

しかし何か特別なイベントがあり、通常よりも大きなサイズのファイルが出力されるような場合、 ギリギリに設定したメモリ量では処理が失敗してしまうことになります。

これを避けるために、今回はLambda内部で対象ファイルサイズを確認して、 一定以上だったら大きめのメモリを確保しておいたLambda関数で実行するような仕掛けを作ってみました。 特別難しいことはなく、そのままコーディングすれば良いのですが、 いくつか覚えておいた方が良さそうなことがあったのでブログにします。

想定しているケース

今回対処しようとしたケースでは、大きな容量のファイルが出力されることは滅多になく、 容量が大きな場合は通常の10倍程度の量まで想定されるようなケースでした。

もし容量の振れ幅が大きくない場合は、デフォルトのメモリ量を増やしてしまった方が、 結局管理面でアドバンテージがあるかもしれません。 また、メモリ量を増やすとそれに対応してCPU性能も上がります1ので、 一度メモリを上げてみて、処理時間が短縮されて料金的に相殺されるケースでないか確認してみることをオススメします。

アプリケーションとしては、pythonで動くコードをServerlessFrameworkを使ってデプロイしています。 同じ処理をするコードであれば、一つにまとめておきたかったので、 lambda_handler自体は一つで、それをメモリサイズだけ異なる2パターンの関数としてデプロイするようにしました。

なお、この記事では解説は割愛させて頂きますが、 呼び出す側のLambda関数には、他のLambda関数や、SNSトピックへアクセスできる ロールが付与されていることを確認してください。

コード

Serverless.yml

Serverless.ymlの内容としては以下のようになります。

functions:
  SampleApp:
    handler: sample_app.lambda_handler
    memorySize: 256
    timeout: 60
    reservedConcurrency: 50
    events:
      - (何かしらの呼び出しイベント)
  SampleApp_forLargeFile:
    handler: sample_app.lambda_handler
    memorySize: 1024
    timeout: 60
    reservedConcurrency: 50

ハンドラとなるpythonコードsample_app.pyは1つで、メモリサイズだけを変更して2つの関数を配置します。

sample_app.py

FILE_SIZE_THRESH = int(os.environ.get("FILE_SIZE_THRESH", "1000000"))

def lambda_handler(event, context):
    if context.function_name.endswith("_forLargeFile"):
        print("メモリサイズ{}MBの関数で実行します".format(context.memory_limit_in_mb))
    else:
        # ファイルサイズのチェック
        bucket = get_bucket(event)
        key = get_key(event)
        size = check_object_size(bucket, key)
        if size >= FILE_SIZE_THRESH:
            print("容量の大きなファイルを検知しました: {}Bytes".format(size))
            import boto3
            lambda_client = boto3.client("lambda")
            lambda_client.invoke(
                FunctionName=context.function_name + "_forLargeFile",
                InvocationType="Event",
                Payload=json.dumps(event))
            return

    # 以降に本処理を書く

def get_bucket(event):
    # 省略
def get_key(event):
    # 省略
def check_object_size(bucket, key):
    s3 = boto3.client('s3')
    return s3.head_object(Bucket=bucket, Key=key)['ContentLength']

lambda_handlerの直後で、関数名によって場合分けをしています。 この例では_forLargeFileと言うsuffixがついていたら大きいファイル向けの関数である、 というコードになっています。

ファイルサイズをチェックして、閾値を超えていたら別のLambda関数を呼びます。 基本的にはlambda_clientを作成してinvokeメソッドを呼ぶだけですが、 注意点としては、関数の呼び出しタイプとして

InvocationType="Event"

と指定することです。 Eventを指定すると非同期での呼び出しになりますが、 デフォルトでは同期呼び出しになってしまうため、呼び出し先の関数が終わるまで待機してしまいます。 同期呼び出しでは呼び出し元のLambda関数がずっと課金時間になってしまいますし、 呼び出し元の関数側でタイムアウトが発生したら、呼び出し先関数も一緒に終了してしまいます。 ということで、必ず呼び出しタイプはEventを指定しましょう。

また、eventの渡し方としては

Payload=json.dumps(event))

のようにします。 こうすれば、eventをそのまま渡すことができます。

VPCLambdaの場合

VPC内ではないLambda関数の場合は上記の方法で良いのですが、VPC内Lambda関数の場合には一工夫が必要です。

(NAT Gatewayの設置されていない)VPC内で実行されるLambda関数の場合、Lambda関数を呼び出すAPIを実行することができません。 呼び出す側、呼び出される側共に同じVPC内に存在していたとしても、 一度インターネットを経由しなければならないためです。

VPCからインターネットへ出るのであればNAT Gatewayが思い浮かびます2が、 今回はVPCエンドポイントが設定されたSNSが既にあったので、SNSを経由してLambda関数を呼び出す方法を取りました。 NAT Gatewayが既に設置されている場合はそれを利用した方が無駄なお金はかからずに済みます。

SNSのVPCエンドポイントを作成する方法は下記のブログを参照してみて下さい。

【アップデート】AWS PrivateLink 対応で SNS がよりセキュアに使えるようになりました!

Serverless.yml

SNSを経由する場合のServerless.ymlの例としては以下のようになります。 eventsに新規に作るsnsの名前を渡せば新しいリソースを作成してLambda関数との紐付けまでやってくれますので、 その結果作成されるARNを環境変数として定義しておいて(名前がわかれば予めARNまで決められる)、 Lambda関数からはそのARNにpublishを行うという流れになります。

provider:
  environment:
    VPC_LAMBDA_INVOKE_TOPIC_ARN: "arn:aws:sns:${self:provider.region}:#{AWS::AccountId}:vpc_lambda_invoke_topic_name"

functions:
  SampleApp:
    handler: sample_app.lambda_handler
    memorySize: 256
    timeout: 60
    reservedConcurrency: 50
    events:
      - (何かしらの呼び出しイベント)
  SampleApp_forLargeFile:
    handler: sample_app.lambda_handler
    memorySize: 1024
    timeout: 60
    reservedConcurrency: 50
    events:
      - sns: vpc_lambda_invoke_topic_name

vpc_lambda_invoke_topic_nameがSNSトピック名なので、適宜変更してください。

sample_app.py

FILE_SIZE_THRESH = int(os.environ.get("FILE_SIZE_THRESH", "1000000"))

def lambda_handler(event, context):
    if context.function_name.endswith("_forLargeFile"):
        event = json.loads(event['Records'][0]['Sns']['Message'])
        print("メモリサイズ{}MBの関数で実行します".format(context.memory_limit_in_mb))
    else:
        bucket = get_bucket(event)
        key = get_key(event)
        size = check_object_size(bucket, key)
        if size >= FILE_SIZE_THRESH:
            print("容量の大きなファイルを検知しました: {}Bytes".format(size))
            import boto3
            sns_client = boto3.client("sns")
            sns_client.publish(
                TopicArn=os.environ.get("VPC_LAMBDA_INVOKE_TOPIC_ARN"),
                Subject="Subject",
                Message=json.dumps(event))
            return

lambda_client.invokeから、sns_client.publishに変更し、 環境変数のトピックARNを指定しています。 Subjectはなんでも良い(カラ文字列はダメ)ので、適当な文字列を入れます。 そして、Messageに元のeventを渡します。

呼び出される側としては、

event = json.loads(event['Records'][0]['Sns']['Message'])

のように、SNSで送られてくる形式から、オリジナルのeventの情報を取り出す必要があります。

SQSの場合はタイムアウト時間に注意

この仕組みをSQSから発火するLambda関数に使用する場合は、 呼び出される側のタイムアウト時間に気をつける必要があります。

SQSの可視性タイムアウト時間は、それと紐づいたLambda関数のタイムアウト時間よりも長く設定する必要があります。 マネジメントコンソールから設定する際には、この関係性がチェックされます。 この条件を満たさない場合、処理が走っている間に別のLambda関数が処理を開始してしまう可能性があるからです。

しかし、呼び出される側のLambda関数はSQSと直接紐づいている訳ではないため、このチェックが働きません。 なので、呼び出される側のLambdaのタイムアウト時間も、SQSの可視性タイムアウトをオーバーしないように気をつけてください。 もしも容量の大きなファイルには処理時間がかかるというような場合には、 DynamoDBを使って処理中のファイルを記録しておくなどの処置が必要になるかと思います。

そのほかのやり方

ブログを公開したところ、社内で、他の方法も教えてもらったので簡単に紹介します

  • データ起因でLambda関数がエラーになるなら、StepFunctionsの利用も検討する
  • 対象ファイル容量のチェックはVPC外で行い、SNS経由でVPC内Lambdaを起動する
    • これならNAT GatewayもVPCエンドポイントも不要

まとめ

メモリ設定量の異なるだけのLambda関数を用意して、 処理対象のファイルのサイズによって大きなメモリの関数に処理を委譲する時のポイントをいくつか紹介しました。

なお、このような仕組みを作る場合には、 Lambda関数起動の無限ループに陥らないか、よく確認するようにしてください!! 自分とほぼ同じ関数を呼ぶため、少しの間違いでも、自分自身を呼ぶ無限ループに入ってしまう可能性があります。 (私は、もちろん、やりましたとも!) そうするとLambda関数が実行されまくるので、お金もかかりますし、 そのせいでLambda関数の同時起動数制限に引っかかって他の関数に影響が出てしまったりするかと思います。 (reservedConcurrencyを設定しているのはそのためです)

おおよそ一定のサイズのファイルを処理するけど、時々大きいのが来る、というケースがどれほど汎用性があるのかはわかりませんが、 単純にメモリ設定を大きくしてしまうとお金がもったいないですので、こんな方法もあるよ、という話でした。

誰かの参考になれば幸いです。


  1. https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/resource-model.html 
  2. 社内でツッコミが入った後にこう書いています。私自身はまだ「○○と言ったら××」というストックが少ないです。