この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
CX 事業本部のデリバリー部の平内(SIN)です。
AWS re:Invent 2022 で発表された、AWS IoT Core での MQTT v5 対応に反応して、色々入門しています。
今回は、フォーマット識別要素を使用して、タイプの違う Payload をデコードする実装を試してみました。
なお、現時点(2022/11/30)では、AWS で提供される SDK は、MQTT v5 に対応していないため、サンプル作成には、paho.mqtt を使用させて頂きました。
参考:https://dev.classmethod.jp/articles/aws-iot-core-mqtt-v5-sdk-for-python
2 Payload Format Indicator/Content Type
MQTT v 3.1 では、Payload の内容をパースする要領は、全てクライアントの実装に任せられていました。 この為、Payload のフォーマットが一定でない場合は、その判別実装が必要でした。
MQTT v5 では、拡張されたプロパティに 2 つのフォーマット識別要素が追加されており、これを利用することで、判別のコストを大きく下げることができますま。
Property | Description | Input type | Command |
---|---|---|---|
Payload Format Indicator | Payload が、UTF-8 かどうかのブール値 | Byte | PUBLISH, CONNECT |
Content Type | コンテンツを説明する UTF-8 文字列 | UTF-8 string | PUBLISH, CONNECT |
(1) Payload Format Indicator
Payload の内容が、UTF-8 文字列か否かを示します。文字列を扱う Subscriber にとっては、有意義な識別要素となります。 このプロパティは、1 バイトしか消費されません。バイト列の場合、0、UTF-8 文字列の場合、1 をセットします。
(2) Content Type
Content Type は、ペイロードの中身のタイプを具体的に表現する識別要素です。
一般的には MIME タイプで表現されそうに見えますが、ここで指定されるものは、あくまで Publisher と Subscriber の間の取り決めです。
3 実装例
以下が、実装したコードです。
Connect の後、Payload Format Indicator及び、Content Typeを指定して、プレーンなテキストと JSON を Publish しています。 また、同時に Subscribe で自らが送信したデータを受信し、識別要素を頼りにデコードして表示しています。
import ssl
import os
import time
import json
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
endpoint = "xxxxxxxxxxxx.iot.ap-northeast-1.amazonaws.com"
port = 8883
topic = "sensors/device1"
dir = os.path.dirname(os.path.abspath(__file__))
certs = {
"cafile": "{}/certificates/AmazonRootCA1.pem".format(dir),
"certfile": "{}/certificates/client-cert.pem".format(dir),
"keyfile": "{}/certificates/private-key.pem".format(dir),
}
def on_connect(client, user_data, flags, reason_code, properties=None):
print("on_connect")
client.subscribe(topic, qos=1)
def on_subscribe(mqttc, userdata, mid, granted_qos, properties=None):
print("on_subscribe")
def on_publish(client,userdata, result,properties=None):
print("on_publish")
def on_message(client, userdata, message):
data = ''
payload_format_indicator = message.properties.PayloadFormatIndicator
content_type = message.properties.ContentType
if(payload_format_indicator):
payload = str(message.payload.decode("utf-8"))
if(content_type == 'text/plain'):
data = payload
else:
if(content_type == 'application/json'):
data = json.loads(message.payload)
print('on_message payload:{} PayloadFormatIndicator:{} ContentType:{}'.format(data, payload_format_indicator, content_type))
def main():
client = mqtt.Client("client_id", protocol = mqtt.MQTTv5)
client.tls_set(certs["cafile"],
certfile=certs["certfile"],
keyfile=certs["keyfile"],
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLSv1_2,
ciphers=None)
client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.on_publish = on_publish
client.on_message = on_message
client.connect(endpoint, port, properties = None)
client.loop_start()
time.sleep(1)
properties = Properties(PacketTypes.PUBLISH)
# 1回目のpublish UTF-8 の文字列を送信する
properties.PayloadFormatIndicator = 1
properties.ContentType = "text/plain"
payload = "hello world"
client.publish(topic, payload, qos=1, properties=properties)
time.sleep(1)
# 2回目のpublish JSONを送信する
properties.PayloadFormatIndicator = 0
properties.ContentType = "application/json"
payload = json.dumps({'message':'hello'})
client.publish(topic, payload, qos=1, properties=properties)
time.sleep(1)
if __name__ == "__main__":
main()
実行した結果です。適切にエンコードできていることが確認できます。
% python3 index.py
on_connect
on_subscribe
on_publish
on_message payload:hello world PayloadFormatIndicator:1 ContentType:text/plain
on_publish
on_message payload:{'message': 'hello'} PayloadFormatIndicator:0 ContentType:application/json
4 最後に
今回は、フォーマット識別要素(Payload Format Indicator及び、Content Type)を使用して、タイプの違う Payload をデコードする実装を試してみました。
Content-Type は、AWS IoT Code に依存しているものではないので、例えば、仕様で3種類の JSON 形式を扱う場合、その3種類を識別できる Content Type を定義し、クライアントとサーバで共有すれば良いことになります。この拡張も、Subscriber 側の実装コストを下げるものとして有効に使えそうです。