[AWS IoT Core] AWS IoT Core Message Queuing を試してみました
1. はじめに
製造ビジネステクノロジー部の平内(SIN)です。
2025年7月31日、「Message Queuing for MQTT Shared Subscriptions」がアナウンスされました。
AWS IoT Core now supports message queuing for MQTT shared subscriptions
MQTT Shared Subscriptionsは、元々、2023年4月から利用可能でしたが、今回の機能追加によりメッセージの永続化とネットワーク障害時の信頼性向上が実現されました。
従来のShared Subscriptions(2023年4月〜)
- 複数のサブスクライバー間でメッセージを負荷分散
- ネットワーク障害時はメッセージが失われる可能性
- リアルタイム性重視の用途
新機能:Message Queuing(2025年7月〜)
- QoS1メッセージの自動キューイング: ネットワーク障害時にメッセージが自動保存
- 再接続時の自動配信: サブスクライバー復帰時にキューされたメッセージを自動配信
- 負荷分散 + メッセージ保証 が同時に実現
2. 動作確認
検証のため、断続的な切断・再接続を模擬するSubscriberと20件メッセージを送信するPublisherを作成しました。
プログラム名 | 機能 |
---|---|
publisher.py | 20件のメッセージを送信して終了 |
subscriber.py | 3つのサブスクライバーを起動、断続的に、切断・再接続を繰り返す |
2.1 環境構築
- AWS IoT Thing 及び証明書の作成
# Thing作成
aws iot create-thing --thing-name message-queuing-test-device
# 証明書作成
cd certs
aws iot create-keys-and-certificate --set-as-active \
--certificate-pem-outfile device.pem.crt \
--private-key-outfile private.pem.key \
--public-key-outfile public.pem.key
- IoTポリシー設定
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["iot:Connect"],
"Resource": "arn:aws:iot:*:*:client/message-queuing-test-*",
"Condition": {
"StringLike": {"iot:ClientId": "message-queuing-test-*"}
}
},
{
"Effect": "Allow",
"Action": ["iot:Publish"],
"Resource": "arn:aws:iot:*:*:topic/test/shared/*"
},
{
"Effect": "Allow",
"Action": ["iot:Subscribe"],
"Resource": [
"arn:aws:iot:*:*:topicfilter/test/shared/*",
"arn:aws:iot:*:*:topicfilter/$share/test-group/test/shared/*"
]
},
{
"Effect": "Allow",
"Action": ["iot:Receive"],
"Resource": "arn:aws:iot:*:*:topic/test/shared/*"
}
]
}
- 構成
IoTCode/
├── src/
│ ├── certs/ # AWS IoT証明書
│ ├── config.py # 設定管理
│ ├── publisher.py # メッセージ送信
│ ├── subscriber.py # シェアサブスクライバー
│ ├── test_message_queuing.py # 統合テスト
│ └── .env.example # 環境変数テンプレート
└── blog/
└── aws-iot-core-message-queuing-tryout.md
※ 詳しくは、GithubのREADMEをご参照ください
2.2 実装コードのポイント
- メッセージキューイングを有効にするため、Clean Session=falseが必要
# subscriber.py
client = mqtt.Client(client_id=self.client_id, clean_session=False, protocol=mqtt.MQTTv311)
- QoS1でのメッセージ送信が必須
# publisher.py
result = self.client.publish(topic, json.dumps(message), qos=1)
- トピック名は、シェアサブスクリプションへの登録が必要
# subscriber.py
shared_topic = f"$share/{self.shared_subscription_group}/{self.topic_prefix}/messages"
client.subscribe(shared_topic, qos=1)
2.2 検証と結果
ステップ1: サブスクライバーを起動
# ターミナル1でサブスクライバーを起動
python -m src.subscriber
サブスクライバーが起動すると、以下のような出力が表示されます:
[Main] サブスクライバー開始中...
[Manager] 3個のサブスクライバーが接続成功
[Subscriber-01] 接続成功: message-queuing-test-subscriber-01, セッション保持: True
[Subscriber-02] 接続成功: message-queuing-test-subscriber-02, セッション保持: True
[Subscriber-03] 接続成功: message-queuing-test-subscriber-03, セッション保持: True
=== メッセージキューイングテスト実行中 ===
Ctrl+C で終了
サブスクライバーはCtrl+Cで停止するまで継続的に動作します。
※ サブスクライバーが「🎉 全サブスクライバー準備完了!」メッセージを表示するまで待ってから、パブリッシャーを起動してください。
ステップ2: パブリッシャーを起動(別ターミナル)
# ターミナル2でパブリッシャーを起動
python -m src.publisher
パブリッシャーが20個のメッセージを送信し、サブスクライバーでメッセージ受信が確認できます。
ステップ3: Message Queuing 確認
サブスクライバーは、断続的に5〜15秒間、切断・再接続されますが、切断中のメッセージはキューイングされ、最終的に、パブリッシャーで送信した20件がすべて受信できることを確認できます。
確認ポイントは、以下です。
- シェアサブスクリプション: 複数サブスクライバー間でのメッセージ負荷分散
- メッセージキューイング: 切断中のメッセージが再接続時に配信される
2.3 結果
Publisher が20件送信した場合の正常に処理された場合の表示です。(統計情報は、10秒毎に表示されます)
=== サブスクライバー統計 ===
01: 接続中, メッセージ数: 7
02: 切断中, メッセージ数: 4
03: 接続中, メッセージ数: 9
📊 合計受信メッセージ数: 20
⏳ 1個のサブスクライバーが切断中
💡 切断中のサブスクライバー再接続時にキューイングメッセージが配信されます
🔄 Message Queuing機能 動作中!
- 合計が20件未満の場合、切断中のサブスクライバーにメッセージがキューイングされています
- これは Message Queuing機能が正常動作している証拠 です
- 切断されたサブスクライバーが再接続すると、不足分のメッセージが配信されます
3 実用的な活用場面
今回の新機能により、以下のような用途での活用が期待できます。
3.1 産業IoT監視システム
- 工場の複数センサーからのデータを複数のバックエンドで処理
- ネットワーク障害時のデータ損失を回避(今回のテストでは、ネットワーク障害時でもデータ損失がなかったことを確認できました)
3.2 スマートシティ
- 街中のセンサーデータを分散処理
- メンテナンス時間中のデータも確実に処理
3.3 リモート監視
- 断続的な接続環境でも重要なアラートを確実に配信
4 注意点・制限事項
4.1 QoS1
- QoS0メッセージは、キューイング対象外です。
# ❌ キューイングされない
client.publish(topic, message, qos=0)
# ✅ キューイング対象
client.publish(topic, message, qos=1)
4.2 配信レート制限
- 最大20メッセージ/秒/アクティブサブスクライバー
- 超過分は自動的にキューイング
4.3 永続セッション
永続セッションが必須です。
# ❌ キューイング無効
client = mqtt.Client(clean_session=True)
# ✅ キューイング有効
client = mqtt.Client(clean_session=False)
5 最後に
今回は新機能であるAWS IoT Core Message Queuingについて、その動作を確認してみました。
- 既存のShared Subscriptionsとの互換性
- 追加設定なしでの自動キューイング
- 優れた負荷分散とメッセージ保証の両立
元々IoT Coreは、ルール設定で、Kinesis Data Streams へ送ることで、同じような機能の実現は、可能でしたが、今後は、メッセージブローカーだけでも、システムの信頼性向上が可能になると思います。
今回検証に使用したコードは、下記に置きました。