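What Worked Well in IoT Edge Software Development with AWS IoT (Python on Raspberry Pi OS)
Introduction
As the title says, this article introduces practices that worked well for me when developing IoT edge software that runs on Raspberry Pi OS.
Notes on this article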
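- This article has almost nothing on hardware integration. It covers only the things that paid off when connecting the software on the IoT device (called the "edge side" in this article) to the cloud (AWS IoT Core)
- The programs shown in this article are fragments. For details, refer to reterminal-sample as needed. It runs on both Mac and Raspberry Pi OS. For usage, see the Appendix at the end of the article
- The article assumes aws-crt-python for MQTT connections. For things that could not be achieved with it, I only verified whether they are possible with its wrapper, aws-iot-device-sdk-python-v2. Other libraries are untested
- The solutions here are not necessarily best practices, so please treat them as a reference only
- Some items may seem obvious even outside edge development, but I hope you will read them as a reminder
- This is not embedded development. It is almost the same as server-side application development in Python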
Environment
The OS and toolchain used are as follows.
# Raspberry Pi OS version
$ lsb_release -a
No LSB modules are available.
Distributor ID: Raspbian
Description: Raspbian GNU/Linux 10 (buster)
Release: 10
Codename: buster

# Python version
$ python3 --version
Python 3.10.4

# poetry version
$ poetry --version
Poetry (version 1.2.1)
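MQTT
Return quickly from the callback that runs when a message is published to a subscribed topic, and do heavy processing in a separate thread
Symptom / reproduction code
To explain the symptom, consider the following code.
Source code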
import utils.config as Config
import utils.logger as Logger
from awscrt import io, mqtt
from threading import Thread
import json
import time

config = Config.getConfig()
Logger.init()
logger = Logger.getLogger(__name__)

AWS_IOT_ENDPOINT = config.IotEndpoint
# Keep-alive range supported by AWS IoT: 5 - 1200 s
KEEP_ALIVE_SECS = 5
# In milliseconds, so it must be shorter than KEEP_ALIVE_SECS * 1000
PING_TIMEOUT_MS = 1000
THING_NAME = "test-thing"
SUB_TOPIC_A = "sub/a"
SUB_TOPIC_B = "sub/b"


class PublishThread(Thread):
    """Publishes the current timestamp to one topic once per second."""

    def __init__(self, mqtt_connection=None, publish_topic_name=None):
        super().__init__()
        self.connection = mqtt_connection
        self.publish_topic_name = publish_topic_name

    def run(self):
        while True:
            timestamp = int(time.time())
            logger.info(f"publish {self.name} {timestamp}")
            self.connection.publish(
                topic=self.publish_topic_name,
                payload=json.dumps({f"{self.native_id}": timestamp}),
                qos=mqtt.QoS.AT_MOST_ONCE,
            )
            logger.info("publish done")
            time.sleep(1)


def createMQTTConnection(
    device_certificate_filepath: str,
    private_key_filepath: str,
    ca_certificate_filepath: str,
):
    # A single event-loop thread handles all network I/O and callbacks
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(
        device_certificate_filepath, private_key_filepath
    )
    tls_ctx_options.override_default_trust_store_from_path(
        None, ca_certificate_filepath
    )
    tls_ctx = io.ClientTlsContext(tls_ctx_options)
    mqtt_client = mqtt.Client(client_bootstrap, tls_ctx)
    mqtt_connection = mqtt.Connection(
        client=mqtt_client,
        client_id=THING_NAME,
        host_name=AWS_IOT_ENDPOINT,
        port=8883,
        clean_session=False,
        keep_alive_secs=KEEP_ALIVE_SECS,
        ping_timeout_ms=PING_TIMEOUT_MS,
    )
    connect_future = mqtt_connection.connect()
    connect_future.result()  # block until the connection is established
    return mqtt_connection


def sub_heavy_callback(topic, payload, dup, qos, retain):
    logger.info("call heavy func")
    logger.info(f"topic: {topic}, dup: {dup}, qos: {qos}, retain: {retain}")
    logger.info(f"payload: {payload.decode('utf-8')}")
    time.sleep(10)  # simulate heavy processing
    logger.info("end heavy func")


def sub_light_callback(topic, payload, dup, qos, retain):
    logger.info("call light func")
    logger.info(f"topic: {topic}, dup: {dup}, qos: {qos}, retain: {retain}")
    logger.info(f"payload: {payload.decode('utf-8')}")
    logger.info("end light func")


def main():
    logger.info("start main thread")
    logger.info("start create mqtt connection")
    connection = createMQTTConnection(
        device_certificate_filepath="./data/device_certificate.pem",
        private_key_filepath="./data/private.key",
        ca_certificate_filepath="./data/ca_certificate.pem",
    )
    logger.info("end create mqtt connection")

    subTopic1Publisher = PublishThread(connection, SUB_TOPIC_A)
    subTopic1Publisher.daemon = True
    subTopic1Publisher.name = SUB_TOPIC_A
    subTopic1Publisher.start()

    subTopic2Publisher = PublishThread(connection, SUB_TOPIC_B)
    subTopic2Publisher.daemon = True
    subTopic2Publisher.name = SUB_TOPIC_B
    subTopic2Publisher.start()

    connection.subscribe(
        topic=SUB_TOPIC_A, qos=mqtt.QoS.AT_LEAST_ONCE, callback=sub_heavy_callback
    )
    connection.subscribe(
        topic=SUB_TOPIC_B, qos=mqtt.QoS.AT_LEAST_ONCE, callback=sub_light_callback
    )

    while True:
        time.sleep(10)


if __name__ == "__main__":
    main()
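Because the code alone is hard to follow, the flow is shown in the figure below.
Note: id denotes the thread ID. Compare it with the execution log shown below.
The single program represents the following situation.
- Two threads each publish to a topic once per second
- The two published topics are each subscribed to
  - The callback for subscribed topic sub/a takes 10 seconds
  - The callback for subscribed topic sub/b takes a few milliseconds
This code exists only to reproduce the problem. In practice, the publisher would live on the cloud side.
The execution log is as follows.
Execution log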
$ poetry run python3 ./src/mqtt-sub-callback.py 2022-10-09 19:53:26,194 __main__ [INFO] main (3069579328): start main thread 2022-10-09 19:53:26,195 __main__ [INFO] main (3069579328): start create mqtt connection 2022-10-09 19:53:28,141 __main__ [INFO] main (3069579328): end create mqtt connection 2022-10-09 19:53:28,142 __main__ [INFO] run (3038770240): publish sub/a 1665312808 2022-10-09 19:53:28,143 __main__ [INFO] run (3038770240): publish done 2022-10-09 19:53:28,143 __main__ [INFO] run (3028284480): publish sub/b 1665312808 2022-10-09 19:53:28,144 __main__ [INFO] run (3028284480): publish done 2022-10-09 19:53:28,233 __main__ [INFO] sub_heavy_callback (3059283008): call heavy func 2022-10-09 19:53:28,234 __main__ [INFO] sub_heavy_callback (3059283008): topic: sub/a, dup: False, qos: 0, retain: False 2022-10-09 19:53:28,234 __main__ [INFO] sub_heavy_callback (3059283008): payload: {"2407": 1665312808} 2022-10-09 19:53:29,144 __main__ [INFO] run (3038770240): publish sub/a 1665312809 2022-10-09 19:53:29,145 __main__ [INFO] run (3038770240): publish done 2022-10-09 19:53:29,145 __main__ [INFO] run (3028284480): publish sub/b 1665312809 2022-10-09 19:53:29,146 __main__ [INFO] run (3028284480): publish done 2022-10-09 19:53:30,146 __main__ [INFO] run (3038770240): publish sub/a 1665312810 2022-10-09 19:53:30,147 __main__ [INFO] run (3038770240): publish done 2022-10-09 19:53:30,148 __main__ [INFO] run (3028284480): publish sub/b 1665312810 2022-10-09 19:53:30,148 __main__ [INFO] run (3028284480): publish done 2022-10-09 19:53:31,149 __main__ [INFO] run (3038770240): publish sub/a 1665312811 2022-10-09 19:53:31,150 __main__ [INFO] run (3038770240): publish done 2022-10-09 19:53:31,150 __main__ [INFO] run (3028284480): publish sub/b 1665312811 2022-10-09 19:53:31,151 __main__ [INFO] run (3028284480): publish done 2022-10-09 19:53:32,151 __main__ [INFO] run (3038770240): publish sub/a 1665312812 2022-10-09 19:53:32,152 __main__ [INFO] run (3038770240): publish done 
2022-10-09 19:53:32,153 __main__ [INFO] run (3028284480): publish sub/b 1665312812 2022-10-09 19:53:32,153 __main__ [INFO] run (3028284480): publish done 2022-10-09 19:53:33,153 __main__ [INFO] run (3038770240): publish sub/a 1665312813 2022-10-09 19:53:33,154 __main__ [INFO] run (3038770240): publish done 2022-10-09 19:53:33,154 __main__ [INFO] run (3028284480): publish sub/b 1665312813 2022-10-09 19:53:33,155 __main__ [INFO] run (3028284480): publish done 2022-10-09 19:53:34,156 __main__ [INFO] run (3038770240): publish sub/a 1665312814 2022-10-09 19:53:34,156 __main__ [INFO] run (3028284480): publish sub/b 1665312814 2022-10-09 19:53:34,157 __main__ [INFO] run (3038770240): publish done 2022-10-09 19:53:34,157 __main__ [INFO] run (3028284480): publish done 2022-10-09 19:53:35,159 __main__ [INFO] run (3038770240): publish sub/a 1665312815 2022-10-09 19:53:35,159 __main__ [INFO] run (3028284480): publish sub/b 1665312815 2022-10-09 19:53:35,160 __main__ [INFO] run (3038770240): publish done 2022-10-09 19:53:35,160 __main__ [INFO] run (3028284480): publish done 2022-10-09 19:53:36,162 __main__ [INFO] run (3028284480): publish sub/b 1665312816 2022-10-09 19:53:36,162 __main__ [INFO] run (3038770240): publish sub/a 1665312816 2022-10-09 19:53:36,163 __main__ [INFO] run (3028284480): publish done 2022-10-09 19:53:36,163 __main__ [INFO] run (3038770240): publish done 2022-10-09 19:53:37,165 __main__ [INFO] run (3028284480): publish sub/b 1665312817 2022-10-09 19:53:37,165 __main__ [INFO] run (3038770240): publish sub/a 1665312817 2022-10-09 19:53:37,166 __main__ [INFO] run (3028284480): publish done 2022-10-09 19:53:37,166 __main__ [INFO] run (3038770240): publish done 2022-10-09 19:53:38,168 __main__ [INFO] run (3028284480): publish sub/b 1665312818 2022-10-09 19:53:38,168 __main__ [INFO] run (3038770240): publish sub/a 1665312818 2022-10-09 19:53:38,169 __main__ [INFO] run (3028284480): publish done 2022-10-09 19:53:38,169 __main__ [INFO] run (3038770240): publish 
done 2022-10-09 19:53:38,237 __main__ [INFO] sub_heavy_callback (3059283008): end heavy func 2022-10-09 19:53:38,237 __main__ [INFO] sub_light_callback (3059283008): call light func 2022-10-09 19:53:38,238 __main__ [INFO] sub_light_callback (3059283008): topic: sub/b, dup: False, qos: 0, retain: False 2022-10-09 19:53:38,238 __main__ [INFO] sub_light_callback (3059283008): payload: {"2408": 1665312808} 2022-10-09 19:53:38,238 __main__ [INFO] sub_light_callback (3059283008): end light func
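Let's organize the issues based on the log.
[Issue 1] While one callback is running, callbacks for other subscribed topics are blocked
As the log shows, while the callback for topic sub/a is running, execution of the callback for sub/b is blocked.
Messages are published to sub/a and sub/b at almost the same time. However, only the sub/a callback (which takes 10 seconds to complete) runs, and the sub/b callback does not.
The sub/b callback runs only after the sub/a callback has finished.
Because both callbacks run on the same thread ID, we can infer that the library internally queues the events received for subscriptions and runs them sequentially on a single reused thread.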
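The inferred behavior can be reproduced without any MQTT library. The following is a minimal sketch, not the internals of aws-crt-python: `run_single_dispatcher` and the `handlers` table are hypothetical names, and a 0.3-second sleep stands in for the 10-second callback. It shows that when one thread delivers events in order, a slow handler delays every handler queued behind it.

```python
import queue
import threading
import time


def run_single_dispatcher(handlers, events):
    """Deliver events one at a time on a single dispatcher thread.

    Returns a list of (topic, thread_id, started_at) tuples in start order.
    """
    inbox = queue.Queue()
    started = []
    t0 = time.monotonic()

    def dispatch():
        while True:
            topic = inbox.get()
            if topic is None:  # sentinel: stop the dispatcher
                break
            started.append((topic, threading.get_ident(), time.monotonic() - t0))
            handlers[topic]()  # the next event waits until this returns

    worker = threading.Thread(target=dispatch)
    worker.start()
    for topic in events:
        inbox.put(topic)
    inbox.put(None)
    worker.join()
    return started


handlers = {
    "sub/a": lambda: time.sleep(0.3),  # stands in for the heavy callback
    "sub/b": lambda: None,             # stands in for the light callback
}
result = run_single_dispatcher(handlers, ["sub/a", "sub/b"])
for topic, thread_id, started_at in result:
    print(f"{topic} started on thread {thread_id} at +{started_at:.2f}s")
```

Both handlers report the same thread ID, and the sub/b handler starts only after the sub/a handler returns, which is the same pattern as in the execution log.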
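[Issue 2] If a callback runs longer than the keep-alive interval, the edge device is detected as disconnected
If you enable logging in the AWS IoT settings (see the Appendix at the end of the article for how to do this), logs are written to AWSIoTLogsV2.
Execution log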
{ "timestamp": "2022-10-09 10:53:27.884", "logLevel": "INFO", "traceId": "88fa4463-8119-a0af-13bc-376a26ee8e0d", "accountId": "", "status": "Success", "eventType": "Connect", "protocol": "MQTT", "clientId": "test-thing", "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925", "sourceIp": "xxx", "sourcePort": 43052 } { "timestamp": "2022-10-09 10:53:28.158", "logLevel": "INFO", "traceId": "560e8bfc-f6b8-567f-90c6-72d0f2bcc2d8", "accountId": "", "status": "Success", "eventType": "Publish-In", "protocol": "MQTT", "topicName": "sub/a", "clientId": "test-thing", "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925", "sourceIp": "xxx", "sourcePort": 43052 } { "timestamp": "2022-10-09 10:53:28.159", "logLevel": "INFO", "traceId": "758bdeca-2f08-428a-c444-a68bbdb0d313", "accountId": "", "status": "Success", "eventType": "Publish-Out", "protocol": "MQTT", "topicName": "sub/a", "clientId": "test-thing", "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925", "sourceIp": "xxx", "sourcePort": 43052 } { "timestamp": "2022-10-09 10:53:28.182", "logLevel": "INFO", "traceId": "da12bd57-d32e-9065-6980-c1792991a51a", "accountId": "", "status": "Success", "eventType": "Publish-In", "protocol": "MQTT", "topicName": "sub/b", "clientId": "test-thing", "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925", "sourceIp": "xxx", "sourcePort": 43052 } { "timestamp": "2022-10-09 10:53:28.183", "logLevel": "INFO", "traceId": "e4432668-b809-84c1-21d9-3cf24baa5a32", "accountId": "", "status": "Success", "eventType": "Publish-Out", "protocol": "MQTT", "topicName": "sub/b", "clientId": "test-thing", "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925", "sourceIp": "xxx", "sourcePort": 43052 } { "timestamp": "2022-10-09 10:53:28.233", "logLevel": "INFO", "traceId": "11d11c50-4237-5d0d-b776-3ab153870819", "accountId": "", "status": "Success", "eventType": 
"Subscribe", "protocol": "MQTT", "topicName": "sub/b", "clientId": "test-thing", "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925", "sourceIp": "xxx", "sourcePort": 43052 } { "timestamp": "2022-10-09 10:53:28.239", "logLevel": "INFO", "traceId": "e2acda51-fdba-ae22-edb1-5bbab217d1a9", "accountId": "", "status": "Success", "eventType": "Subscribe", "protocol": "MQTT", "topicName": "sub/a", "clientId": "test-thing", "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925", "sourceIp": "xxx", "sourcePort": 43052 } { "timestamp": "2022-10-09 10:53:35.703", "logLevel": "INFO", "traceId": "7cf5e46f-19e5-855e-1276-9da84eee31bc", "accountId": "", "status": "Success", "eventType": "Disconnect", "protocol": "MQTT", "clientId": "test-thing", "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925", "sourceIp": "xxx", "sourcePort": 43052, "disconnectReason": "MQTT_KEEP_ALIVE_TIMEOUT" } { "timestamp": "2022-10-09 10:53:38.538", "logLevel": "INFO", "traceId": "4e788e77-bce1-abf6-16c5-3e4ceb1febac", "accountId": "", "status": "Success", "eventType": "Connect", "protocol": "MQTT", "clientId": "test-thing", "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925", "sourceIp": "xxx", "sourcePort": 43102 } { "timestamp": "2022-10-09 10:53:39.183", "logLevel": "INFO", "traceId": "a389304e-4392-acef-3d9f-3c674115a836", "accountId": "", "status": "Success", "eventType": "Publish-In", "protocol": "MQTT", "topicName": "sub/b", "clientId": "test-thing", "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925", "sourceIp": "123.198.93.31", "sourcePort": 43102 } { "timestamp": "2022-10-09 10:53:39.184", "logLevel": "INFO", "traceId": "8d1d4b31-95ba-7671-d1c9-172acabf0ccb", "accountId": "", "status": "Success", "eventType": "Publish-Out", "protocol": "MQTT", "topicName": "sub/b", "clientId": "test-thing", "principalId": 
"b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925", "sourceIp": "123.198.93.31", "sourcePort": 43102 } { "timestamp": "2022-10-09 10:53:39.432", "logLevel": "INFO", "traceId": "89d9bc0c-2c64-d4d4-85dd-50c4fc060a0d", "accountId": "", "status": "Success", "eventType": "Publish-In", "protocol": "MQTT", "topicName": "sub/a", "clientId": "test-thing", "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925", "sourceIp": "123.198.93.31", "sourcePort": 43102 } { "timestamp": "2022-10-09 10:53:39.434", "logLevel": "INFO", "traceId": "148fe046-adb3-b2a5-4d4e-6d7c5b0c418f", "accountId": "", "status": "Success", "eventType": "Publish-Out", "protocol": "MQTT", "topicName": "sub/a", "clientId": "test-thing", "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925", "sourceIp": "123.198.93.31", "sourcePort": 43102 } { "timestamp": "2022-10-09 10:53:40.511", "logLevel": "INFO", "traceId": "dd7ec295-0a06-fb83-3c3a-d68309305036", "accountId": "", "status": "Success", "eventType": "Disconnect", "protocol": "MQTT", "clientId": "test-thing", "principalId": "b670a35bf5f9640d4cf300d5c6d149fb0fa81dab64cb01eb54bab5812f514925", "sourceIp": "123.198.93.31", "sourcePort": 43102, "disconnectReason": "CONNECTION_LOST" }
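The log shows the following.
At 2022-10-09 10:53:35.703, while the callback is still running, a Disconnect (MQTT_KEEP_ALIVE_TIMEOUT) occurs.
As the source code above shows, keep-alive is set to 5 seconds. Because the callback takes 10 seconds in this case, the broker detects a timeout. The disconnect is actually logged about 7.5 seconds after the message is received. This lag is most likely not polling jitter: it matches the MQTT rule that a broker waits 1.5 times the keep-alive interval (5 s × 1.5 = 7.5 s) before closing a silent connection.
Then, at 2022-10-09 10:53:38.538, once the callback has finished, the client reconnects.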
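The timing in the log can be checked with simple arithmetic. MQTT 3.1.1 requires a broker to close a connection when it has received no packet for 1.5 times the keep-alive interval. The sketch below assumes that the Subscribe logged at 10:53:28.239 was the last packet the broker received before the callback blocked the client; `expected_disconnect` is a hypothetical helper name, and the timestamps are copied from the AWSIoTLogsV2 excerpt above.

```python
from datetime import datetime, timedelta

KEEP_ALIVE_SECS = 5   # value used in the source code above
GRACE_FACTOR = 1.5    # MQTT 3.1.1: broker waits 1.5x the keep-alive interval


def expected_disconnect(last_packet_at: datetime, keep_alive_secs: float) -> datetime:
    """Approximate time at which the broker gives up on a silent client."""
    return last_packet_at + timedelta(seconds=keep_alive_secs * GRACE_FACTOR)


# Assumption: the last Subscribe entry marks the last packet from the client.
last_packet = datetime.fromisoformat("2022-10-09 10:53:28.239")
observed = datetime.fromisoformat("2022-10-09 10:53:35.703")  # Disconnect entry

predicted = expected_disconnect(last_packet, KEEP_ALIVE_SECS)
print("predicted:", predicted.time())
print("observed: ", observed.time())
print("difference (s):", (observed - predicted).total_seconds())
```

The predicted and observed times differ by only a few tens of milliseconds, so a callback needs to return well within the keep-alive interval to leave room for the client to send its ping.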
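If you have introduced a mechanism for detecting edge-side disconnections (AWS documentation | Lifecycle events), this case is also reported on the reserved topic as a disconnected device.
I did not understand this behavior at first, and ran into intermittent disconnections even though the edge-side network seemed fine. This too can be addressed by changing how the callback is executed.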
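When lifecycle events are already being collected, the disconnect reason helps tell this case apart from a clean shutdown. The following is a rough sketch of a cloud-side consumer, assuming the event arrives as JSON on the reserved `$aws/events/presence/disconnected/<clientId>` topic with `disconnectReason` and `clientInitiatedDisconnect` fields. `classify_disconnect` is a hypothetical helper, and the sample event is made up for illustration apart from the client ID and reason taken from the log above.

```python
import json

# Reserved topic prefix on which AWS IoT publishes disconnect lifecycle events.
DISCONNECTED_TOPIC_PREFIX = "$aws/events/presence/disconnected/"


def classify_disconnect(payload: bytes) -> str:
    """Turn a disconnect lifecycle event into a short human-readable label."""
    event = json.loads(payload)
    if event.get("clientInitiatedDisconnect"):
        return "clean disconnect requested by the client"
    reason = event.get("disconnectReason", "UNKNOWN")
    if reason == "MQTT_KEEP_ALIVE_TIMEOUT":
        return "keep-alive timeout (network loss or a blocked callback)"
    return f"server-side disconnect: {reason}"


# Hypothetical event for illustration only.
sample_event = json.dumps(
    {
        "clientId": "test-thing",
        "eventType": "disconnected",
        "clientInitiatedDisconnect": False,
        "disconnectReason": "MQTT_KEEP_ALIVE_TIMEOUT",
    }
).encode("utf-8")

label = classify_disconnect(sample_event)
print(label)
```

A keep-alive timeout on a device whose network is known to be healthy is a hint to look at callback duration on the edge side.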
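Solution
As the heading says, return quickly from the callback invoked on subscribe, and do the heavy work in a separate thread. Issue 2 is resolved as long as the callback finishes within the keep-alive interval. To address Issue 1 as well, use one of the following methods.
Method 1
If you want to process every published event as far as memory allows, worker threads are a good fit.
Source code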
import utils.config as Config
import utils.logger as Logger
from awscrt import io, mqtt
from threading import Thread
import json
import time
from concurrent.futures import ThreadPoolExecutor

config = Config.getConfig()
Logger.init()
logger = Logger.getLogger(__name__)

AWS_IOT_ENDPOINT = config.IotEndpoint
# Keep-alive range supported by AWS IoT: 5 - 1200 s
KEEP_ALIVE_SECS = 5
# In milliseconds, so it must be shorter than KEEP_ALIVE_SECS * 1000
PING_TIMEOUT_MS = 1000
THING_NAME = "test-thing"
SUB_TOPIC_A = "sub/a"
SUB_TOPIC_B = "sub/b"


class PublishThread(Thread):
    """Publishes the current timestamp to one topic once per second."""

    def __init__(self, mqtt_connection=None, publish_topic_name=None):
        super().__init__()
        self.connection = mqtt_connection
        self.publish_topic_name = publish_topic_name

    def run(self):
        while True:
            timestamp = int(time.time())
            logger.info(f"publish {self.name} {timestamp}")
            self.connection.publish(
                topic=self.publish_topic_name,
                payload=json.dumps({f"{self.native_id}": timestamp}),
                qos=mqtt.QoS.AT_MOST_ONCE,
            )
            logger.info("publish done")
            time.sleep(1)


def createMQTTConnection(
    device_certificate_filepath: str,
    private_key_filepath: str,
    ca_certificate_filepath: str,
):
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(
        device_certificate_filepath, private_key_filepath
    )
    tls_ctx_options.override_default_trust_store_from_path(
        None, ca_certificate_filepath
    )
    tls_ctx = io.ClientTlsContext(tls_ctx_options)
    mqtt_client = mqtt.Client(client_bootstrap, tls_ctx)
    mqtt_connection = mqtt.Connection(
        client=mqtt_client,
        client_id=THING_NAME,
        host_name=AWS_IOT_ENDPOINT,
        port=8883,
        clean_session=False,
        keep_alive_secs=KEEP_ALIVE_SECS,
        ping_timeout_ms=PING_TIMEOUT_MS,
    )
    connect_future = mqtt_connection.connect()
    connect_future.result()  # block until the connection is established
    return mqtt_connection


def sub_heavy_callback(topic, payload, dup, qos, retain):
    logger.info("call heavy func")
    logger.info(f"topic: {topic}, dup: {dup}, qos: {qos}, retain: {retain}")
    logger.info(f"payload: {payload.decode('utf-8')}")
    time.sleep(10)  # simulate heavy processing
    logger.info("end heavy func")


def sub_light_callback(topic, payload, dup, qos, retain):
    logger.info("call light func")
    logger.info(f"topic: {topic}, dup: {dup}, qos: {qos}, retain: {retain}")
    logger.info(f"payload: {payload.decode('utf-8')}")
    logger.info("end light func")


def main():
    logger.info("start main thread")
    logger.info("start create mqtt connection")
    connection = createMQTTConnection(
        device_certificate_filepath="./data/device_certificate.pem",
        private_key_filepath="./data/private.key",
        ca_certificate_filepath="./data/ca_certificate.pem",
    )
    logger.info("end create mqtt connection")

    MAX_WORKER = 3
    with ThreadPoolExecutor(
        max_workers=MAX_WORKER, thread_name_prefix="execSubCallbackThread"
    ) as executor:
        subTopic1Publisher = PublishThread(connection, SUB_TOPIC_A)
        subTopic1Publisher.daemon = True
        subTopic1Publisher.name = SUB_TOPIC_A
        subTopic1Publisher.start()

        subTopic2Publisher = PublishThread(connection, SUB_TOPIC_B)
        subTopic2Publisher.daemon = True
        subTopic2Publisher.name = SUB_TOPIC_B
        subTopic2Publisher.start()

        # The MQTT callbacks only hand the work to the pool and return at once
        def sub_a_callback(topic, payload, dup, qos, retain):
            executor.submit(sub_heavy_callback, topic, payload, dup, qos, retain)

        def sub_b_callback(topic, payload, dup, qos, retain):
            executor.submit(sub_light_callback, topic, payload, dup, qos, retain)

        connection.subscribe(
            topic=SUB_TOPIC_A, qos=mqtt.QoS.AT_LEAST_ONCE, callback=sub_a_callback
        )
        connection.subscribe(
            topic=SUB_TOPIC_B, qos=mqtt.QoS.AT_LEAST_ONCE, callback=sub_b_callback
        )

        while True:
            time.sleep(10)


if __name__ == "__main__":
    main()
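Naturally, no more tasks can run in parallel than the number of worker threads specified. To make this easy to see, here is the result with three worker threads.
Execution log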
$ poetry run python3 ./src/mqtt-sub-callback-1.py 2022-10-10 11:25:06,299 __main__ [INFO] main [MainThread] (3069313088): start main thread 2022-10-10 11:25:06,299 __main__ [INFO] main [MainThread] (3069313088): start create mqtt connection 2022-10-10 11:25:06,618 __main__ [INFO] main [MainThread] (3069313088): end create mqtt connection 2022-10-10 11:25:06,619 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368706 2022-10-10 11:25:06,619 __main__ [INFO] run [sub/a] (3037721664): publish done 2022-10-10 11:25:06,620 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368706 2022-10-10 11:25:06,621 __main__ [INFO] run [sub/b] (3027235904): publish done 2022-10-10 11:25:06,655 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_0] (3016750144): call heavy func 2022-10-10 11:25:06,655 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_0] (3016750144): topic: sub/a, dup: False, qos: 0, retain: False 2022-10-10 11:25:06,655 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_0] (3016750144): payload: {"6556": 1665368706} 2022-10-10 11:25:06,681 __main__ [INFO] sub_light_callback [execSubCallbackThread_1] (3006264384): call light func 2022-10-10 11:25:06,681 __main__ [INFO] sub_light_callback [execSubCallbackThread_1] (3006264384): topic: sub/b, dup: False, qos: 0, retain: False 2022-10-10 11:25:06,682 __main__ [INFO] sub_light_callback [execSubCallbackThread_1] (3006264384): payload: {"6557": 1665368706} 2022-10-10 11:25:06,682 __main__ [INFO] sub_light_callback [execSubCallbackThread_1] (3006264384): end light func 2022-10-10 11:25:07,621 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368707 2022-10-10 11:25:07,622 __main__ [INFO] run [sub/a] (3037721664): publish done 2022-10-10 11:25:07,622 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368707 2022-10-10 11:25:07,623 __main__ [INFO] run [sub/b] (3027235904): publish done 2022-10-10 11:25:07,645 __main__ [INFO] sub_heavy_callback 
[execSubCallbackThread_1] (3006264384): call heavy func 2022-10-10 11:25:07,645 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_1] (3006264384): topic: sub/a, dup: False, qos: 0, retain: False 2022-10-10 11:25:07,646 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_1] (3006264384): payload: {"6556": 1665368707} 2022-10-10 11:25:07,673 __main__ [INFO] sub_light_callback [execSubCallbackThread_2] (2995778624): call light func 2022-10-10 11:25:07,674 __main__ [INFO] sub_light_callback [execSubCallbackThread_2] (2995778624): topic: sub/b, dup: False, qos: 0, retain: False 2022-10-10 11:25:07,674 __main__ [INFO] sub_light_callback [execSubCallbackThread_2] (2995778624): payload: {"6557": 1665368707} 2022-10-10 11:25:07,674 __main__ [INFO] sub_light_callback [execSubCallbackThread_2] (2995778624): end light func 2022-10-10 11:25:08,623 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368708 2022-10-10 11:25:08,624 __main__ [INFO] run [sub/a] (3037721664): publish done 2022-10-10 11:25:08,624 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368708 2022-10-10 11:25:08,625 __main__ [INFO] run [sub/b] (3027235904): publish done 2022-10-10 11:25:08,653 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_2] (2995778624): call heavy func 2022-10-10 11:25:08,654 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_2] (2995778624): topic: sub/a, dup: False, qos: 0, retain: False 2022-10-10 11:25:08,654 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_2] (2995778624): payload: {"6556": 1665368708} 2022-10-10 11:25:09,625 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368709 2022-10-10 11:25:09,626 __main__ [INFO] run [sub/a] (3037721664): publish done 2022-10-10 11:25:09,626 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368709 2022-10-10 11:25:09,627 __main__ [INFO] run [sub/b] (3027235904): publish done 2022-10-10 11:25:10,627 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368710 
2022-10-10 11:25:10,628 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368710 2022-10-10 11:25:10,628 __main__ [INFO] run [sub/a] (3037721664): publish done 2022-10-10 11:25:10,629 __main__ [INFO] run [sub/b] (3027235904): publish done 2022-10-10 11:25:11,631 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368711 2022-10-10 11:25:11,632 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368711 2022-10-10 11:25:11,633 __main__ [INFO] run [sub/a] (3037721664): publish done 2022-10-10 11:25:11,634 __main__ [INFO] run [sub/b] (3027235904): publish done 2022-10-10 11:25:12,635 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368712 2022-10-10 11:25:12,636 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368712 2022-10-10 11:25:12,636 __main__ [INFO] run [sub/a] (3037721664): publish done 2022-10-10 11:25:12,637 __main__ [INFO] run [sub/b] (3027235904): publish done 2022-10-10 11:25:13,639 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368713 2022-10-10 11:25:13,640 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368713 2022-10-10 11:25:13,640 __main__ [INFO] run [sub/a] (3037721664): publish done 2022-10-10 11:25:13,641 __main__ [INFO] run [sub/b] (3027235904): publish done 2022-10-10 11:25:14,642 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368714 2022-10-10 11:25:14,643 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368714 2022-10-10 11:25:14,643 __main__ [INFO] run [sub/a] (3037721664): publish done 2022-10-10 11:25:14,644 __main__ [INFO] run [sub/b] (3027235904): publish done 2022-10-10 11:25:15,646 __main__ [INFO] run [sub/a] (3037721664): publish sub/a 1665368715 2022-10-10 11:25:15,646 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368715 2022-10-10 11:25:15,646 __main__ [INFO] run [sub/a] (3037721664): publish done 2022-10-10 11:25:15,647 __main__ [INFO] run [sub/b] (3027235904): publish done 2022-10-10 11:25:16,649 __main__ [INFO] run 
[sub/a] (3037721664): publish sub/a 1665368716 2022-10-10 11:25:16,649 __main__ [INFO] run [sub/b] (3027235904): publish sub/b 1665368716 2022-10-10 11:25:16,650 __main__ [INFO] run [sub/a] (3037721664): publish done 2022-10-10 11:25:16,651 __main__ [INFO] run [sub/b] (3027235904): publish done 2022-10-10 11:25:16,657 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_0] (3016750144): end heavy func 2022-10-10 11:25:16,658 __main__ [INFO] sub_light_callback [execSubCallbackThread_0] (3016750144): call light func 2022-10-10 11:25:16,658 __main__ [INFO] sub_light_callback [execSubCallbackThread_0] (3016750144): topic: sub/b, dup: False, qos: 0, retain: False 2022-10-10 11:25:16,659 __main__ [INFO] sub_light_callback [execSubCallbackThread_0] (3016750144): payload: {"6557": 1665368708} 2022-10-10 11:25:16,659 __main__ [INFO] sub_light_callback [execSubCallbackThread_0] (3016750144): end light func 2022-10-10 11:25:16,659 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_0] (3016750144): call heavy func 2022-10-10 11:25:16,659 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_0] (3016750144): topic: sub/a, dup: False, qos: 0, retain: False 2022-10-10 11:25:16,660 __main__ [INFO] sub_heavy_callback [execSubCallbackThread_0] (3016750144): payload: {"6556": 1665368709}
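Messages published to both sub/a and sub/b are processed, but once all three threads are busy, subsequently submitted tasks wait in the executor's queue instead of running.
As soon as a worker thread becomes free, the next task is scheduled.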
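The queueing behavior of the pool can be seen in isolation with the standard library alone. This is a small sketch under stated assumptions: `heavy`, `TASKS`, and `TASK_SECS` are illustrative names, and a 0.2-second sleep stands in for the 10-second callback. Five tasks are submitted to three workers, so the last two can start only after a worker finishes.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 3
TASKS = 5
TASK_SECS = 0.2  # stands in for the heavy callback


def heavy(task_id, t0):
    """Record when the task started and on which worker, then do the 'work'."""
    started = time.monotonic() - t0
    time.sleep(TASK_SECS)
    return task_id, started, threading.current_thread().name


t0 = time.monotonic()
with ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix="worker") as executor:
    # submit() only enqueues the task, so the caller is not held up
    futures = [executor.submit(heavy, i, t0) for i in range(TASKS)]
    results = [f.result() for f in futures]

for task_id, started, name in results:
    print(f"task {task_id} started at +{started:.2f}s on {name}")
```

Note that `ThreadPoolExecutor` keeps waiting tasks in an unbounded internal queue, so a sustained overload shows up as growing memory use rather than as an error.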
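Method 2
With the approach above, if the load is underestimated, the queue can grow until memory is exhausted and, in the worst case, the process is killed by the OOM killer. If you can accept starting one thread per subscribed topic and dropping events that arrive while one is being processed, an implementation like the following also works.
Source code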
import json
import time
from threading import Thread

from awscrt import io, mqtt

import utils.config as Config
import utils.logger as Logger

config = Config.getConfig()
Logger.init()
logger = Logger.getLogger(__name__)

AWS_IOT_ENDPOINT = config.IotEndpoint
# Keep-alive range supported by AWS IoT: 5 - 1200 s
KEEP_ALIVE_SECS = 5
# In milliseconds, so it must be shorter than KEEP_ALIVE_SECS * 1000
PING_TIMEOUT_MS = 1000
THING_NAME = "test-thing"
SUB_TOPIC_A = "sub/a"
SUB_TOPIC_B = "sub/b"


class PublishThread(Thread):
    """Publishes the current timestamp to one topic once per second."""

    def __init__(self, mqtt_connection=None, publish_topic_name=None):
        super().__init__()
        self.connection = mqtt_connection
        self.publish_topic_name = publish_topic_name

    def run(self):
        while True:
            timestamp = int(time.time())
            logger.info(f"publish {self.name} {timestamp}")
            self.connection.publish(
                topic=self.publish_topic_name,
                payload=json.dumps({f"{self.native_id}": timestamp}),
                qos=mqtt.QoS.AT_MOST_ONCE,
            )
            logger.info("publish done")
            time.sleep(1)


def createMQTTConnection(
    device_certificate_filepath: str,
    private_key_filepath: str,
    ca_certificate_filepath: str,
):
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(
        device_certificate_filepath, private_key_filepath
    )
    tls_ctx_options.override_default_trust_store_from_path(
        None, ca_certificate_filepath
    )
    tls_ctx = io.ClientTlsContext(tls_ctx_options)
    mqtt_client = mqtt.Client(client_bootstrap, tls_ctx)
    mqtt_connection = mqtt.Connection(
        client=mqtt_client,
        client_id=THING_NAME,
        host_name=AWS_IOT_ENDPOINT,
        port=8883,
        clean_session=False,
        keep_alive_secs=KEEP_ALIVE_SECS,
        ping_timeout_ms=PING_TIMEOUT_MS,
    )
    connect_future = mqtt_connection.connect()
    connect_future.result()  # block until the connection is established
    return mqtt_connection


def sub_heavy_callback(topic, payload, dup, qos, retain):
    logger.info("call heavy func")
    logger.info(f"topic: {topic}, dup: {dup}, qos: {qos}, retain: {retain}")
    logger.info(f"payload: {payload.decode('utf-8')}")
    time.sleep(10)  # simulate heavy processing
    logger.info("end heavy func")


def sub_light_callback(topic, payload, dup, qos, retain):
    logger.info("call light func")
    logger.info(f"topic: {topic}, dup: {dup}, qos: {qos}, retain: {retain}")
    logger.info(f"payload: {payload.decode('utf-8')}")
    logger.info("end light func")


class SubExecThread(Thread):
    """Runs one callback at a time and drops events that arrive while busy."""

    def __init__(self, callback):
        super().__init__()
        self.__is_using = False
        self.__callback = callback
        self.__args = []

    def add_task(self, topic, payload, dup, qos, retain):
        if self.__is_using is False:
            # Store the arguments before raising the flag so that run()
            # never sees the flag set with stale or empty arguments
            self.__args = [topic, payload, dup, qos, retain]
            self.__is_using = True
        else:
            logger.info(
                f"skip event: topic: {topic}, dup: {dup}, qos: {qos}, retain: {retain}, payload: {payload.decode('utf-8')}"
            )

    def run(self):
        while True:
            if self.__is_using is True:
                logger.info(self.__args)
                self.__callback(
                    topic=self.__args[0],
                    payload=self.__args[1],
                    dup=self.__args[2],
                    qos=self.__args[3],
                    retain=self.__args[4],
                )
                self.__is_using = False
            time.sleep(0.1)  # poll for the next task


def main():
    logger.info("start main thread")
    logger.info("start create mqtt connection")
    connection = createMQTTConnection(
        device_certificate_filepath="./data/device_certificate.pem",
        private_key_filepath="./data/private.key",
        ca_certificate_filepath="./data/ca_certificate.pem",
    )
    logger.info("end create mqtt connection")

    subTopic1Publisher = PublishThread(connection, SUB_TOPIC_A)
    subTopic1Publisher.daemon = True
    subTopic1Publisher.name = SUB_TOPIC_A
    subTopic1Publisher.start()

    subTopic2Publisher = PublishThread(connection, SUB_TOPIC_B)
    subTopic2Publisher.daemon = True
    subTopic2Publisher.name = SUB_TOPIC_B
    subTopic2Publisher.start()

    subaExecThread = SubExecThread(sub_heavy_callback)
    subaExecThread.daemon = True
    subaExecThread.name = "subaExecThread"
    subaExecThread.start()

    subbExecThread = SubExecThread(sub_light_callback)
    subbExecThread.daemon = True
    subbExecThread.name = "subbExecThread"
    subbExecThread.start()

    connection.subscribe(
        topic=SUB_TOPIC_A,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=subaExecThread.add_task,
    )
    connection.subscribe(
        topic=SUB_TOPIC_B,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=subbExecThread.add_task,
    )

    while True:
        time.sleep(10)


if __name__ == "__main__":
    main()
The execution log is as follows.
Execution log
$ poetry run python3 ./src/mqtt-sub-callback-2.py 2022-10-10 12:17:57,269 __main__ [INFO] main [MainThread] (3069907008): start main thread 2022-10-10 12:17:57,270 __main__ [INFO] main [MainThread] (3069907008): start create mqtt connection 2022-10-10 12:17:57,577 __main__ [INFO] main [MainThread] (3069907008): end create mqtt connection 2022-10-10 12:17:57,578 __main__ [INFO] run [sub/a] (3038770240): publish sub/a 1665371877 2022-10-10 12:17:57,579 __main__ [INFO] run [sub/a] (3038770240): publish done 2022-10-10 12:17:57,579 __main__ [INFO] run [sub/b] (3028284480): publish sub/b 1665371877 2022-10-10 12:17:57,581 __main__ [INFO] run [sub/b] (3028284480): publish done 2022-10-10 12:17:57,680 __main__ [INFO] sub_heavy_callback [subaExecThread] (3017798720): call heavy func 2022-10-10 12:17:57,681 __main__ [INFO] sub_heavy_callback [subaExecThread] (3017798720): topic: sub/a, dup: False, qos: 0, retain: False 2022-10-10 12:17:57,681 __main__ [INFO] sub_heavy_callback [subaExecThread] (3017798720): payload: {"8168": 1665371877} 2022-10-10 12:17:57,681 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): call light func 2022-10-10 12:17:57,682 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): topic: sub/b, dup: False, qos: 0, retain: False 2022-10-10 12:17:57,682 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): payload: {"8169": 1665371877} 2022-10-10 12:17:57,682 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): end light func 2022-10-10 12:17:58,581 __main__ [INFO] run [sub/a] (3038770240): publish sub/a 1665371878 2022-10-10 12:17:58,581 __main__ [INFO] run [sub/a] (3038770240): publish done 2022-10-10 12:17:58,582 __main__ [INFO] run [sub/b] (3028284480): publish sub/b 1665371878 2022-10-10 12:17:58,582 __main__ [INFO] run [sub/b] (3028284480): publish done 2022-10-10 12:17:58,628 __main__ [INFO] add_task [Dummy-5] (3059610688): skip event: topic: sub/a, dup: False, qos: 0, retain: False, payload: 
{"8168": 1665371878} 2022-10-10 12:17:58,684 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): call light func 2022-10-10 12:17:58,684 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): topic: sub/b, dup: False, qos: 0, retain: False 2022-10-10 12:17:58,685 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): payload: {"8169": 1665371878} 2022-10-10 12:17:58,685 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): end light func 2022-10-10 12:18:07,597 __main__ [INFO] run [sub/a] (3038770240): publish sub/a 1665371887 (中略) 2022-10-10 12:18:07,598 __main__ [INFO] run [sub/a] (3038770240): publish done 2022-10-10 12:18:07,599 __main__ [INFO] run [sub/b] (3028284480): publish sub/b 1665371887 2022-10-10 12:18:07,600 __main__ [INFO] run [sub/b] (3028284480): publish done 2022-10-10 12:18:07,635 __main__ [INFO] add_task [Dummy-5] (3059610688): skip event: topic: sub/a, dup: False, qos: 0, retain: False, payload: {"8168": 1665371887} 2022-10-10 12:18:07,681 __main__ [INFO] sub_heavy_callback [subaExecThread] (3017798720): end heavy func 2022-10-10 12:18:07,708 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): call light func 2022-10-10 12:18:07,709 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): topic: sub/b, dup: False, qos: 0, retain: False 2022-10-10 12:18:07,709 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): payload: {"8169": 1665371887} 2022-10-10 12:18:07,709 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): end light func 2022-10-10 12:18:08,599 __main__ [INFO] run [sub/a] (3038770240): publish sub/a 1665371888 2022-10-10 12:18:08,600 __main__ [INFO] run [sub/a] (3038770240): publish done 2022-10-10 12:18:08,601 __main__ [INFO] run [sub/b] (3028284480): publish sub/b 1665371888 2022-10-10 12:18:08,602 __main__ [INFO] run [sub/b] (3028284480): publish done 2022-10-10 12:18:08,683 __main__ [INFO] sub_heavy_callback [subaExecThread] 
(3017798720): call heavy func 2022-10-10 12:18:08,684 __main__ [INFO] sub_heavy_callback [subaExecThread] (3017798720): topic: sub/a, dup: False, qos: 0, retain: False 2022-10-10 12:18:08,684 __main__ [INFO] sub_heavy_callback [subaExecThread] (3017798720): payload: {"8168": 1665371888} 2022-10-10 12:18:08,911 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): call light func 2022-10-10 12:18:08,912 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): topic: sub/b, dup: False, qos: 0, retain: False 2022-10-10 12:18:08,912 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): payload: {"8169": 1665371888} 2022-10-10 12:18:08,912 __main__ [INFO] sub_light_callback [subbExecThread] (3007312960): end light func
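In this code, callback execution is isolated per subscription thread, so even when events on sub/a back up, events on sub/b are still processed normally.
In addition, events that arrive while a callback is running are discarded, so the queue never grows.
As shown above, it is worth choosing an approach based on your requirements and the memory available. Combining the strengths of approach 1 and approach 2 is also a good option.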
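The "discard events while busy" behaviour can be sketched without any MQTT dependency. This is a minimal illustration rather than the sample repository's implementation: the class name `DropWhileBusyWorker` and its `add_task` method are hypothetical, and a non-blocking lock stands in for whatever bookkeeping the real code uses.

```python
import logging
import queue
import threading


class DropWhileBusyWorker(threading.Thread):
    """Runs one handler call at a time and drops events that arrive meanwhile."""

    def __init__(self, handler):
        super().__init__(daemon=True)
        self._handler = handler
        self._slot = queue.Queue(maxsize=1)
        # Held from the moment an event is accepted until its handler returns
        self._busy = threading.Lock()

    def add_task(self, event) -> bool:
        """Call this from the MQTT callback; returns False if the event was dropped."""
        if not self._busy.acquire(blocking=False):
            return False  # a previous event is still being processed
        self._slot.put(event)
        return True

    def run(self):
        while True:
            event = self._slot.get()
            try:
                self._handler(event)
            except Exception:
                logging.exception("handler failed")
            finally:
                # Accept the next event only after the handler has finished
                self._busy.release()
```

Because `add_task` never blocks, the MQTT callback returns immediately, and memory use stays constant no matter how long the handler runs.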
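Guard against offline publishing (for QoS=1)
When you publish under the following conditions, the published message cannot be cancelled, even by the sender.
- The device is offline (meaning the AWS IoT endpoint is unreachable)
- aws-iot-device-sdk-python-v2 is used
- The message is published with QoS=1
On reconnect, everything that was published while offline is resent at once. Because the messages are sent with QoS=1 and are retried until a PUBACK is returned, this is arguably expected, but the sender needs a strategy for the messages that pile up.
Regarding the offline publish behaviour, an issue titled "configure offline publishing in v2" has been filed. It shows that v1 (aws-iot-device-sdk-python) allowed this to be configured in detail. Reading the v1 source may reveal other workarounds.
Symptom / reproduction code
The code is as follows.
Source code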
import json
import time
from threading import Thread
from concurrent import futures

from awscrt import io, mqtt

import utils.config as Config
import utils.logger as Logger

config = Config.getConfig()
Logger.init()
logger = Logger.getLogger(__name__)

AWS_IOT_ENDPOINT = config.IotEndpoint
KEEP_ALIVE_SECS = 5
PING_TIMEOUT_MS = 1000
THING_NAME = "test-thing"
SUB_TOPIC_A = "sub/a"


class PublishThread(Thread):
    def __init__(self, mqtt_connection=None, publish_topic_name=None):
        self.connection = mqtt_connection
        self.publish_topic_name = publish_topic_name
        Thread.__init__(self)

    def run(self):
        while True:
            try:
                timestamp = int(time.time())
                logger.info(f"publish {self.name} {timestamp}")
                publish_future, _ = self.connection.publish(
                    topic=self.publish_topic_name,
                    payload=json.dumps({f"{self.native_id}": timestamp}),
                    qos=mqtt.QoS.AT_LEAST_ONCE,
                )
                logger.info("publish done")
                time.sleep(1)
            except Exception as e:
                logger.info(f"publish error: {e}", exc_info=True)


def sub_callback(topic, payload, dup, qos, retain):
    logger.info("call sub_callback")
    logger.info(f"topic: {topic}, dup: {dup}, qos: {qos}, retain: {retain}")
    logger.info(f"payload: {payload.decode('utf-8')}")
    logger.info("end sub_callback")


def createMQTTConnection(
    device_certificate_filepath: str,
    private_key_filepath: str,
    ca_certificate_filepath: str,
):
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(
        device_certificate_filepath, private_key_filepath
    )
    tls_ctx_options.override_default_trust_store_from_path(
        None, ca_certificate_filepath
    )
    tls_ctx = io.ClientTlsContext(tls_ctx_options)
    mqtt_client = mqtt.Client(client_bootstrap, tls_ctx)
    mqtt_connection = mqtt.Connection(
        client=mqtt_client,
        client_id=THING_NAME,
        host_name=AWS_IOT_ENDPOINT,
        port=8883,
        clean_session=False,
        keep_alive_secs=KEEP_ALIVE_SECS,
        ping_timeout_ms=PING_TIMEOUT_MS,
    )
    connect_future = mqtt_connection.connect()
    connect_future.result()
    return mqtt_connection


def main():
    logger.info("start main thread")
    logger.info("start create mqtt connection")
    connection = createMQTTConnection(
        device_certificate_filepath="./data/device_certificate.pem",
        private_key_filepath="./data/private.key",
        ca_certificate_filepath="./data/ca_certificate.pem",
    )
    logger.info("end create mqtt connection")

    subTopic1Publisher = PublishThread(connection, SUB_TOPIC_A)
    subTopic1Publisher.daemon = True
    subTopic1Publisher.name = SUB_TOPIC_A
    subTopic1Publisher.start()

    connection.subscribe(
        topic=SUB_TOPIC_A, qos=mqtt.QoS.AT_LEAST_ONCE, callback=sub_callback
    )

    while True:
        time.sleep(10)


if __name__ == "__main__":
    main()
The commands for turning the network off and printing a timestamp are as follows.
# On Raspberry Pi OS
sudo ifconfig wlan0 down;date +%s
# On a Mac
networksetup -SetAirportPower en0 off;date +%s
Execution log
Timestamps at which Wi-Fi was turned OFF and then ON
$ networksetup -SetAirportPower en0 off;date +%s
1665442923
$ networksetup -SetAirportPower en0 on;date +%s
1665442931
Application log
$ poetry run python3 ./src/offline-publish.py
2022-10-11 08:02:01,255 __main__ [INFO] main [MainThread] (4335748480): start main thread
2022-10-11 08:02:01,255 __main__ [INFO] main [MainThread] (4335748480): start create mqtt connection
2022-10-11 08:02:01,583 __main__ [INFO] main [MainThread] (4335748480): end create mqtt connection
2022-10-11 08:02:01,583 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442921
2022-10-11 08:02:01,584 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:01,641 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:01,641 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:01,641 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442921}
2022-10-11 08:02:01,641 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:02,589 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442922
2022-10-11 08:02:02,589 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:02,647 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:02,647 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:02,647 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442922}
2022-10-11 08:02:02,647 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:03,594 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442923
2022-10-11 08:02:03,595 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:04,599 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442924
2022-10-11 08:02:04,599 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:05,600 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442925
2022-10-11 08:02:05,600 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:06,603 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442926
2022-10-11 08:02:06,603 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:07,609 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442927
2022-10-11 08:02:07,609 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:08,613 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442928
2022-10-11 08:02:08,613 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:09,618 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442929
2022-10-11 08:02:09,619 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:10,624 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442930
2022-10-11 08:02:10,624 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:11,629 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442931
2022-10-11 08:02:11,629 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:12,635 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442932
2022-10-11 08:02:12,635 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442931}
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442924}
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442932}
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,019 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442928}
2022-10-11 08:02:13,020 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,024 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,024 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,024 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442926}
2022-10-11 08:02:13,024 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,025 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,025 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,025 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442925}
2022-10-11 08:02:13,025 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,028 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,028 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,028 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442930}
2022-10-11 08:02:13,028 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,029 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,029 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,029 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442923}
2022-10-11 08:02:13,030 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,031 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,031 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,031 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442927}
2022-10-11 08:02:13,031 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,033 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,034 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,034 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442929}
2022-10-11 08:02:13,034 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:13,640 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442933
2022-10-11 08:02:13,641 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:13,708 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:13,708 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:13,708 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442933}
2022-10-11 08:02:13,708 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:14,646 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442934
2022-10-11 08:02:14,646 __main__ [INFO] run [sub/a] (6153482240): publish done
2022-10-11 08:02:14,712 __main__ [INFO] sub_callback [Dummy-2] (6134935552): call sub_callback
2022-10-11 08:02:14,712 __main__ [INFO] sub_callback [Dummy-2] (6134935552): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:02:14,712 __main__ [INFO] sub_callback [Dummy-2] (6134935552): payload: {"2000558": 1665442934}
2022-10-11 08:02:14,712 __main__ [INFO] sub_callback [Dummy-2] (6134935552): end sub_callback
2022-10-11 08:02:15,649 __main__ [INFO] run [sub/a] (6153482240): publish sub/a 1665442935
2022-10-11 08:02:15,650 __main__ [INFO] run [sub/a] (6153482240): publish done
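The log shows that at 08:02:13,019, when the network appears to have recovered, the payloads 1665442923 through 1665442932 that were published while offline are delivered almost simultaneously and in no particular order.
If a large number of edge devices publish with QoS=1 while offline for some time, the burst on reconnect can cause cloud-side resources to be throttled, so a countermeasure is needed.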
Solutions
The options are as follows.
- Send with QoS=0
- (If the load is acceptable) Attach a timestamp to each message at the sender, and validate the timestamp at the receiver (cloud side)
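The second option can be sketched as a small receiver-side check that ignores messages that are too old. The `timestamp` field name, the 30-second threshold, and the function name `is_fresh` are all assumptions made for illustration; the article does not prescribe a payload format for this check.

```python
import json
import time
from typing import Optional

MAX_AGE_SEC = 30  # hypothetical threshold; tune it to your requirements


def is_fresh(
    payload: bytes,
    now: Optional[float] = None,
    max_age_sec: int = MAX_AGE_SEC,
) -> bool:
    """Return True when the message's "timestamp" field is recent enough to process."""
    now = time.time() if now is None else now
    try:
        timestamp = json.loads(payload)["timestamp"]
    except (ValueError, KeyError, TypeError):
        return False  # malformed payload or missing field
    if isinstance(timestamp, bool) or not isinstance(timestamp, (int, float)):
        return False
    return now - timestamp <= max_age_sec
```

A cloud-side consumer would call `is_fresh(payload)` first and skip stale messages, so a burst of resent messages after reconnect costs one cheap check each instead of full processing.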
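Make MQTT connection failures detectable on the edge side
This matters when the device itself needs to do something for the end user when the network fails. Even if the cloud side can detect the failure, it cannot send instructions to the device while the network is down.
Detecting at publish time
With QoS=1, you can check whether a PUBACK was returned by waiting for a fixed period after sending. With awscrt, the following code does this. For the full source, see shuntaka9576/reterminal-sample. Note that, as described in the previous section, the message is still resent once the device is back online, even after a timeout.
try:
    timestamp = int(time.time())
    logger.info(f"publish {self.name} {timestamp}")
    publish_future, _ = self.connection.publish(
        topic=self.publish_topic_name,
        payload=json.dumps({f"{self.native_id}": timestamp}),
        qos=mqtt.QoS.AT_LEAST_ONCE,
    )
    # Wait until the PUBACK is returned
    publish_future.result(timeout=10)
    logger.info("publish done")
    time.sleep(1)
except futures.TimeoutError as e:
    # Raised when the PUBACK cannot be confirmed within the timeout
    logger.info(f"futures timeout error: {e}", exc_info=True)
    # Cannot be cancelled (the message is resent once back online)
    # publish_future.cancel()
except Exception as e:
    logger.info(f"publish error: {e}", exc_info=True)
Running it produces the following. You can see that the error is raised after 10 seconds. Handling this error makes it possible to notify the end user of the network failure.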
$ poetry run python3 ./src/offline-publish-1.py
2022-10-11 08:18:31,410 __main__ [INFO] main [MainThread] (4341024128): start main thread
2022-10-11 08:18:31,410 __main__ [INFO] main [MainThread] (4341024128): start create mqtt connection
2022-10-11 08:18:31,713 __main__ [INFO] main [MainThread] (4341024128): end create mqtt connection
2022-10-11 08:18:31,713 __main__ [INFO] run [sub/a] (6146224128): publish sub/a 1665443911
2022-10-11 08:18:31,750 __main__ [INFO] run [sub/a] (6146224128): publish done
2022-10-11 08:18:31,778 __main__ [INFO] sub_callback [Dummy-2] (6127677440): call sub_callback
2022-10-11 08:18:31,778 __main__ [INFO] sub_callback [Dummy-2] (6127677440): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:18:31,778 __main__ [INFO] sub_callback [Dummy-2] (6127677440): payload: {"2028645": 1665443911}
2022-10-11 08:18:31,778 __main__ [INFO] sub_callback [Dummy-2] (6127677440): end sub_callback
2022-10-11 08:18:31,786 __main__ [INFO] sub_callback [Dummy-2] (6127677440): call sub_callback
2022-10-11 08:18:31,786 __main__ [INFO] sub_callback [Dummy-2] (6127677440): topic: sub/a, dup: False, qos: 1, retain: False
2022-10-11 08:18:31,786 __main__ [INFO] sub_callback [Dummy-2] (6127677440): payload: {"2000558": 1665442936}
2022-10-11 08:18:31,786 __main__ [INFO] sub_callback [Dummy-2] (6127677440): end sub_callback
(omitted)
2022-10-11 08:18:36,932 __main__ [INFO] run [sub/a] (6146224128): publish sub/a 1665443916
2022-10-11 08:18:46,938 __main__ [INFO] run [sub/a] (6146224128): futures timeout error:
Traceback (most recent call last):
  File "/Users/shuntaka/repos/github.com/shuntaka9576/reterminal-sample/./src/offline-publish-1.py", line 41, in run
    publish_future.result(timeout=10)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py", line 448, in result
    raise TimeoutError()
concurrent.futures._base.TimeoutError
2022-10-11 08:18:46,941 __main__ [INFO] run [sub/a] (6146224128): publish sub/a 1665443926
2022-10-11 08:18:56,942 __main__ [INFO] run [sub/a] (6146224128): futures timeout error:
Traceback (most recent call last):
  File "/Users/shuntaka/repos/github.com/shuntaka9576/reterminal-sample/./src/offline-publish-1.py", line 41, in run
    publish_future.result(timeout=10)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py", line 448, in result
    raise TimeoutError()
concurrent.futures._base.TimeoutError
2022-10-11 08:18:56,943 __main__ [INFO] run [sub/a] (6146224128): publish sub/a 1665443936
2022-10-11 08:19:06,948 __main__ [INFO] run [sub/a] (6146224128): futures timeout error:
Traceback (most recent call last):
  File "/Users/shuntaka/repos/github.com/shuntaka9576/reterminal-sample/./src/offline-publish-1.py", line 41, in run
    publish_future.result(timeout=10)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py", line 448, in result
    raise TimeoutError()
concurrent.futures._base.TimeoutError
2022-10-11 08:19:06,949 __main__ [INFO] run [sub/a] (6146224128): publish sub/a 1665443946
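Detecting continuously
There are many ways to implement this; here is a relatively simple one. Frankly, I do not think it is a particularly elegant approach, so feedback on better methods would be appreciated.
- Define a topic for health checks (the health check topic), and have the device publish a payload containing a timestamp to it at a fixed interval
- At the same time, the device subscribes to the health check topic and confirms that the value it receives matches the value it published
MQTT is used here, but a protocol at a network layer below the application layer would also work.
Source code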
import threading
import json
import time
from enum import Enum

from awscrt import io, mqtt

import utils.config as Config
import utils.logger as Logger

config = Config.getConfig()
Logger.init()
logger = Logger.getLogger(__name__)

AWS_IOT_ENDPOINT = config.IotEndpoint
KEEP_ALIVE_SECS = 5
PING_TIMEOUT_MS = 1000
THING_NAME = "test-thing"
HEALTH_CHECK_TOPIC_NAME = "sub/a"
TIME_OUT_SEC = 3


class HealthCheckStatus(Enum):
    NOT_INIT = "NOT_INIT"
    CONNECTED = "CONNECT"
    DISCONNECT = "DISCONNECT"


class HealthCheckTopic:
    def __init__(self, connection, thing_name: str):
        self.mqtt_connection = connection
        self.thing_name = thing_name

    def publish(self, data):
        future, packet_id = self.mqtt_connection.publish(
            topic=HEALTH_CHECK_TOPIC_NAME,
            payload=json.dumps(data),
            qos=mqtt.QoS.AT_MOST_ONCE,
        )
        return future, packet_id

    def subscribe(self, callback=None):
        self.mqtt_connection.subscribe(
            # Used to confirm that the publish arrived, so the topic is the same as for publish
            topic=HEALTH_CHECK_TOPIC_NAME,
            qos=mqtt.QoS.AT_MOST_ONCE,
            callback=callback,
        )


class HealthCheckUseCase:
    def __init__(self, mqtt_connection, thing_name) -> None:
        self.subscribe_notify_timestamp = None
        self.__health_check_topic = HealthCheckTopic(mqtt_connection, thing_name)

    def set_timestamp_for_subscriber(self, topic, payload, dup, qos, retain):
        payload_obj = json.loads(payload.decode("utf-8"))
        timestamp = payload_obj.get("timestamp")
        if timestamp is not None:
            self.subscribe_notify_timestamp = timestamp

    def get_status(
        self,
        time_out_sec: int,
    ):
        try:
            self.subscribe_notify_timestamp = None
            timestamp = int(time.time())
            # publish() returns (future, packet_id); neither is needed for this QoS 0 check
            self.__health_check_topic.publish(
                {"timestamp": timestamp},
            )
            retry_count = 0
            # Check once per second, up to time_out_sec times, whether the message has come back
            while True:
                # Connected if the value received via the subscription matches the published value
                if timestamp == self.subscribe_notify_timestamp:
                    return HealthCheckStatus.CONNECTED
                retry_count = retry_count + 1
                if retry_count == time_out_sec:
                    logger.info(
                        f"health check timeout: "
                        f"timestamp: {timestamp}, "
                        f"subscribe_notify_timestamp: {self.subscribe_notify_timestamp}"
                    )
                    return HealthCheckStatus.DISCONNECT
                time.sleep(1)
        except Exception:
            return HealthCheckStatus.DISCONNECT


class HealthCheckThread(threading.Thread):
    def __init__(
        self,
        mqtt_connection: mqtt.Connection,
        thing_name: str,
    ):
        threading.Thread.__init__(self)
        self.__status = HealthCheckStatus.NOT_INIT
        self.__mqtt_connection = mqtt_connection
        self.__thing_name = thing_name

    def run(self):
        health_check_use_case = HealthCheckUseCase(
            self.__mqtt_connection, self.__thing_name
        )
        health_check_topic = HealthCheckTopic(self.__mqtt_connection, self.__thing_name)
        health_check_topic.subscribe(health_check_use_case.set_timestamp_for_subscriber)
        while True:
            try:
                self.__status = health_check_use_case.get_status(
                    TIME_OUT_SEC,
                )
                logger.info(f"status: {self.__status}")
            except Exception:
                logger.error("Unknown error", exc_info=True)
            # Interval between checks
            time.sleep(1)


def createMQTTConnection(
    device_certificate_filepath: str,
    private_key_filepath: str,
    ca_certificate_filepath: str,
):
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(
        device_certificate_filepath, private_key_filepath
    )
    tls_ctx_options.override_default_trust_store_from_path(
        None, ca_certificate_filepath
    )
    tls_ctx = io.ClientTlsContext(tls_ctx_options)
    mqtt_client = mqtt.Client(client_bootstrap, tls_ctx)
    mqtt_connection = mqtt.Connection(
        client=mqtt_client,
        client_id=THING_NAME,
        host_name=AWS_IOT_ENDPOINT,
        port=8883,
        clean_session=False,
        keep_alive_secs=KEEP_ALIVE_SECS,
        ping_timeout_ms=PING_TIMEOUT_MS,
    )
    connect_future = mqtt_connection.connect()
    connect_future.result()
    return mqtt_connection


def main():
    logger.info("start main thread")
    logger.info("start create mqtt connection")
    connection = createMQTTConnection(
        device_certificate_filepath="./data/device_certificate.pem",
        private_key_filepath="./data/private.key",
        ca_certificate_filepath="./data/ca_certificate.pem",
    )
    logger.info("end create mqtt connection")

    health_check_thread = HealthCheckThread(mqtt_connection=connection, thing_name=THING_NAME)
    health_check_thread.daemon = True
    health_check_thread.name = "health_check_thread"
    health_check_thread.start()

    while True:
        time.sleep(10)


if __name__ == "__main__":
    main()
Execution log
Timestamps at which the network was turned OFF and then ON
$ networksetup -SetAirportPower en0 off;date +%s
1665448588
$ networksetup -SetAirportPower en0 on;date +%s
1665448592
Application log
$ poetry run python3 ./src/health-check.py
2022-10-11 09:36:22,063 __main__ [INFO] main [MainThread] (4345300352): start main thread
2022-10-11 09:36:22,063 __main__ [INFO] main [MainThread] (4345300352): start create mqtt connection
2022-10-11 09:36:22,435 __main__ [INFO] main [MainThread] (4345300352): end create mqtt connection
2022-10-11 09:36:23,441 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.CONNECTED
2022-10-11 09:36:25,442 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.CONNECTED
2022-10-11 09:36:27,453 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.CONNECTED
2022-10-11 09:36:30,468 __main__ [INFO] get_status [health_check_thread] (6144815104): health check timeout: timestamp: 1665448588, subscribe_notify_timestamp: None
2022-10-11 09:36:30,469 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.DISCONNECT
2022-10-11 09:36:33,478 __main__ [INFO] get_status [health_check_thread] (6144815104): health check timeout: timestamp: 1665448591, subscribe_notify_timestamp: None
2022-10-11 09:36:33,478 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.DISCONNECT
2022-10-11 09:36:36,487 __main__ [INFO] get_status [health_check_thread] (6144815104): health check timeout: timestamp: 1665448594, subscribe_notify_timestamp: None
2022-10-11 09:36:36,487 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.DISCONNECT
2022-10-11 09:36:39,498 __main__ [INFO] get_status [health_check_thread] (6144815104): health check timeout: timestamp: 1665448597, subscribe_notify_timestamp: None
2022-10-11 09:36:39,498 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.DISCONNECT
2022-10-11 09:36:41,508 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.CONNECTED
2022-10-11 09:36:43,514 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.CONNECTED
2022-10-11 09:36:45,524 __main__ [INFO] run [health_check_thread] (6144815104): status: HealthCheckStatus.CONNECTED
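The disconnection was detected about 2 seconds after the network was turned OFF at 1665448588 (2022-10-11 09:36:28), and recovery was detected roughly 9 seconds after the network was turned ON at 1665448592 (2022-10-11 09:36:32): the first CONNECTED entry after the outage is logged at 09:36:41.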
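Watch the IoT policy size limit
The maximum size of a policy document is 2048 characters, excluding whitespace (reference). This quota cannot be raised.
Multiple policies can be attached to a certificate, so it is probably best to attach several IoT policies split by role, each with room to spare.
If you exceed the limit when a large number of certificates already exist, the following work becomes necessary, so simply being aware of this in advance makes a difference.
- Write and run a script that attaches the additional policy to the existing certificates
- Add logic that attaches the new policy when a certificate is created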
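A pre-deployment check against this limit can be sketched as below. It approximates the count by serialising the document compactly and ignoring whitespace; how the service counts internally is not confirmed here, and the function names `policy_size` and `fits_limit` are hypothetical.

```python
import json

POLICY_MAX_CHARS = 2048  # limit quoted above, counted without whitespace


def policy_size(policy_document: dict) -> int:
    """Count the characters of a policy document, excluding whitespace."""
    compact = json.dumps(policy_document, separators=(",", ":"))
    return sum(1 for ch in compact if not ch.isspace())


def fits_limit(policy_document: dict, limit: int = POLICY_MAX_CHARS) -> bool:
    """Return True when the document is within the size limit."""
    return policy_size(policy_document) <= limit
```

Running such a check in CI before `aws iot create-policy` gives early warning when a policy is approaching the limit and should be split by role.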
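Other
Here are other things, on the functional side, that were worth doing. Some of them are basic, but I hope they serve as a useful reminder.
Sync logs to CloudWatch Logs
Being able to check edge-side logs from the cloud was useful for understanding the state of a device.
Device logs were stored in the cloud using watchtower. They were sent with AWS credentials issued on the basis of the device certificate (reference: Authorizing direct calls to AWS services using AWS IoT Core credential provider). With the credential provider, the credentials expire after one hour by default, so they must be rotated every hour.
However, when the edge goes offline and then comes back online, this approach does not resend the portion that was not sent, so treat the cloud-side logs as a reference only. When troubleshooting is required, we pull the logs from the device via Soracom Napter.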
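The hourly rotation can be driven by a small expiry check like the sketch below. It only decides when to refresh; fetching new credentials from the credential provider and handing them to watchtower are deliberately left out. The function name `needs_refresh` and the five-minute margin are assumptions for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

REFRESH_MARGIN = timedelta(minutes=5)  # hypothetical safety margin before expiry


def needs_refresh(
    expiration: datetime,
    now: Optional[datetime] = None,
    margin: timedelta = REFRESH_MARGIN,
) -> bool:
    """Return True once the credentials are within `margin` of expiring."""
    now = datetime.now(timezone.utc) if now is None else now
    return now >= expiration - margin
```

Refreshing slightly before the expiry time, rather than at it, avoids a window in which log uploads fail with expired credentials.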
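Monitor threads
Some processing had to respond to interrupts from the hardware, so we used long-running resident threads. Because those threads occasionally stopped unexpectedly, we monitored them and made the application exit abnormally whenever a thread that must not stop had stopped. systemd is configured on the OS side to restart the service, so recovery happens through a service restart.
Running a function like the following periodically works well. That said, once development of the software had progressed to a certain point, this error stopped appearing entirely.
for thread in threads:
    # Check that the thread is still alive
    if not thread.is_alive():
        logger.error(f"{thread.name} is not alive.")
        # Raise an application-specific error here; RuntimeError is a stand-in
        raise RuntimeError(f"{thread.name} is not alive.")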
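Make remote updates possible
In most cases you will not be able to build everything on schedule. If you build an update mechanism as early as possible, and run it near the start of the application's processing, later improvements go much more smoothly. There are several ways to update: for example IoT Jobs, or, as a last resort, keeping direct SSH access available through Soracom Napter, which lets you troubleshoot without going on site.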
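If the app handles time, check when the clock gets synchronized
The Raspberry Pi has no RTC, so after boot the clock resumes from the time of the last shutdown. If the app is registered with systemd without any dependencies, assume that it will almost always start before the clock has been synchronized.
Running the app with an unsynchronized clock naturally causes problems such as the following.
- Verification of tokens that use the time as a validation factor fails (AWS Signature Version 4, JWT, and so on)
- When the clock is synchronized, the timestamps written to the log jump, which makes the logs less trustworthy
If the app waits until the clock is synchronized, the code looks like the following. Define a function that polls the synchronization state with the timedatectl CLI, and block subsequent processing until the clock is synchronized.
import re
import subprocess

SYNCHRONIZED_STATUS_PATTERN = ".*?System clock synchronized: (.*?)\n"


def is_time_sync() -> bool:
    execTimedatectlResult = subprocess.run(
        ["timedatectl", "status"], encoding="utf-8", stdout=subprocess.PIPE
    )
    regexpResult = re.match(
        SYNCHRONIZED_STATUS_PATTERN, execTimedatectlResult.stdout, re.S
    )
    if regexpResult is not None:
        synchronizedStatus = regexpResult.group(1)
        if synchronizedStatus == "yes":
            return True
    # No match, or the status is something other than "yes"
    return False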
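The blocking step itself can be sketched as a polling loop around a check function such as the one above. The name `wait_for_time_sync` and its default interval and timeout are assumptions. The deadline uses `time.monotonic()` because the wall clock is exactly what is expected to jump during synchronization.

```python
import time
from typing import Callable


def wait_for_time_sync(
    is_synced: Callable[[], bool],
    interval_sec: float = 1.0,
    timeout_sec: float = 300.0,
) -> bool:
    """Poll is_synced() until it returns True; return False if the timeout elapses."""
    # The monotonic clock is unaffected by the wall-clock jump caused by the sync itself
    deadline = time.monotonic() + timeout_sec
    while True:
        if is_synced():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_sec)
```

At startup the app would call `wait_for_time_sync(is_time_sync)` before creating the MQTT connection, and decide from the return value whether to continue or exit so that systemd restarts it.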
Rotate logs
Storage is limited, so rotate your logs. This can be done with Python's standard logging package.
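As a minimal sketch, size-based rotation with the standard library's `RotatingFileHandler` looks like this. The helper name `build_rotating_logger`, the file sizes, and the log format are illustrative choices, not values taken from the sample repository.

```python
import logging
from logging.handlers import RotatingFileHandler


def build_rotating_logger(
    name: str,
    log_path: str,
    max_bytes: int = 1_000_000,
    backup_count: int = 5,
) -> logging.Logger:
    """Create a logger whose file rolls over at max_bytes, keeping backup_count old files."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    handler = RotatingFileHandler(
        log_path, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8"
    )
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s [%(levelname)s] %(message)s")
    )
    logger.addHandler(handler)
    return logger
```

With these settings, disk usage for the log is bounded by roughly `max_bytes * (backup_count + 1)`, which makes it easy to budget against the SD card size.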
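Release resources on abnormal termination
Make sure that GPIO and socket connections are always released when a termination signal is received or the process exits abnormally. Releasing them increases the chance that the service runs normally when it is restarted after an abnormal exit.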
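Ultimately, test with the same startup method as production
When the app is started via systemd, the initialization state of attached hardware settings (such as a QR reader's keyboard settings) can differ depending on the startup state of other services. The earlier you can test with the app launched by systemd, the earlier such bugs are found.
Things that would have been good to do in advance
Check early whether libraries are available for the hardware you use
When integrating with hardware, it is common for the library you want to exist only in C. That is fine if a team member is proficient in C. If not, and the software is written in Python, checking in advance whether a Python binding for the C library exists makes the schedule easier to estimate.
Conclusion
This article described things that were worth doing in IoT edge development on Raspberry Pi OS. Naturally, if you think of the edge device as a server, it is much like a server-side app running on EC2 or ECS, and many of the concerns are shared. Digging deeper into each item might have surfaced better solutions. Feedback on better approaches would be appreciated.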
Appendix
Setting up the sample code
Configuration
cp ./src/utils/_config.py ./src/utils/config.py
Set the IoT Core ATS endpoint
def getConfig():
    config = type(
        "Config",
        (object,),
        {
            # The FQDN alone is sufficient
            "IotEndpoint": "xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com",
        },
    )
    return config
Install the dependencies
make install
# or
poetry install
Create the IoT Core resources required for the MQTT connection
This uses a one-click certificate.
wget https://www.amazontrust.com/repository/AmazonRootCA1.pem
mv ./AmazonRootCA1.pem ./data/ca_certificate.pem
aws iot create-keys-and-certificate \
  --set-as-active \
  --certificate-pem-outfile ./data/device_certificate.pem \
  --public-key-outfile ./data/public.key \
  --private-key-outfile ./data/private.key >./data/cert.json
export POLICY_NAME="test-policy"
export THING_NAME="test-thing"
# Create the policy
aws iot create-policy \
  --policy-name $POLICY_NAME \
  --policy-document file://iot-policy.json
# Attach the policy to the certificate
aws iot attach-policy \
  --policy-name $POLICY_NAME \
  --target `cat ./data/cert.json | jq -r ".certificateArn"`
# Create the thing
aws iot create-thing \
  --thing-name $THING_NAME
# Attach the certificate to the thing
aws iot attach-thing-principal \
  --thing-name $THING_NAME \
  --principal `cat ./data/cert.json | jq -r ".certificateArn"`