この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
IoT事業部の平内(SIN)です
AWS IoT Coreで、retained messages(保持メッセージ)のサポートがアナウンスされていました。
AWS IoT Core now supports MQTT retained messages
今回は、この動作をAWS IoT Device SDK v2 for Pythonで確認してみました。
2 retained messages
MQTTプロトコルフォーマットのフラグでは、PublishでRETAINというフラグがあります。
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718022
このフラグを設定されたメッセージは、永続的(AWS IoT Coreでは、最後に更新されてから3年後に自動的に消滅)に保持され、後からサブスクライブしても、その内容を取得する事ができます。
メッセージは、トピックに紐づいており、同じトピック名で新たにメッセージを送信すると上書きされます。
なお、既にサブスクライブしているクライアントには、通常のメッセージと同様に、直ちに送信されます。
この機能を利用すると、デバイスの初期設定や設定変更をパブリッシュする場面では、デバイスが、オンラインかどうかを追跡する必要が無くなります。
3 クライアント
サブスクライブ側もパブリッシュ側も、分かりやすいようにQoS=0(QoS.AT_MOST_ONCE)で書きました。
(1) サブスクライブ側
通常の実装と変わりません。
受信時のコールバックであるon_message_received()では、retainという変数を受け取る事ができ、通常のメッセージであれば、ここがFalse、永続メッセージであれば、Trueとなります。
import time
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder
endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
root_ca = "./certs/root-CA.crt"
cert = "./certs/xxxxxxxxxxxx-certificate.pem.crt"
key = "./certs/xxxxxxxxxxxx-private.pem.key"
topic = "retain-topic"
client_id = "subscriber"
def on_message_received(topic, payload, dup, qos, retain, **kwargs):
print("received topic: {}".format(topic))
print("payload: {}".format(payload.decode()))
print("retain: {}".format(retain))
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
client = mqtt_connection_builder.mtls_from_path(
endpoint = endpoint,
cert_filepath = cert,
pri_key_filepath = key,
client_bootstrap = client_bootstrap,
ca_filepath = root_ca,
client_id = client_id,
clean_session = False,
keep_alive_secs = 6)
connect_future = client.connect()
connect_future.result()
print("Connected! {}".format(client_id))
subscribe_future, _ = client.subscribe(
topic = topic,
qos = mqtt.QoS.AT_MOST_ONCE,
callback = on_message_received)
subscribe_future.result()
while True:
time.sleep(1)
disconnect_future = client.disconnect()
disconnect_future.result()
(2) パブリッシュ側
Publishでは、retainフラグをTrueとしています。
参考:https://awslabs.github.io/aws-crt-python/api/mqtt.html#awscrt.mqtt.Connection
import json
import datetime
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder
endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
root_ca = "./certs/root-CA.crt"
cert = "./certs/xxxxxxxxxxxx-certificate.pem.crt"
key = "./certs/xxxxxxxxxxxx-private.pem.key"
topic = "retain-topic"
client_id = "publiser"
def on_message_received(topic, payload, **kwargs):
print("received: {}".format(payload.decode()))
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
client = mqtt_connection_builder.mtls_from_path(
endpoint = endpoint,
cert_filepath = cert,
pri_key_filepath = key,
client_bootstrap = client_bootstrap,
ca_filepath = root_ca,
client_id = client_id,
clean_session = False,
keep_alive_secs = 6)
connect_future = client.connect()
connect_future.result()
print("Connected! {}".format(client_id))
# Publih
payload = {
"timestamp": datetime.datetime.now().isoformat(sep=' ', timespec='milliseconds'),
"status": 0
}
client.publish(
topic=topic,
payload=json.dumps(payload),
qos=mqtt.QoS.AT_MOST_ONCE,
retain=True)
disconnect_future = client.disconnect()
disconnect_future.result()
4 削除
0バイトのペイロードで、メッセージを送信すると、保持メッセージは削除されます。
client.publish(
topic=topic,
payload="",
qos=mqtt.QoS.AT_MOST_ONCE,
retain=True)
5 iot:RetainPublish
RetainフラグをTrueにしたメッセージをPublishするには、Actionでiot:RetainPublishがAllowになっている必要があります。
例)
{
"Effect": "Allow",
"Action": [
"iot:Publish",
"iot:RetainPublish"
],
"Resource": [
"arn:aws:iot:ap-northeast-1:xxxxxxxx:topic/retain-topic"
]
}
6 AWS IoT コンソール
保持れたメッセージは、IoT Coreのコンソールから確認できます。
また、新たに発行(更新)したり、削除することも可能です。
7 動作確認
既にメッセージが保持されている場合に、サブスクライブすると、保持されたメッセージを受け取ります。
% python3 subscribe.py
Connected! subscriber
received topic: retain-topic
payload: {"timestamp": "2021-08-25 08:25:02.507", "status": 2}
retain: True
サブスクライブ中に、Retainフラグがセットされたメッセージが到着すると、通常のメッセージとして受信します。(retain: False)
received topic: retain-topic
payload: {"timestamp": "2021-08-25 08:26:22.014", "status": 3}
retain: False
サブスクライブ中に、Retainフラグがセットされ、ペーロードが0バイトのメッセージが到着しても、メッセージは受信しません。
8 最後に
retained messagesの特性をしっかり把握する事で、シャドウとはまた違った、便利な使い方ができそう気がしています。