この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
CX事業本部の平内(SIN)です。
AWS IoT Coreで、MQTTをQoS=1(受信側も含む)でpublishした場合、適切に処理しないと、Coreから延々と再送されてしまいます。
下記は、その様子を再現してみたものです。
左側のウインドウで動作しているのは、トピック(test)をSubscribeして、メッセージの到着を待機しているプログラムです。
右側は、トピック(test)に対して、1回だけメッセージを送信して終了するプログラムです。
1回しか、送信していないのに、同じ内容の受信が止まりません。プログラムを再起動しても、それは続きます。
下に見えているのは、AWS IoTコンソールの「テスト」で、トピック(test)を確認しているものですが、こちらには、1回しか表示されていなので、一見すると「何処から誰が送っているのか?」と見えてもしまいます。
※ 2021/10/07 KeepAliveに関する記述を一部変更させていただいております。
2 QoS
MQTTのQoSは3つのレベルがありますが、AWS IoT Coreでは、このうち、0及び、1が利用可能です。
Qos | 説明 | AWS SDK |
---|---|---|
0 | At most once | mqtt.QoS.AT_MOST_ONCE |
1 | At least once | mqtt.QoS.AT_LEAST_ONCE |
2 | Exactly once | - |
QoS=1では、メッセージを受け取った際に返されるPUBACKを受け取れるまでCoreから再送が続きます。
AWS Summit Tokyo 2018 資料 AWS IoT の賢い利用の仕方とプログラミングの勘所より
3 コード
最初の動画で使用したコードは、以下のとおりです。
mqtt.pyは、pulish側及び、subscribe側の両方で共通的に使用しているMQTTクラスです。 qosは、mqtt.QoS.AT_LEAST_ONCEとなっています。
mqtt.py
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder
class Mqtt():
def __init__(self, client_id):
endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
root_ca = "./root-CA.crt"
cert = "./cert.pem"
key = "./private.key"
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
self.__connection = mqtt_connection_builder.mtls_from_path(
endpoint = endpoint,
cert_filepath = cert,
pri_key_filepath = key,
client_bootstrap = client_bootstrap,
ca_filepath = root_ca,
client_id = client_id,
clean_session = False,
keep_alive_secs = 30)
self.__connection.connect()
def subscribe(self, topic, callback):
self.__connection.subscribe(
topic = topic,
#qos = mqtt.QoS.AT_MOST_ONCE,
qos = mqtt.QoS.AT_LEAST_ONCE,
callback = callback)
def publish(self, topic, payload):
self.__connection.publish(
topic = topic,
payload = payload,
#qos = mqtt.QoS.AT_MOST_ONCE)
qos = mqtt.QoS.AT_LEAST_ONCE)
送信側のプログラムです。1回だけメッセージをpublishして終了しています。
publisher.py
# -*- coding: utf-8 -*-
import time
from mqtt import Mqtt
topic = "test"
mqtt = Mqtt("publisher")
mqtt.publish(topic, '{"message":"hello"}')
time.sleep(2) # 送信完了まで数秒待機
受信側のプログラムは、subscribeの後、永久ループで待機します。メッセージ到着時(callback)は、重たい処理を想定して、15秒待機しています。
subscriber.py
# -*- coding: utf-8 -*-
import time
from mqtt import Mqtt
def callback(topic, payload):
print("receive: {}".format(payload))
for i in range(15):
print("sleep({})".format(i))
time.sleep(1)
print("finish.")
topic = "test"
mqtt = Mqtt("subscriber")
mqtt.subscribe(topic, callback)
while(True):
time.sleep(1)
4 PUBACK
PUBACKを返す様子をWiresharkで確認してみました。
赤枠が、PUBACKですが、上は3秒後に返され、下は、7秒後に返されていることが、確認できます。
この違いは、受信したタイミングで処理されるCallbackの処理時間で変化しています。
def callback(topic, payload):
print("receive: {}".format(payload))
# for i in range(3): # <= 上は、3秒
for i in range(7): # <= 上は、7秒
print("sleep({})".format(i))
time.sleep(1)
最初に、試したプログラムでは、ここが15秒となっていたので、一応、PUBACKは、返しているが、タイムアウトで「返信なし」と判断されています。
ここで言えるのは、QoS=1で処理する場合、callbackは、処理時間を意識する必要があるということです。
どうしても重たい処理となる場合は、一例として、下記のように、別スレッドで処理すれば良いかも知れません。
def func(payload):
print("receive: {}".format(payload))
for i in range(15):
print("sleep({})".format(i))
time.sleep(1)
print("finish.")
def callback(topic, payload):
thread1 = threading.Thread(target=func, args=(payload,))
thread1.start()
また、mqtt_connection_builder.mtls_from_path() で指定する、keep_alive_secsでの調整も可能かも知れません。
5 最後に
QoS=1を使用すれば、「確実な送達」というだけでなく、デバイス(受信側)が起動していないタイミングでも、先にメッセージを送っておいて、「起動した時に処理させる」というような事も可能です。