[AWS IoT] 軽易にMQTTメッセージをpublishするテスト用クライアントをLambdaで作成してみました
1 はじめに
CX事業本部の平内(SIN)です。
以前、比較的高いトラフィックで連続したしたMQTTメッセージを送信するために、テスト用のクライアントを作ってみました。
上記は、EC2上で実行するpythonプログラムとして作成しましたが、今回は、少しのデータでいいから軽易に利用できるものという事で、同じ仕組みをLambdaでやってみました。
boto3でやってますので、publishは、HTTPとなっています。
IoTDataPlane / publish(**kwargs)
IoT Coreのログで確認する分かりますが、ClientIdはありませんので、ClientIdでフィルタするような使い方はできません。
2 コード
Lambdaのコードです。
1秒単位で起動されるスレッド(worker)の中で、指定された回数のpublishを行います。
以下では、「毎秒80件」、「30秒間継続」、「Payloadのサイズは、約256byte」の設定となっています。
import json import boto3 import time import datetime import threading topic = "load_test" # message broker #topic = '$aws/rules/load_test/topic/load_test' # basic ingest RPS = 80 # 秒間のpublish数 PERIOD = 30 # 送信時間[sec] DATA_SIZE = 256 # byte # payloadに乗せるデータの生成 def create_data(data_size): print("data_size:{}".format(data_size)) data = '' for i in range(data_size): data += 'X' return data def worker(iot, rps, data): start = time.time() counter = 0 for i in range(rps): payload = { "producer_timestamp": datetime.datetime.now().isoformat(sep=' ', timespec='milliseconds'), "counter": counter, "data": data } iot.publish( topic=topic, qos=1, payload=json.dumps(payload, ensure_ascii=False) ) counter+=1 elapsed_time = time.time() - start print ("counter: {} elapsed_time: {} sec".format(counter, elapsed_time)) def lambda_handler(event, context): iot = boto3.client('iot-data') # payloadに乗せるデータの生成 data = create_data(DATA_SIZE) # 1秒ごとにworkerスレッドを起動する time_interval = 1 for i in range(PERIOD): print("{} {}".format(i, datetime.datetime.now())) now = time.time() t = threading.Thread(target = worker(iot, RPS, data)) t.setDaemon(True) t.start() t.join() wait_time = time_interval - ( (time.time() - now) % time_interval ) time.sleep(wait_time) time.sleep(1) # 送りきらないうちに終わってしまうのを防止する print("finish!")
3 動作確認
上記のLambdaをテスト実行した際のログです。
スレッドは、正確に1秒ごと30秒間実行されており、各スレッドは、1秒以内に指定された回数(ここでは80)を送信し終わっていることが分かります。
Test Event Name TEST Response null Function Logs START RequestId: d2ddd86a-02b8-459d-bec2-bb0f61a31db9 Version: $LATEST data_size:256 0 2021-06-18 23:52:43.869158 counter: 80 elapsed_time: 0.9053993225097656 sec 1 2021-06-18 23:52:44.869398 counter: 80 elapsed_time: 0.8486857414245605 sec 2 2021-06-18 23:52:45.869718 counter: 80 elapsed_time: 0.8146414756774902 sec 3 2021-06-18 23:52:46.870064 counter: 80 elapsed_time: 0.8807601928710938 sec 4 2021-06-18 23:52:47.870364 counter: 80 elapsed_time: 0.8055093288421631 sec 5 2021-06-18 23:52:48.870725 counter: 80 elapsed_time: 0.8281140327453613 sec ・・・略・・・ 24 2021-06-18 23:53:07.878011 counter: 80 elapsed_time: 0.8389482498168945 sec 25 2021-06-18 23:53:08.878350 counter: 80 elapsed_time: 0.8618371486663818 sec 26 2021-06-18 23:53:09.878661 counter: 80 elapsed_time: 0.8278014659881592 sec 27 2021-06-18 23:53:10.879009 counter: 80 elapsed_time: 0.8369021415710449 sec 28 2021-06-18 23:53:11.879305 counter: 80 elapsed_time: 0.8307430744171143 sec 29 2021-06-18 23:53:12.879757 counter: 80 elapsed_time: 0.8583922386169434 sec finish! END RequestId: d2ddd86a-02b8-459d-bec2-bb0f61a31db9 REPORT RequestId: d2ddd86a-02b8-459d-bec2-bb0f61a31db9 Duration: 31087.18 ms Billed Duration: 31088 ms Memory Size: 10240 MB Max Memory Used: 76 MB Init Duration: 267.69 ms
次の図は、IoT CoreのログをCloudWatch Logs Insightsで確認しているものです。
毎秒、約80件のPublishが、30秒間、エラーなく到着していることを確認できます。
4 Lambdaの設定
上記は、Lambdaのメモリを最大の10240、タイムアウトは45秒で試しています。
メモリサイズは、毎秒送信しようとする数が、1秒以内に収まるだけの力が必要です。今回、毎秒80を安定して送るのに、このサイズが必要でした。 また、タイムアウトは、当然ですが、継続して送信する時間より、大きくとる必要があります。
Lambdaには、IoT Coreへpublishする権限が必要です。今回は、雑ですが、AWSIoTDataAccessを追加しちゃってます。
5 最後に
今回は、軽易にMQTT送信の動作確認ができるようにLambdaでクライアントを作成してみました。 試してみた結果、Lambdaのメモリを10240にしても、秒間80件ぐらいが限界だったので、これ以上は、やっぱりEC2を上げてやるしかないかも知れません。