[AWS IoT Core] MQTT v5 を使用してフォーマット識別要素で判別する Payload のパースを実装して見ました

2022.12.03

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX 事業本部のデリバリー部の平内(SIN)です。

AWS re:Invent 2022 で発表された、AWS IoT Core での MQTT v5 対応に反応して、色々入門しています。

今回は、フォーマット識別要素を使用して、タイプの違う Payload をデコードする実装を試してみました。

なお、現時点(2022/11/30)では、AWS で提供される SDK は、MQTT v5 に対応していないため、サンプル作成には、paho.mqtt を使用させて頂きました。

2022/12/05 追記 Python SDK が開発者プレビューで MQTT v5 対応となりました。
参考:https://dev.classmethod.jp/articles/aws-iot-core-mqtt-v5-sdk-for-python

2 Payload Format Indicator/Content Type

MQTT v 3.1 では、Payload の内容をパースする要領は、全てクライアントの実装に任せられていました。 この為、Payload のフォーマットが一定でない場合は、その判別実装が必要でした。

MQTT v5 では、拡張されたプロパティに 2 つのフォーマット識別要素が追加されており、これを利用することで、判別のコストを大きく下げることができますま。

Property Description Input type Command
Payload Format Indicator Payload が、UTF-8 かどうかのブール値 Byte PUBLISH, CONNECT
Content Type コンテンツを説明する UTF-8 文字列 UTF-8 string PUBLISH, CONNECT

(1) Payload Format Indicator

Payload の内容が、UTF-8 文字列か否かを示します。文字列を扱う Subscriber にとっては、有意義な識別要素となります。 このプロパティは、1 バイトしか消費されません。バイト列の場合、0、UTF-8 文字列の場合、1 をセットします。

(2) Content Type

Content Type は、ペイロードの中身のタイプを具体的に表現する識別要素です。

一般的には MIME タイプで表現されそうに見えますが、ここで指定されるものは、あくまで Publisher と Subscriber の間の取り決めです。

3 実装例

以下が、実装したコードです。

Connect の後、Payload Format Indicator及び、Content Typeを指定して、プレーンなテキストと JSON を Publish しています。 また、同時に Subscribe で自らが送信したデータを受信し、識別要素を頼りにデコードして表示しています。

import ssl
import os
import time
import json
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes

endpoint = "xxxxxxxxxxxx.iot.ap-northeast-1.amazonaws.com"
port = 8883
topic = "sensors/device1"

dir = os.path.dirname(os.path.abspath(__file__))
certs = {
    "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir),
    "certfile": "{}/certificates/client-cert.pem".format(dir),
    "keyfile": "{}/certificates/private-key.pem".format(dir),
}

def on_connect(client, user_data, flags, reason_code, properties=None):
    print("on_connect")
    client.subscribe(topic, qos=1)

def on_subscribe(mqttc, userdata, mid, granted_qos, properties=None):
    print("on_subscribe")

def on_publish(client,userdata, result,properties=None):
    print("on_publish")

def on_message(client, userdata, message):
    data = ''
    payload_format_indicator = message.properties.PayloadFormatIndicator
    content_type = message.properties.ContentType
    if(payload_format_indicator):
        payload = str(message.payload.decode("utf-8"))
        if(content_type == 'text/plain'):
            data = payload
    else:
        if(content_type == 'application/json'):
            data = json.loads(message.payload)

    print('on_message payload:{} PayloadFormatIndicator:{} ContentType:{}'.format(data, payload_format_indicator, content_type))

def main():
    client = mqtt.Client("client_id", protocol = mqtt.MQTTv5)
    client.tls_set(certs["cafile"],
            certfile=certs["certfile"],
            keyfile=certs["keyfile"],
            cert_reqs=ssl.CERT_REQUIRED,
            tls_version=ssl.PROTOCOL_TLSv1_2,
            ciphers=None)

    client.on_connect = on_connect
    client.on_subscribe = on_subscribe
    client.on_publish = on_publish
    client.on_message = on_message

    client.connect(endpoint, port, properties = None)
    client.loop_start()

    time.sleep(1)

    properties = Properties(PacketTypes.PUBLISH)

    # 1回目のpublish UTF-8 の文字列を送信する
    properties.PayloadFormatIndicator = 1
    properties.ContentType = "text/plain"
    payload = "hello world"
    client.publish(topic, payload, qos=1, properties=properties)
    time.sleep(1)

    # 2回目のpublish JSONを送信する
    properties.PayloadFormatIndicator = 0
    properties.ContentType = "application/json"
    payload = json.dumps({'message':'hello'})
    client.publish(topic, payload, qos=1, properties=properties)
    time.sleep(1)

if __name__ == "__main__":
    main()

実行した結果です。適切にエンコードできていることが確認できます。

% python3 index.py
on_connect
on_subscribe
on_publish
on_message payload:hello world PayloadFormatIndicator:1 ContentType:text/plain
on_publish
on_message payload:{'message': 'hello'} PayloadFormatIndicator:0 ContentType:application/json

4 最後に

今回は、フォーマット識別要素(Payload Format Indicator及び、Content Type)を使用して、タイプの違う Payload をデコードする実装を試してみました。

Content-Type は、AWS IoT Code に依存しているものではないので、例えば、仕様で3種類の JSON 形式を扱う場合、その3種類を識別できる Content Type を定義し、クライアントとサーバで共有すれば良いことになります。この拡張も、Subscriber 側の実装コストを下げるものとして有効に使えそうです。