この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
CX事業本部の平内(SIN)です。
AWS IoT CoreでMQTTのメッセージブローカーを利用したシステムを構築する際に、大量のメッセージに対応できているかどうかの確認が必要になる場合があると思います。
今回は、1秒間に17,000件のメッセージをPublishする、テスト用のクライアントを作成してみました。
ちなみに、17,000という数字は、使用したインスタンスで、1秒間に処理できる最大量で決まりました。もっと強いインスタンスを使用すれば、もしかすると、これ以上の数字も出せるのかも知れませんが、すいません、試していません。
2 構成
テスト用のクライアントは、Pythonで作成されており、EC2上で動作させています。
沢山のメッセージを送信をするために、一定のCPUやネットワークが必要なため、EC2のインスタンスは、m5.24xlargeを使用しています。
3 コード
テスト用のクライアントは、以下の通りです。
1秒ごとに1つのスレッドを起動し、スレッド中で、17,000件のpublishを行っています。
MQTTクライアントは、4,000個作成して接続し、順番に利用しています。後で紹介しますが、沢山のメッセージを送信するためには、多くの接続が必要です。
import json
import time
import math
import datetime
import threading
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder
# MQTTオブジェクト
class Mqtt():
def __init__(self, endpoint, client_id, root_ca, cert, key):
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
self.__mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint = endpoint,
cert_filepath = cert,
pri_key_filepath = key,
client_bootstrap = client_bootstrap,
ca_filepath = root_ca,
client_id = client_id,
clean_session = False,
keep_alive_secs = 6)
connect_future = self.__mqtt_connection.connect()
connect_future.result()
print("Connected! {}".format(client_id))
def publish(self, topic, payload):
self.__mqtt_connection.publish(
topic = topic,
payload = json.dumps(payload),
qos = mqtt.QoS.AT_MOST_ONCE) # mqtt.QoS.AT_LEAST_ONCE
def __del__(self):
disconnect_future = self.__mqtt_connection.disconnect()
disconnect_future.result()
# MQTTクライアントの生成
def create_clients(client_max, client_id_prefix, endpoint, root_ca, cert, key):
print("mqtt clinet_max:{} client_id_prefix:{}".format(client_max, client_id_prefix))
clients = []
for i in range(client_max):
clients.append(Mqtt(endpoint,"{}_{:04d}".format(client_id_prefix, i), root_ca, cert, key))
return clients
# payloadに乗せるデータの生成
def create_data(data_size):
print("data_size:{}".format(data_size))
data = ''
for i in range(data_size):
data += 'X'
return data
RPS = 17000
PERIOD = 10 # 送信時間[sec]
DATA_SIZE = 128 # byte
CLIENT_ID_PREFIX = "XF"
CLIENT_MAX = 4000 # 同一Clientの秒間制限があるため
endpoint = "xxxxxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
cert = "./certs/xxxxxxxx-certificate.pem.crt"
key = "./certs/xxxxxxxx-private.pem.key"
root_ca = "./certs/root-CA.crt"
#topic = "load_test" # message broker
topic = '$aws/rules/load_test/topic/load_test' # basic ingest
# MATTクライアント生成
clients = create_clients(CLIENT_MAX, CLIENT_ID_PREFIX, endpoint, root_ca, cert, key)
# payloadに乗せるデータの生成
data = create_data(DATA_SIZE)
print("CLIENT_ID_PREFIX:{} CLIENT_MAX:{}".format(CLIENT_ID_PREFIX, CLIENT_MAX))
print("RPS:{} PERIOD:{} [sec]".format(RPS, PERIOD))
def worker(rps):
counter = 0
start = time.time()
for i in range(rps):
payload = {
"producer_timestamp": datetime.datetime.now().isoformat(sep=' ', timespec='milliseconds'),
"counter": counter,
"data": data
}
index = i % CLIENT_MAX
clients[index].publish(topic, payload)
counter += 1
elapsed_time = time.time() - start
print ("counter: {} elapsed_time: {} sec".format(counter, elapsed_time))
time_interval = 1
for i in range(PERIOD):
print("{} {}".format(i, datetime.datetime.now()))
now = time.time()
t = threading.Thread(target = worker(RPS))
t.setDaemon(True)
t.start()
t.join()
wait_time = time_interval - ( (time.time() - now) % time_interval )
time.sleep(wait_time)
print("wait...")
time.sleep(1) # 送りきらないうちに終わってしまうのを防止する
# MATTクライアント破棄
for i in range(CLIENT_MAX):
del clients[0]
print("finish!")
4 動作確認
上記のコードを実行すると、以下のような出力となります。各スレッドが、正確に1秒単位で実行され、各スレッドは、1秒以内に17,000件の送信を完了していることが分かります。
data_size:128
CLIENT_ID_PREFIX:XF CLIENT_MAX:4000
RPS:17000 PERIOD:10 [sec]
0 2021-06-05 00:54:00.875850
counter: 17000 elapsed_time: 0.901975154876709 sec
1 2021-06-05 00:54:01.876103
counter: 17000 elapsed_time: 0.8145761489868164 sec
2 2021-06-05 00:54:02.876444
counter: 17000 elapsed_time: 0.7705812454223633 sec
3 2021-06-05 00:54:03.876837
counter: 17000 elapsed_time: 0.7373974323272705 sec
4 2021-06-05 00:54:04.877293
counter: 17000 elapsed_time: 0.7749261856079102 sec
5 2021-06-05 00:54:05.877650
counter: 17000 elapsed_time: 0.7767212390899658 sec
6 2021-06-05 00:54:06.878021
counter: 17000 elapsed_time: 0.7741456031799316 sec
7 2021-06-05 00:54:07.878389
counter: 17000 elapsed_time: 0.76737380027771 sec
8 2021-06-05 00:54:08.878803
counter: 17000 elapsed_time: 0.7726783752441406 sec
9 2021-06-05 00:54:09.879194
counter: 17000 elapsed_time: 0.7592222690582275 sec
wait...
finish!
AWS IoT Coreを詳細ログを出力し、確認している様子です。
1秒ごとのPublish-In(status=Success)の件数を確認すると、概ね17,000/secのメッセージが到着していることを確認できます。
fields @timestamp, @message
| sort @timestamp desc
| filter clientId like /XF_.*/
| filter eventType = "Publish-In" and status = "Success"
| stats count(eventType) by bin(1s)
また、失敗のログ「Publish-In(status=Failure)」は記録されていません。
fields @timestamp, @message
| sort @timestamp desc
| filter clientId like /XF_.*/
| filter eventType = "Publish-In" and status != "Success"
5 クライアント数
ここまで、対応可能なクライアント数で送信していたため、エラーは発生していませんが、クライアント数が不足した場合は、以下の様な状況となります。
例として、クライアント数を1とし、毎秒300件を5秒間だけ送信してみています。
RPS = 300
PERIOD = 5 # 送信時間[sec]
DATA_SIZE = 128 # byte
CLIENT_ID_PREFIX = "XJ"
CLIENT_MAX = 1 # 同一 Clientでの秒間制限がある
Connected! XJ_0000
data_size:128
CLIENT_ID_PREFIX:XJ CLIENT_MAX:1
RPS:300 PERIOD:5 [sec]
0 2021-06-05 02:07:44.081579
counter: 300 elapsed_time: 0.9059906005859375 sec
1 2021-06-05 02:07:45.081736
counter: 300 elapsed_time: 0.9057602882385254 sec
2 2021-06-05 02:07:46.081907
counter: 300 elapsed_time: 0.9057824611663818 sec
3 2021-06-05 02:07:47.082070
counter: 300 elapsed_time: 0.9058058261871338 sec
4 2021-06-05 02:07:48.082242
counter: 300 elapsed_time: 0.9058108329772949 sec
wait...
finish!
到着数は、以下です。毎秒300件のはずが、全然足りていないことが分かります。
また、ERRORログも記録されており、reasonは、throttledとなっていました。
こちらは、AWS IoT Core メッセージブローカーの制限にある、「接続別の 1 秒あたりのパブリッシュリクエスト数」のようです。
https://docs.aws.amazon.com/ja_jp/general/latest/gr/iot-core.html
6 最後に
今回は、テスト用に使用するため、大量にpublishするクライアントを作成してみました。
ログに、Publish-InのERRORが出力された時、reasonが、throttledとなっていたので、少し戸惑ったのですが、今回試した程度の量であれば、接続数を増加させることで回避が可能のようでした。
扱いが簡単なようにと、1インスタンス、1アプリの形式で作業しましたが、これ以上となると、もしかすると、インスタンス数を増やす方向の方が簡単なのかも知れません。