[AWS IoT Core] MQTT v5 を使用してリクエスト・レスポンス パターンを実装して見ました

2022.11.30

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX 事業本部のデリバリー部の平内(SIN)です。

AWS IoT Code で新規に対応となった MQTT v5 では、大規模なデバイス展開の通信強化や、メッセージングパターンを革新すると言われています。

正直なところ、MQTT v5 に触れたことがなく、なかなかイメージが湧かなかったので、今回は、勉強のため、メッセージングパターン拡張の1つとして挙げられる、「リクエスト・レスポンス」のパターンを実装してみました。

最初に、作成したサンプルが動作している様子です。 右側の縦長のコンソールがサーバで、左の 3 つが、3 個のクライアントをイメージしています。 後で確認していただくと、分かると思うのですが、サーバ側のプログラムは、特にクライアントを区別しないようにコーディングされているのですが、それぞれのクライアントは、自分の Subscribe したトピックに、送信時に設定したシーケンス番号(固有情報)と共に、受信できています。

なお、現時点(2022/11/30)では、AWS で提供される SDK は、MQTT v5 に対応していないため、サンプル作成には、paho.mqtt を使用させて頂きました。

2022/12/05 追記 Python SDK が開発者プレビューで MQTT v5 対応となりました。
参考:https://dev.classmethod.jp/articles/aws-iot-core-mqtt-v5-sdk-for-python

※ 本記事では、リクエスト・レスポンスの方向を表現するため、MQTT 通信を行う主体を「クライアント」「サーバ」のように表現していますが、MQTT では、単純に Subscriber 及び Publisher であることにご注意ください。

2 Request Response

HTTP 等を利用する Web アプリケーションでは、送信側と受信側が直接通信されるため、「リクエスト」に対する「レスポンス」という関係が常に成り立ちます。しかし、MQTT は、Publish/Subscribe で通信するため、特に作り込みが無い場合、このような関係に基づいた処理は実現できません。

MQTT v3.1 でも、必要な情報を追加することで、特定の「リクエスト」に基づく「レスポンス」を判別することは可能ですが、情報を付加できる場所は、Topic 若しくは、 Payload となり、本来の実装に少なからず影響することは避けられないでしょう。

MQTT v5 では、新規にプロパティ領域が追加され、リクエスト・レスポンスのパターンを支援するためのKeyがあります。これを使用する事で、スマートに実装が可能となっています。

3 新規追加プロパティ

MQTT v5 で新しく規定されたプロパティのうち、リクエスト・レスポンスパターンに関連するのは、以下の 4 つです。

名称 説明 データ型 コマンド
0x08 Response Topic Request-Response の返信 Topic String PUBLISH, Will
0x09 Correlation Data Request-Response の相互共有情報 Binary PUBLISH, Will
0x19 Request Response Information Request-Response 参照情報の要求 Byte CONNECT
0x1A Response Information Request-Response 参照情報 String CONNACK

※ 0x19 及び、0x1A については、今回実装しておりません。

(1) Response Topic

通常、クライアントは、特定の(ワイルドカードは使用しない)Topic を Subscribe して、サーバからのデータを待ち受けます。

プロパティResponse Topicを使用すれば、返信を希望する Topic をここに設定することで、待ち受けているトピックへの返信を要求することができます。

なお、Response Topicは、単なるプロパティ値であり、サーバ側で、この値を使用して Publish する実装は、開発者の作業となります。

リクエスト・レスポンスが構成できるということは、サーバ側で「クライアントに応じた Topic を出し分ける」というようなコストも、大きく削減されるかも知れません。

(2) Correlation Data

クライアント側でCorrelation Dataに何らかの識別情報等を付与して送信し、サーバ側で、これを折り返すことで、相互の共有情報として利用可能になります。

なお、Correlation Dataを適切に折り返す実装も、開発者の作業となります。

図では、送信データにシーケンス番号を付与し、クライアントで受け取ったレスポンスが、どの送信に基づくものであるかを判定しています。

4 実装例

以下が、実装したコードです。

server.py を 1 つ起動して、複数の client.py が実行可能ですが、server.py 側では、client 固有の処理は記述されていません。

client.py は、パラメータにクライアント ID を指定して、Subscribe する Topic 名に使用しています。

% python3 client.py clientId

server.py

import ssl
import time
import os
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
import time

endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
port = 8883

dir = os.path.dirname(os.path.abspath(__file__))
certs = {
    "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir),
    "certfile": "{}/certificates/client-cert.pem".format(dir),
    "keyfile": "{}/certificates/private-key.pem".format(dir),
}

def on_publish(client, userdata, mid):
    print("on_publish")

def on_connect(client, userdata, flags, reasonCode,properties=None):
    print("on_connect flags:{} properties:{} reasonCode: {}".format(flags, properties, reasonCode))

def on_subscribe(mqttc, userdata, mid, granted_qos, properties=None):
    print("on_subscribe")

def on_disconnect(client, userdata, rc,properties):
    print('on_disconnect {} {} {} {}'.format(client, userdata, rc, properties))

def on_unsubscribe(client, userdata, mid, properties, reasonCodes):
    print('on_unsubscribe')

def on_message(client, userdata, message):
    msg=str(message.payload.decode("utf-8"))
    print('on_message topic:{} {} {}'.format(message.topic, msg, message.properties.CorrelationData))

    properties = Properties(PacketTypes.PUBLISH)

    # プロパティに設定されたCorrelation Dataを折り返す
    properties.CorrelationData = message.properties.CorrelationData
    # プロパティに設定されたトピック名にレスポンスを返す
    response_topic = message.properties.ResponseTopic

    client.publish(response_topic, "response from server", properties = properties)

def main():
    client_id = "server"
    sub_topic = "sensor/server"

    client = mqtt.Client(client_id, protocol = mqtt.MQTTv5)
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.on_subscribe = on_subscribe
    client.on_publish = on_publish

    client.tls_set(certs["cafile"],
                certfile=certs["certfile"],
                keyfile=certs["keyfile"],
                cert_reqs=ssl.CERT_REQUIRED,
                tls_version=ssl.PROTOCOL_TLSv1_2,
                ciphers=None)

    properties=None
    client.connect(endpoint, port, properties = properties)
    time.sleep(3) # 少しの待機が必要
    client.subscribe(sub_topic, qos=0)
    client.loop_start()

    while(True):
        time.sleep(1)
        print("loop")

if __name__ == "__main__":
    main()

client.py

import ssl
import time
import json
import os
import sys
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes

endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
port = 8883

dir = os.path.dirname(os.path.abspath(__file__))
certs = {
    "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir),
    "certfile": "{}/certificates/client-cert.pem".format(dir),
    "keyfile": "{}/certificates/private-key.pem".format(dir),
}


def on_publish(client, userdata, mid):
    print("on_publish")

def on_connect(client, userdata, flags, reasonCode,properties=None):
    print("on_connect flags:{} properties:{} reasonCode: {}".format(flags, properties, reasonCode))

def on_subscribe(mqttc, userdata, mid, granted_qos, properties=None):
    print("on_subscribe")

def on_disconnect(client, userdata, rc,properties):
    print('on_disconnect {} {} {} {}'.format(client, userdata, rc, properties))

def on_unsubscribe(client, userdata, mid, properties, reasonCodes):
    print('on_unsubscribe')

def on_message(client, userdata, message):
    msg=str(message.payload.decode("utf-8"))
    print('on_message topic:{} {} {}'.format(message.topic, msg, message.properties.CorrelationData))

def main():
    args = sys.argv
    if 2 != len(args):
        print("use: client CLIEND_ID")
        exit()
    client_id = args[1]

    pub_topic = "sensor/server"
    sub_topic = "sensor/{}".format(client_id)

    client = mqtt.Client(client_id, protocol = mqtt.MQTTv5)

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.on_subscribe = on_subscribe
    client.on_publish = on_publish

    client.tls_set(certs["cafile"],
                certfile=certs["certfile"],
                keyfile=certs["keyfile"],
                cert_reqs=ssl.CERT_REQUIRED,
                tls_version=ssl.PROTOCOL_TLSv1_2,
                ciphers=None)
    client.connect(endpoint, port, properties = None)
    time.sleep(3) # 少しの待機が必要
    client.subscribe(sub_topic, qos=0)
    client.loop_start()
    time.sleep(1)

    # プロパティに戻りのトピック名をセットする
    properties = Properties(PacketTypes.PUBLISH)
    properties.ResponseTopic = sub_topic

    for i in range(5):
        properties.CorrelationData = json.dumps({"seq":i}).encode('utf-8')
        client.publish(pub_topic, "message from {}".format(client_id), properties=properties)
        time.sleep(3)

    client.unsubscribe(sub_topic)
    client.disconnect()

if __name__ == "__main__":
    main()

5 最後に

今回、MQTT v5 を使用して、「リクエスト・レスポンス」のパターンを実装してみました。

実装して見て分かったのは、MQTT v5 で新規に利用できるようになった機能は、あくまで、実装が可能になったと言う意味だと言う事です。

他に利用可能になった機能についても、順次確認を進めたいと思います。

6 参考にさせて頂いたリンク


MQTT Version 5.0 のリクエスト・レスポンスパターンを試してみた
Introducing new MQTTv5 features for AWS IoT Core to help build flexible architecture patterns
MQTT 5 supported features
[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました
[AWS IoT Core] MQTT v5 を使用してトピック・エイリアスを実装して見ました
[AWS IoT Core] MQTT v5 を使用してメッセージ及び、セッション有効期限とクリーンスタートを実装して見ました