[AWS IoT] MQTT QoS=1による再送について確認してみました

2020.12.12

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX事業本部の平内(SIN)です。

AWS IoT Coreで、MQTTをQoS=1(受信側も含む)でpublishした場合、適切に処理しないと、Coreから延々と再送されてしまいます。

下記は、その様子を再現してみたものです。

左側のウインドウで動作しているのは、トピック(test)をSubscribeして、メッセージの到着を待機しているプログラムです。

右側は、トピック(test)に対して、1回だけメッセージを送信して終了するプログラムです。

1回しか、送信していないのに、同じ内容の受信が止まりません。プログラムを再起動しても、それは続きます。

下に見えているのは、AWS IoTコンソールの「テスト」で、トピック(test)を確認しているものですが、こちらには、1回しか表示されていなので、一見すると「何処から誰が送っているのか?」と見えてもしまいます。

※ 2021/10/07 KeepAliveに関する記述を一部変更させていただいております。

2 QoS

MQTTのQoSは3つのレベルがありますが、AWS IoT Coreでは、このうち、0及び、1が利用可能です。

Qos 説明 AWS SDK
0 At most once mqtt.QoS.AT_MOST_ONCE
1 At least once mqtt.QoS.AT_LEAST_ONCE
2 Exactly once -

QoS=1では、メッセージを受け取った際に返されるPUBACKを受け取れるまでCoreから再送が続きます。

AWS Summit Tokyo 2018 資料 AWS IoT の賢い利用の仕方とプログラミングの勘所より

3 コード

最初の動画で使用したコードは、以下のとおりです。

mqtt.pyは、pulish側及び、subscribe側の両方で共通的に使用しているMQTTクラスです。 qosは、mqtt.QoS.AT_LEAST_ONCEとなっています。

mqtt.py

from awscrt import io, mqtt
from awsiot import mqtt_connection_builder

class Mqtt():
    def __init__(self, client_id):

        endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
        root_ca = "./root-CA.crt"
        cert = "./cert.pem"
        key = "./private.key"

        event_loop_group = io.EventLoopGroup(1)
        host_resolver = io.DefaultHostResolver(event_loop_group)
        client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

        self.__connection = mqtt_connection_builder.mtls_from_path(
            endpoint = endpoint,
            cert_filepath = cert,
            pri_key_filepath = key,
            client_bootstrap = client_bootstrap,
            ca_filepath = root_ca,
            client_id = client_id,
            clean_session = False,
            keep_alive_secs = 30)

        self.__connection.connect()

    def subscribe(self, topic, callback):
        self.__connection.subscribe(
            topic = topic,
            #qos = mqtt.QoS.AT_MOST_ONCE,
            qos = mqtt.QoS.AT_LEAST_ONCE,
            callback = callback)

    def publish(self, topic, payload):
        self.__connection.publish(
            topic = topic,
            payload = payload,
            #qos = mqtt.QoS.AT_MOST_ONCE)
            qos = mqtt.QoS.AT_LEAST_ONCE)

送信側のプログラムです。1回だけメッセージをpublishして終了しています。

publisher.py

# -*- coding: utf-8 -*-
import time
from mqtt import Mqtt

topic = "test"
mqtt = Mqtt("publisher")
mqtt.publish(topic, '{"message":"hello"}')

time.sleep(2) # 送信完了まで数秒待機

受信側のプログラムは、subscribeの後、永久ループで待機します。メッセージ到着時(callback)は、重たい処理を想定して、15秒待機しています。

subscriber.py

# -*- coding: utf-8 -*-
import time
from mqtt import Mqtt

def callback(topic, payload):
    print("receive: {}".format(payload))
    for i in range(15):
        print("sleep({})".format(i))
        time.sleep(1)
    print("finish.")

topic = "test"
mqtt = Mqtt("subscriber")
mqtt.subscribe(topic, callback)

while(True):
    time.sleep(1)

4 PUBACK

PUBACKを返す様子をWiresharkで確認してみました。

赤枠が、PUBACKですが、上は3秒後に返され、下は、7秒後に返されていることが、確認できます。

この違いは、受信したタイミングで処理されるCallbackの処理時間で変化しています。

def callback(topic, payload):
    print("receive: {}".format(payload))
    # for i in range(3): # <= 上は、3秒
    for i in range(7): # <= 上は、7秒
        print("sleep({})".format(i))
        time.sleep(1)

最初に、試したプログラムでは、ここが15秒となっていたので、一応、PUBACKは、返しているが、タイムアウトで「返信なし」と判断されています。

ここで言えるのは、QoS=1で処理する場合、callbackは、処理時間を意識する必要があるということです。

どうしても重たい処理となる場合は、一例として、下記のように、別スレッドで処理すれば良いかも知れません。

def func(payload):
    print("receive: {}".format(payload))
    for i in range(15):
        print("sleep({})".format(i))
        time.sleep(1)
    print("finish.")

def callback(topic, payload):
    thread1 = threading.Thread(target=func, args=(payload,))
    thread1.start()

また、mqtt_connection_builder.mtls_from_path() で指定する、keep_alive_secsでの調整も可能かも知れません。

5 最後に

QoS=1を使用すれば、「確実な送達」というだけでなく、デバイス(受信側)が起動していないタイミングでも、先にメッセージを送っておいて、「起動した時に処理させる」というような事も可能です。