[AWS IoT Core] MQTT v5 を使用してメッセージ及び、セッション有効期限とクリーンスタートを実装して見ました

2022.12.03

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX 事業本部のデリバリー部の平内(SIN)です。

AWS re:Invent 2022 で発表された、AWS IoT Core での MQTT v5 対応に反応して、色々入門しています。

今回は、メッセージ及び、セッション有効期限とクリーンスタートについての実装を試して見ました。

初めに、作成したサンプルが動作している様子を紹介させてください。

左のコンソールが Publisher、右のコンソールが Subscriber です。 Publisher は、パラメータに有効期間(秒)を指定してメッセージを送信します。 メッセージブローカーは、有効期間(秒)だけ、メッセージを保持するので、 期間内に Subscriber が立ち上がってくれば、そのメッセージを取得できます。

1 回目は、有効期間 3 秒で Publish し、3 秒以上経過してから Subscriber が上がったので、メッセージが取得できなかった状況です。 そして、2 回目、3 回目は、どちらも、期間内に上がったので、メッセージが取得できている様子を観測できています。

なお、Subscriber は、オフライン中に、ブローカーに到着しているメッセージを「有効」なものとするため、クリーンスタート(セッション初期化)しないで起動しています。

※ 現時点(2022/12/02)では、AWS で提供される SDK は、MQTT v5 に対応していないため、サンプル作成には、paho.mqtt を使用させて頂きました。

2 セッション、メッセージの有効期間及び、クリーンスタート

MQTT v5 では、個々の Publish メッセージに有効期限を設定できます。そして、その保存に影響するのが、セッションの有効期限とクリーンスタートです。

3つの機能を組み合わせることで、多彩なセッション管理とメッセージの運用が可能になります。

(1) メッセージの有効期限 (Message expiry feature)

PUBLISH 時にメッセージに有効期限(秒単位で指定)を設定すると、その期間に応じて、メッセージブローカーでメッセージは保持され、また自動的に削除されます。

これは、サブスクライバーが接続中かそうでないかに関係なく動作します。したがって、何らかの都合でサブスクライバーが切断中であっても、必要なメッセージが欠落する心配がなくなります。

メッセージの有効期限は、4 バイトの整数で設定できますが、「設定なし」の場合、メッセージは期限切れになりません。(無期限となる)

これは、MQTT v3.1 でも利用可能な Retained message によく似ていますが、Retained message は、トピック単位で1つのメッセージで利用するものなので、応用範囲は大きく異なるでしょう。

なお、PUBLISH メッセージには、 retain=true オプションも同時に設定可能です。

(2) セッションの有効期限 (Session expiry feature)

CONNECT 時にセッションの有効期限(秒単位で指定)を使用すると、セッションの固定間隔を定義できます。

有効期限が 0 に設定されているか、CONNECT パケットに有効期限の値が含まれていない場合、クライアントの接続が閉じた時点で、直ちにセッション情報がメッセージブローカー上から削除されます。

セッションの有効期限は、4 バイトの整数で設定できますが、AWS IoT Core では、時間単位の精度で最大が 7 日間となっています。

有効期限が経過すると、特定クライアントのセッション情報は自動的に切断され、保持されていたメッセージも破棄されます。

MQTT v3 MQTT v5
cleanSession=false sessionExpiry >0 , cleanStart=flase
cleanSession=true sessionExpiry 0 , cleanStart=true

(3) クリーン スタート (Slean start flag)

クリーンスタートは、セッションの有効期限と連携して設定できるフラグです。このフラグを有効(true)にすると既存のセッションを使用せず、新しいセッションが開始されます。

参考:https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html#mqtt5

3 利用例

MQTT v3.1 では、セッションは、削除するか、永続するかの選択しかありませんでしたが、期限が明示的に設定できるようになったことで、デバイスの破棄や廃止、テスト環境の撤収などがやりやすくなったと思います。

個々のメッセージに有効期限を設定できるようになった事で、仕様に応じたメッセージの運用が可能になっています。例えは、「現時点の緊急アラート」を示すメッセージと、「ファームウエアの要更新」メッセージでは、仕様上の有効期間が違ってくると思います。MQTT v3.1 では、このような要求を、サブスクライバー側で制御したり、パブリッシャー側の再送などで実装していたと思います。

また、一時的にオフラインとなっているデバイス宛のメッセージを、ブローカーで正確に保持できる仕組みは、非常に多くのデバイス(サブスクライバー)が接続されたシステムで、シンプルなシステム構築が可能になりそうです。

4 実装例

以下が、実装したコードです。

sub.py は、サブスクライバーです。セッションの有効期限は 600 秒で、clean_start=False となっているので、一度起動すると、終了しても、5分以内に起動すれば、セッションは継続されていることになります。

pub.pyは、パブリッシャーです。パラメーターで有効期限を指定してメッセージを送信します。

% python3 pub.py 120 (有効期限、120秒間のメッセージを送信する)

sub.py

import ssl
import time
import os
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes

endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
port = 8883

dir = os.path.dirname(os.path.abspath(__file__))
certs = {
    "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir),
    "certfile": "{}/certificates/client-cert.pem".format(dir),
    "keyfile": "{}/certificates/private-key.pem".format(dir),
}

def on_message(client, userdata, message):
    msg=str(message.payload.decode("utf-8"))
    print('on_message topic:{} {} {}'.format(message.topic, msg, message.properties))

def on_connect(client, user_data, flags, reason_code, properties=None):
    client.subscribe('sensor/#', qos=1)

def main():

    session_expiry_interval = 600

    client = mqtt.Client("server_id", protocol = mqtt.MQTTv5)
    client.tls_set(certs["cafile"],
            certfile=certs["certfile"],
            keyfile=certs["keyfile"],
            cert_reqs=ssl.CERT_REQUIRED,
            tls_version=ssl.PROTOCOL_TLSv1_2,
            ciphers=None)

    client.on_message = on_message
    client.on_connect = on_connect

    properties_connect = Properties(PacketTypes.CONNECT)
    properties_connect.SessionExpiryInterval = session_expiry_interval
    client.connect(endpoint, port, clean_start=False, properties=properties_connect)
    client.loop_start()
    time.sleep(5)


if __name__ == "__main__":
    main()

pub.py

import ssl
import sys
import os
import time
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes

endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
port = 8883

args = sys.argv
if 2 != len(args):
    print("use: client messageExpiryInterval(sec)")
    exit()
messageExpiryInterval = int(args[1])


dir = os.path.dirname(os.path.abspath(__file__))
certs = {
    "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir),
    "certfile": "{}/certificates/client-cert.pem".format(dir),
    "keyfile": "{}/certificates/private-key.pem".format(dir),
}

def on_connect(client, user_data, flags, reason_code, properties=None):
    properties = Properties(PacketTypes.PUBLISH)
    properties.MessageExpiryInterval = messageExpiryInterval
    topic = "sensor/device1"
    payload = "MESSAGE"
    client.publish(topic, payload, qos=1, properties=properties)

def main():
    client = mqtt.Client("client_id", protocol = mqtt.MQTTv5)
    client.tls_set(certs["cafile"],
            certfile=certs["certfile"],
            keyfile=certs["keyfile"],
            cert_reqs=ssl.CERT_REQUIRED,
            tls_version=ssl.PROTOCOL_TLSv1_2,
            ciphers=None)

    client.on_connect = on_connect

    client.connect(endpoint, port, properties = None)
    client.loop_start()

    time.sleep(1)

if __name__ == "__main__":
    main()

5 最後に

今回は、メッセージ及び、セッション有効期限とクリーンスタートについて確認してみました。

この3つの機能を組み合わせることで、メッセージの保持(キャッシュ)の作業をメッセージブローカーに任せることが可能になるため、個々のメッセージ仕様の応じた動作を、非常にシンプルに実装できそうです。

個人的には、メッセージの送信前後に関係なく、有効期限により運用できるあたりが、非常に興味深いと感じています。

引き続き、MQTT v5 で利用可能になった機能について、確認を進めたいと思います。

6 参考リンク


Introducing new MQTTv5 features for AWS IoT Core to help build flexible architecture patterns
[AWS IoT Core] MQTT v5 を使用してリクエスト・レスポンス パターンを実装して見ました
[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました
[AWS IoT Core] MQTT v5 を使用してトピック・エイリアスを実装して見ました