[AWS IoT Core] AWS IoT Core Message Queuing を試してみました

[AWS IoT Core] AWS IoT Core Message Queuing を試してみました

2025.08.04

1. はじめに

製造ビジネステクノロジー部の平内(SIN)です。

2025年7月31日、「Message Queuing for MQTT Shared Subscriptions」がアナウンスされました。

AWS IoT Core now supports message queuing for MQTT shared subscriptions

MQTT Shared Subscriptionsは、元々、2023年4月から利用可能でしたが、今回の機能追加によりメッセージの永続化ネットワーク障害時の信頼性向上が実現されました。

https://dev.classmethod.jp/articles/aws-iot-core-mqttv5-shared-subscriptions/

従来のShared Subscriptions(2023年4月〜)

  • 複数のサブスクライバー間でメッセージを負荷分散
  • ネットワーク障害時はメッセージが失われる可能性
  • リアルタイム性重視の用途

新機能:Message Queuing(2025年7月〜)

  • QoS1メッセージの自動キューイング: ネットワーク障害時にメッセージが自動保存
  • 再接続時の自動配信: サブスクライバー復帰時にキューされたメッセージを自動配信
  • 負荷分散 + メッセージ保証 が同時に実現

2. 動作確認

検証のため、断続的な切断・再接続を模擬するSubscriberと20件メッセージを送信するPublisherを作成しました。

プログラム名 機能
publisher.py 20件のメッセージを送信して終了
subscriber.py 3つのサブスクライバーを起動、断続的に、切断・再接続を繰り返す

https://github.com/furuya02/tryout-aws-iot-core-message-queuing/blob/main/src/publisher.py

https://github.com/furuya02/tryout-aws-iot-core-message-queuing/blob/main/src/subscriber.py

2.1 環境構築

  • AWS IoT Thing 及び証明書の作成
# Thing作成
aws iot create-thing --thing-name message-queuing-test-device

# 証明書作成
cd certs
aws iot create-keys-and-certificate --set-as-active \
  --certificate-pem-outfile device.pem.crt \
  --private-key-outfile private.pem.key \
  --public-key-outfile public.pem.key
  • IoTポリシー設定
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["iot:Connect"],
      "Resource": "arn:aws:iot:*:*:client/message-queuing-test-*",
      "Condition": {
        "StringLike": {"iot:ClientId": "message-queuing-test-*"}
      }
    },
    {
      "Effect": "Allow",
      "Action": ["iot:Publish"],
      "Resource": "arn:aws:iot:*:*:topic/test/shared/*"
    },
    {
      "Effect": "Allow",
      "Action": ["iot:Subscribe"],
      "Resource": [
        "arn:aws:iot:*:*:topicfilter/test/shared/*",
        "arn:aws:iot:*:*:topicfilter/$share/test-group/test/shared/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": ["iot:Receive"],
      "Resource": "arn:aws:iot:*:*:topic/test/shared/*"
    }
  ]
}
  • 構成
IoTCode/
├── src/
│   ├── certs/               # AWS IoT証明書
│   ├── config.py           # 設定管理
│   ├── publisher.py        # メッセージ送信
│   ├── subscriber.py       # シェアサブスクライバー
│   ├── test_message_queuing.py  # 統合テスト
│   └── .env.example        # 環境変数テンプレート
└── blog/
    └── aws-iot-core-message-queuing-tryout.md

※ 詳しくは、GithubのREADMEをご参照ください

2.2 実装コードのポイント

  • メッセージキューイングを有効にするため、Clean Session=falseが必要
# subscriber.py
client = mqtt.Client(client_id=self.client_id, clean_session=False, protocol=mqtt.MQTTv311)
  • QoS1でのメッセージ送信が必須
# publisher.py  
result = self.client.publish(topic, json.dumps(message), qos=1)
  • トピック名は、シェアサブスクリプションへの登録が必要
# subscriber.py
shared_topic = f"$share/{self.shared_subscription_group}/{self.topic_prefix}/messages"
client.subscribe(shared_topic, qos=1)

2.2 検証と結果

ステップ1: サブスクライバーを起動

# ターミナル1でサブスクライバーを起動
python -m src.subscriber

サブスクライバーが起動すると、以下のような出力が表示されます:

[Main] サブスクライバー開始中...
[Manager] 3個のサブスクライバーが接続成功
[Subscriber-01] 接続成功: message-queuing-test-subscriber-01, セッション保持: True
[Subscriber-02] 接続成功: message-queuing-test-subscriber-02, セッション保持: True
[Subscriber-03] 接続成功: message-queuing-test-subscriber-03, セッション保持: True

=== メッセージキューイングテスト実行中 ===
Ctrl+C で終了

サブスクライバーはCtrl+Cで停止するまで継続的に動作します。

※ サブスクライバーが「🎉 全サブスクライバー準備完了!」メッセージを表示するまで待ってから、パブリッシャーを起動してください。

ステップ2: パブリッシャーを起動(別ターミナル)

# ターミナル2でパブリッシャーを起動
python -m src.publisher

パブリッシャーが20個のメッセージを送信し、サブスクライバーでメッセージ受信が確認できます。

ステップ3: Message Queuing 確認

サブスクライバーは、断続的に5〜15秒間、切断・再接続されますが、切断中のメッセージはキューイングされ、最終的に、パブリッシャーで送信した20件がすべて受信できることを確認できます。

確認ポイントは、以下です。

  • シェアサブスクリプション: 複数サブスクライバー間でのメッセージ負荷分散
  • メッセージキューイング: 切断中のメッセージが再接続時に配信される

2.3 結果

Publisher が20件送信した場合の正常に処理された場合の表示です。(統計情報は、10秒毎に表示されます)

=== サブスクライバー統計 ===
  01: 接続中, メッセージ数: 7
  02: 切断中, メッセージ数: 4 
  03: 接続中, メッセージ数: 9
📊 合計受信メッセージ数: 20
⏳ 1個のサブスクライバーが切断中
💡 切断中のサブスクライバー再接続時にキューイングメッセージが配信されます
🔄 Message Queuing機能 動作中!
  • 合計が20件未満の場合、切断中のサブスクライバーにメッセージがキューイングされています
  • これは Message Queuing機能が正常動作している証拠 です
  • 切断されたサブスクライバーが再接続すると、不足分のメッセージが配信されます

3 実用的な活用場面

今回の新機能により、以下のような用途での活用が期待できます。

3.1 産業IoT監視システム

  • 工場の複数センサーからのデータを複数のバックエンドで処理
  • ネットワーク障害時のデータ損失を回避(今回のテストでは、ネットワーク障害時でもデータ損失がなかったことを確認できました)

3.2 スマートシティ

  • 街中のセンサーデータを分散処理
  • メンテナンス時間中のデータも確実に処理

3.3 リモート監視

  • 断続的な接続環境でも重要なアラートを確実に配信

4 注意点・制限事項

4.1 QoS1

  • QoS0メッセージは、キューイング対象外です。
# ❌ キューイングされない
client.publish(topic, message, qos=0)  

# ✅ キューイング対象
client.publish(topic, message, qos=1)

4.2 配信レート制限

  • 最大20メッセージ/秒/アクティブサブスクライバー
  • 超過分は自動的にキューイング

4.3 永続セッション

永続セッションが必須です。

# ❌ キューイング無効
client = mqtt.Client(clean_session=True)

# ✅ キューイング有効  
client = mqtt.Client(clean_session=False)

5 最後に

今回は新機能であるAWS IoT Core Message Queuingについて、その動作を確認してみました。

  • 既存のShared Subscriptionsとの互換性
  • 追加設定なしでの自動キューイング
  • 優れた負荷分散とメッセージ保証の両立

元々IoT Coreは、ルール設定で、Kinesis Data Streams へ送ることで、同じような機能の実現は、可能でしたが、今後は、メッセージブローカーだけでも、システムの信頼性向上が可能になると思います。

今回検証に使用したコードは、下記に置きました。

https://github.com/furuya02/tryout-aws-iot-core-message-queuing

6 参考リンク

この記事をシェアする

facebookのロゴhatenaのロゴtwitterのロゴ

© Classmethod, Inc. All rights reserved.