[AWS IoT] 軽易にMQTTメッセージをpublishするテスト用クライアントをLambdaで作成してみました

2021.06.19

1 はじめに

CX事業本部の平内(SIN)です。

以前、比較的高いトラフィックで連続したしたMQTTメッセージを送信するために、テスト用のクライアントを作ってみました。

上記は、EC2上で実行するpythonプログラムとして作成しましたが、今回は、少しのデータでいいから軽易に利用できるものという事で、同じ仕組みをLambdaでやってみました。

boto3でやってますので、publishは、HTTPとなっています。
IoTDataPlane / publish(**kwargs)

IoT Coreのログで確認する分かりますが、ClientIdはありませんので、ClientIdでフィルタするような使い方はできません。

2 コード

Lambdaのコードです。

1秒単位で起動されるスレッド(worker)の中で、指定された回数のpublishを行います。

以下では、「毎秒80件」、「30秒間継続」、「Payloadのサイズは、約256byte」の設定となっています。

import json
import boto3
import time 
import datetime
import threading

topic = "load_test" # message broker
#topic = '$aws/rules/load_test/topic/load_test' # basic ingest

RPS = 80 # 秒間のpublish数
PERIOD = 30 # 送信時間[sec]
DATA_SIZE = 256 # byte

# payloadに乗せるデータの生成
def create_data(data_size):
    print("data_size:{}".format(data_size))
    data = ''
    for i in range(data_size):
        data += 'X'
    return data

def worker(iot, rps, data):
    start = time.time()

    counter = 0
    for i in  range(rps):
        payload = {
            "producer_timestamp": datetime.datetime.now().isoformat(sep=' ', timespec='milliseconds'),
            "counter": counter,
            "data": data
        }
        iot.publish(
            topic=topic,
            qos=1,
            payload=json.dumps(payload, ensure_ascii=False)
        )
        counter+=1
    elapsed_time = time.time() - start
    print ("counter: {} elapsed_time: {} sec".format(counter, elapsed_time))

def lambda_handler(event, context):

    iot = boto3.client('iot-data')

    # payloadに乗せるデータの生成
    data = create_data(DATA_SIZE)

    # 1秒ごとにworkerスレッドを起動する
    time_interval = 1
    for i in range(PERIOD):
        print("{} {}".format(i, datetime.datetime.now()))
        now = time.time()
        t = threading.Thread(target = worker(iot, RPS, data))
        t.setDaemon(True)
        t.start()
        t.join()
        wait_time = time_interval - ( (time.time() - now) % time_interval )
        time.sleep(wait_time)

    time.sleep(1) # 送りきらないうちに終わってしまうのを防止する
    print("finish!")

3 動作確認

上記のLambdaをテスト実行した際のログです。

スレッドは、正確に1秒ごと30秒間実行されており、各スレッドは、1秒以内に指定された回数(ここでは80)を送信し終わっていることが分かります。

Test Event Name
TEST

Response
null

Function Logs
START RequestId: d2ddd86a-02b8-459d-bec2-bb0f61a31db9 Version: $LATEST
data_size:256
0 2021-06-18 23:52:43.869158
counter: 80 elapsed_time: 0.9053993225097656 sec
1 2021-06-18 23:52:44.869398
counter: 80 elapsed_time: 0.8486857414245605 sec
2 2021-06-18 23:52:45.869718
counter: 80 elapsed_time: 0.8146414756774902 sec
3 2021-06-18 23:52:46.870064
counter: 80 elapsed_time: 0.8807601928710938 sec
4 2021-06-18 23:52:47.870364
counter: 80 elapsed_time: 0.8055093288421631 sec
5 2021-06-18 23:52:48.870725
counter: 80 elapsed_time: 0.8281140327453613 sec

・・・略・・・

24 2021-06-18 23:53:07.878011
counter: 80 elapsed_time: 0.8389482498168945 sec
25 2021-06-18 23:53:08.878350
counter: 80 elapsed_time: 0.8618371486663818 sec
26 2021-06-18 23:53:09.878661
counter: 80 elapsed_time: 0.8278014659881592 sec
27 2021-06-18 23:53:10.879009
counter: 80 elapsed_time: 0.8369021415710449 sec
28 2021-06-18 23:53:11.879305
counter: 80 elapsed_time: 0.8307430744171143 sec
29 2021-06-18 23:53:12.879757
counter: 80 elapsed_time: 0.8583922386169434 sec
finish!
END RequestId: d2ddd86a-02b8-459d-bec2-bb0f61a31db9
REPORT RequestId: d2ddd86a-02b8-459d-bec2-bb0f61a31db9  Duration: 31087.18 ms   Billed Duration: 31088 ms   Memory Size: 10240 MB   Max Memory Used: 76 MB  Init Duration: 267.69 ms

次の図は、IoT CoreのログをCloudWatch Logs Insightsで確認しているものです。

毎秒、約80件のPublishが、30秒間、エラーなく到着していることを確認できます。

4 Lambdaの設定

上記は、Lambdaのメモリを最大の10240、タイムアウトは45秒で試しています。

メモリサイズは、毎秒送信しようとする数が、1秒以内に収まるだけの力が必要です。今回、毎秒80を安定して送るのに、このサイズが必要でした。 また、タイムアウトは、当然ですが、継続して送信する時間より、大きくとる必要があります。

Lambdaには、IoT Coreへpublishする権限が必要です。今回は、雑ですが、AWSIoTDataAccessを追加しちゃってます。

5 最後に

今回は、軽易にMQTT送信の動作確認ができるようにLambdaでクライアントを作成してみました。 試してみた結果、Lambdaのメモリを10240にしても、秒間80件ぐらいが限界だったので、これ以上は、やっぱりEC2を上げてやるしかないかも知れません。