MQTT Version 5.0 のリクエスト・レスポンスパターンを試してみた

MQTT Version 5.0 で楽したい。
2022.11.10

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

約4ヶ月ほど前ですが、AWS IoT Greengrass で MQTT Version 5.0 がサポートされるアップデートがありました。
しかし私はMQTT Version 5 について詳しくないので、本記事では新機能の1つである「リクエスト・レスポンス」を試してみて、機能の詳細や使い方を確認していきたいと思います。

(なお、AWS IoT Core は MQTT Version 3.1.1 となります)

MQTT Version 5 全体の詳細については下記を参照頂ければと思います。

リクエスト・レスポンスとは?

従来の MQTT Version 3.1.1 では、Pub/Sub モデルによる非同期メッセージングなので、リクエストとレスポンスは互いに切り離されたものになります。

MQTT Version 5 では、リクエスト( Publish )時に予めレスポンス先のトピックを指定することで、特定のメッセージを受け取った際に指定されたトピックにレスポンスを返す事ができます。もちろん、MQTT v3.1.1 でも MQTT クライアントの実装次第で同様のことを実現することができますが、MQTT Version 5 ではプロトコルの仕様として、リクエスト・レスポンスパターンがサポートされたので、より簡単に実現することができるようになりました。

イメージは以下の図のようになります。

  • Client1 が メッセージを Publish する際に、レスポンストピックとして myres というトピック(任意)を指定します。
  • メッセージを受け取った Client2 では、メタデータに含まれるレスポンストピックを取り出して、そのトピックに応答( Publish )します。
  • Client1 では事前にレスポンストピックを Subscribe しておいて Client2 からの応答メッセージを受け取ります。

00-request-response

流れを見ていただくと分かりますが、従来の MQTT v3.1.1 でも同じ処理の流れを実現することは可能です。しかし、MQTT Version 5 では上記の通り、応答メッセージを受け取るトピックをリクエスト元で指定できるので、より簡単にメッセージのやり取りが行えるようになります。

EMQX パブリックブローカーで試してみる

MQTT Version 5 をサポートしている MQTT ブローカーはいくつかありますが、今回は Public に公開されている、EMQ 社の MQTT Version 5 ブローカーを使ってみます。
EMQ 社では「EMQX」という OSS の MQTT ブローカーを開発しており、今回はパブリックに公開されているものを利用しています。

ちなみに冒頭で紹介した AWS IoT Greengrass の MQTT Version 5.0 サポートにおいても、この EMQX をベースとしたコンポーネントを使うことで実現しているようです。

ブローカーの接続情報は下記ページで公開されています。

17-access-info

テスト環境

先程の図を元にして、以下のような構成でテストしてみたいと思います。

100-req-res-test-env

MQTT X においては同じ UI で使えるオンラインのクライアントもあるので、必要に応じて利用してみて下さい。

MQTT X でパブリックブローカーに接続する

最初に MQTT X で EMQX ブローカーに接続するための設定を行います。
「New Connection」 をクリックします。

11-mqttx-conn

次に、接続に必要な情報を記入します。
今回は動作検証なので暗号化なしの MQTT で接続します(not MQTT over TLS という意味です)

項目 記入内容
Name 接続名の名前。適当なものを指定。
Client ID 適当なものを指定。MQTT Xはランダムな ID を自動でセット(変更可)
Host - プロトコルは mqtt を選択
- ホスト名は broker.emqx.io
Port 1883

12-config-mqttx

「Advanced」の設定項目にある「MQTT Version」は「5.0」を指定します。

13-mqtt-config-advanced

設定が完了したら、右上にある矢印アイコンをクリックしてブローカーに接続します。

14-start-connect

正常に接続できたら「Connected」というメッセージが出ます。

15-connected

レスポンダ用のクライアントの作成

次にメッセージを受ける側の MQTT クライアントを作成します。
メッセージを受け取った際に、その属性情報からレスポンストピックを読み取り、そのトピックに何らかのメッセージを Publish したいので、今回は MQTT.js でクライアントを作ります。

インストールされていない場合はインストールしておきましょう。

$ npm install mqtt --save

今回は MQTT.js を使いますが、MQTT Version 5 をサポートしているものであれば Python の MQTT ライブラリである「Paho」などでもよいと思います。

下記はサンプルコードです。

  • 18行目:
    • packet.properties.responseTopic を参照してレスポンストピックをログ出力しています。
  • 24-35行目:
    • report_proc 関数で、レスポンストピックに Success Req-Res! というメッセージを Publish します。

mqtt5test.js

const mqtt = require('mqtt');
const options = {
    rejectUnauthorized : false,
};
let client = mqtt.connect('mqtt://broker.emqx.io:1883',{
    options,
    protocolVersion: 5,
});

let topic = "rrtest/topic";

client.subscribe(topic);
client.on('message', (topic, payload, packet) => {
    date = new Date().toLocaleString({ timeZone: 'Asia/Tokyo' });
    console.log(date, 'Received Message:', payload.toString());

    if (packet.properties && packet.properties.responseTopic) {
        console.log(date,'Response Topic:', packet.properties.responseTopic.toString());
        report_proc(packet.properties.responseTopic);
    };
});


function report_proc(responseTopic)
    {
    let msgArray = new Object ();
    msgArray["message"] = new Object ();
    msgArray["message"] = 'Success Req-Res!';
    console.log(date,'published message:',JSON.stringify(msgArray).toString());
    const json_str = JSON.stringify(msgArray, null, 4);
    client.publish(responseTopic, json_str, {
        qos: 0,
        retain: false,
    });
}

実行して Subscribe させておきます。

$ node mqtt5test.js

「MQTT X」でリクエストレスポンスの設定を追加

次に「MQTT X」でレスポンストピックを設定しておきましょう。
「Meta」というボタンがあるのでクリックします。

20-edit-meta

メタデータの編集画面が開くので「Response Topic」ack/1 というトピックをレスポンス用のトピックとしてセットして保存します。(任意のものをセットして下さい)

21-set-response-topic

ついでに、レスポンストピックのメッセージを確認するために「MQTT X」上で ack/1 を Subscribe しておきましょう。「New Subscription」 をクリックします。

( Subscribe を追加するにはブローカーに接続しておく必要があるので、切れていたら再接続してください)

22-set-subscription

Subscribe するトピック名 ack/1 をセットして保存します。

23-set-sub-topic

これで準備できました。

リクエスト・レスポンスの動作確認

準備できたので、トピック rrtest/topic に適当なメッセージを送ってみます。
(送信時は矢印マークをクリックします)

24-publish-test

Publishすると、レスポンス用のトピック ack/1 からメッセージを受け取ることができました。

25-complete-req-res

MQTT.js 側を見ると、次のようなログが確認できると思います。
レスポンストピックで受け取ったメッセージの属性を読み取って、トピック名(ack/1)を出力できています。

2022/11/10 18:28:13 Received Message: {"message": "hello"}
2022/11/10 18:28:13 Response Topic: ack/1
2022/11/10 18:28:13 published message: {"message":"Success Req-Res!"}

最後に

簡単に MQTT Version 5.0 のリクエスト・レスポンスパターンを試してみました。
他にも色々な機能が追加されているので、引き続き試してみたいと思います。

以上です。