[AWS IoT Core] retained messages(保持メッセージ)を AWS IoT Device SDK v2 for Python で試してみました。

2021.08.25

1 はじめに

IoT事業部の平内(SIN)です

AWS IoT Coreで、retained messages(保持メッセージ)のサポートがアナウンスされていました。
AWS IoT Core now supports MQTT retained messages

今回は、この動作をAWS IoT Device SDK v2 for Pythonで確認してみました。

2 retained messages

MQTTプロトコルフォーマットのフラグでは、PublishでRETAINというフラグがあります。
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718022

このフラグを設定されたメッセージは、永続的(AWS IoT Coreでは、最後に更新されてから3年後に自動的に消滅)に保持され、後からサブスクライブしても、その内容を取得する事ができます。

メッセージは、トピックに紐づいており、同じトピック名で新たにメッセージを送信すると上書きされます。

なお、既にサブスクライブしているクライアントには、通常のメッセージと同様に、直ちに送信されます。

この機能を利用すると、デバイスの初期設定や設定変更をパブリッシュする場面では、デバイスが、オンラインかどうかを追跡する必要が無くなります。

3 クライアント

サブスクライブ側もパブリッシュ側も、分かりやすいようにQoS=0(QoS.AT_MOST_ONCE)で書きました。

(1) サブスクライブ側

通常の実装と変わりません。

受信時のコールバックであるon_message_received()では、retainという変数を受け取る事ができ、通常のメッセージであれば、ここがFalse、永続メッセージであれば、Trueとなります。

import time
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder

endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
root_ca = "./certs/root-CA.crt"
cert = "./certs/xxxxxxxxxxxx-certificate.pem.crt"
key = "./certs/xxxxxxxxxxxx-private.pem.key"

topic = "retain-topic"
client_id = "subscriber"

def on_message_received(topic, payload, dup, qos, retain, **kwargs):
    print("received topic: {}".format(topic))
    print("payload: {}".format(payload.decode()))
    print("retain: {}".format(retain))

event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
client = mqtt_connection_builder.mtls_from_path(
    endpoint = endpoint,
    cert_filepath = cert,
    pri_key_filepath = key,
    client_bootstrap = client_bootstrap,
    ca_filepath = root_ca,
    client_id = client_id,
    clean_session = False,
    keep_alive_secs = 6)
connect_future = client.connect()
connect_future.result()
print("Connected! {}".format(client_id))

subscribe_future, _ = client.subscribe(
    topic = topic,
    qos = mqtt.QoS.AT_MOST_ONCE,
    callback = on_message_received)
subscribe_future.result()

while True:
    time.sleep(1)

disconnect_future = client.disconnect()
disconnect_future.result()

(2) パブリッシュ側

Publishでは、retainフラグをTrueとしています。


参考:https://awslabs.github.io/aws-crt-python/api/mqtt.html#awscrt.mqtt.Connection

import json
import datetime
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder

endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
root_ca = "./certs/root-CA.crt"
cert = "./certs/xxxxxxxxxxxx-certificate.pem.crt"
key = "./certs/xxxxxxxxxxxx-private.pem.key"

topic = "retain-topic"
client_id = "publiser"

def on_message_received(topic, payload, **kwargs):
    print("received: {}".format(payload.decode()))

event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
client = mqtt_connection_builder.mtls_from_path(
    endpoint = endpoint,
    cert_filepath = cert,
    pri_key_filepath = key,
    client_bootstrap = client_bootstrap,
    ca_filepath = root_ca,
    client_id = client_id,
    clean_session = False,
    keep_alive_secs = 6)
connect_future = client.connect()
connect_future.result()
print("Connected! {}".format(client_id))

# Publih
payload = {
    "timestamp": datetime.datetime.now().isoformat(sep=' ', timespec='milliseconds'),
    "status": 0
}
client.publish(
        topic=topic,
        payload=json.dumps(payload),
        qos=mqtt.QoS.AT_MOST_ONCE,
        retain=True)

disconnect_future = client.disconnect()
disconnect_future.result()

4 削除

0バイトのペイロードで、メッセージを送信すると、保持メッセージは削除されます。

client.publish(
        topic=topic,
        payload="",
        qos=mqtt.QoS.AT_MOST_ONCE,
        retain=True)

5 iot:RetainPublish

RetainフラグをTrueにしたメッセージをPublishするには、Actionでiot:RetainPublishAllowになっている必要があります。

例)

{
    "Effect": "Allow",
    "Action": [
        "iot:Publish",
        "iot:RetainPublish"
    ],
    "Resource": [
        "arn:aws:iot:ap-northeast-1:xxxxxxxx:topic/retain-topic"
    ]
}

6 AWS IoT コンソール

保持れたメッセージは、IoT Coreのコンソールから確認できます。

また、新たに発行(更新)したり、削除することも可能です。

7 動作確認

既にメッセージが保持されている場合に、サブスクライブすると、保持されたメッセージを受け取ります。

% python3 subscribe.py
Connected! subscriber
received topic: retain-topic
payload: {"timestamp": "2021-08-25 08:25:02.507", "status": 2}
retain: True

サブスクライブ中に、Retainフラグがセットされたメッセージが到着すると、通常のメッセージとして受信します。(retain: False)

received topic: retain-topic
payload: {"timestamp": "2021-08-25 08:26:22.014", "status": 3}
retain: False

サブスクライブ中に、Retainフラグがセットされ、ペーロードが0バイトのメッセージが到着しても、メッセージは受信しません。

8 最後に

retained messagesの特性をしっかり把握する事で、シャドウとはまた違った、便利な使い方ができそう気がしています。