[Kinesis Data Streams] 後段のLambdaの実行数を予測するため、シャード数、バッチ数、同時バッチ数、処理時間の関係を確認してみました

2021.06.11

1 はじめに

CX事業本部の平内(SIN)です。

AWS IoT Coreのメッセージブローカーに到着する高負荷なメッセージを処理する場合、一旦、Amazon Kinesis Data Streams(以下、Kinesis Data Streams)で受けてから、処理する構成はよくあると思います。

この時、どのような設定を行うと、後段のAWS Lambda(以下、Lambda)が、どれぐらい実行されるのかを理解したいということで、少し試してみました。

確認に使用した構成は、以下のとおりです。

なお、全ての確認は、制約の範囲内となっています。

1つのシャードあたり

  • IN 1000rps 1M/sec
  • OUT コンシューマー×5 2M/sec

2 計算値

シャードあたりの同時バッチをデフォルトの1とした場合、1つシャードで同時実行されるLambdaは、1つです。

例えば、Lambdaの処理時間が、ちょうど1秒だった場合、1つのシャードで1秒間に実行されるLambdaは1回となり、バッチ数が100だった場合、処理数は100件/secという事になります。

もし、この状況で、秒間100件以上のデータが到着すると、ストリームに溜まってしまい、処理完了までの時間は、その分遅くなるはずです。

毎秒300件で5秒間(総数1,500件)のメッセージが到着した場合の、Lambdaの実行数と、処理されるまでの所要時間を、上の考えを元に表にしてみました。

No. シャード数 バッチ数 同時バッチ数 Lambdaの処理時間(sec) 処理件数(件/sec) Lambda実行数 所要時間(sec)
(1) 1 100 1 1 100 1 15
(2) 3 100 1 1 300 3 5
(3) 1 300 1 1 300 1 5
(4) 1 100 3 1 300 1 5
(5) 1 100 1 1/3 300 3 5

(1)は、秒間100件しか処理できないので、1,500件処理するのに15秒かかります。そして、(2)〜(5)のように、設定で、秒間の処理件数を300件となるようにすれば、1,500/300=5 となり、ほぼリアルタイムで処理できる事になるはずです。

3 MQTT送信

確認作業のための、IoT CoreへのMQTTの送信は、下記のブログで作成したものを使用しました。

少し、ウエイトを置いて、概ね1秒で300件となるように調整しています。

data_size:256
CLIENT_ID_PREFIX:AA CLIENT_MAX:10
RPS:300 PERIOD:5 [sec]
0 2021-06-10 23:58:19.924671
counter: 300 elapsed_time: 0.9065141677856445 sec
1 2021-06-10 23:58:20.924834
counter: 300 elapsed_time: 0.9061965942382812 sec
2 2021-06-10 23:58:21.925014
counter: 300 elapsed_time: 0.9064352512359619 sec
3 2021-06-10 23:58:22.925190
counter: 300 elapsed_time: 0.9060108661651611 sec
4 2021-06-10 23:58:23.925360
counter: 300 elapsed_time: 0.9060707092285156 sec
wait...
finish!

Publishされた状況を、IoT Coreのログ(AWSIotLogsV2)でみると、綺麗に毎秒300件到着していることが確認できます。

4 Lambda

Lambdaは、time.sleep() で処理の負荷を模擬しています。また、集計のために処理したRecordの数をログに出力しています。

import time

def lambda_handler(event, context):

    time.sleep(1.0) # 処理時間

    print({
        "name": "load_test",
        "records_len": len(event["Records"])
    })

5 実測

以下は、先の計算値の表の状況を実際に測定してみたものです。

(1) 遅延

Lambdaの実行回数は、毎秒1回で、100件ずつ処理され、計算通り、完了まで約15秒かかっています。

  • レコード処理数
fields @timestamp, @message
| sort @timestamp desc
| filter name = "load_test"
| stats sum(records_len) by bin(1s)

  • Lambda実行回数
fields @timestamp, @message
| sort @timestamp desc
| filter name = "load_test"
| stats count(*) by bin(1s)

(2) シャード数

Kinesis Data Streamsシャード数1から3に増やして、確認してみました。

  • レコード処理数

概ね、毎秒300件ずつ処理され、ほぼリアルタイムに処理されていることが確認できます。

  • Lambda実行回数

Lambdaの実行回数は、秒間、最大3です。

(3) バッチサイズ

Lambdaのトリガー設定で、バッチサイズ100から300に変更して、確認してみました。

  • レコード処理数

この場合も、概ね、毎秒300件ずつ処理され、ほぼリアルタイムに処理されていることが確認できます。

  • Lambda実行回数

Lambdaの実行回数は、毎秒1回です。

(4) 同時バッチ数

Lambdaのトリガー設定(追加設定)で、同時バッチ数1から3に変更して、確認してみました。

  • レコード処理数

概ね、毎秒300件ずつ処理され、ほぼリアルタイムに処理されていることが確認できます。

  • Lambda実行回数

Lambdaの実行回数は、毎秒、最大3回です。

(5) 処理時間

続いて、Lambdaの処理時間を1/3に変更して確認してみました。 この確認だけ、他のものと違って、少し当初の計算値通りとはなりませんでした。

import time

def lambda_handler(event, context):

    #time.sleep(1.0) # 処理時間
    time.sleep(0.3) # 処理時間

    print({
        "name": "load_test",
        "records_len": len(event["Records"])
    })
  • レコード処理数

こちらは、立ち上がり処理件数が300まで届かず、リアルタイム生が少し損なわれています。

  • Lambda実行回数

Lambdaの実行回数は、毎秒、最大3回ですが、2回となっているところもあります。

6 最後に

今回は、シャード数や、トリガーの設定を変えて、Lambda実行数や、処理時間がどのように変化するかを確認してみましたが、ほとんどが、計算してみた数値に近い値となていることが確認できました。

最後のLambdaの実行時間による処理数の変化は、関数内のwaitだけで、Kinesis Data Streamsから見た処理時間と一致しないことが原因なのではないかと考えています。しかし、数秒後には、計算値どおり300/secとなっているので、一応、「計算通りだった」という事にしたいと思っています。

「同時バッチ数」は、1〜10までしか、設定できませんが、後段の実行数を上げる効果としては、シャード数を「指定倍数」に増やすことと同じなので、目的がそこにあるのであれば、まずは、「同時バッチ数」を上げる方が、コストの面からも良いのでは、と思いました。