AWS IoTルールでProtocol Buffersをデコードする with MQTTX

AWS IoTルールでProtocol Buffersをデコードする with MQTTX

MQTTXでprotbufデータをパブリッシュし、AWS IoTルールでデコード&別トピックに転送するのを試してみました
Clock Icon2025.04.10

IoTデバイスとクラウド間でデータをやり取りする際、通信手段としてMQTTは広く使われています。

工場内の機器や車両といったユースケースを考えると、ネットワークの帯域幅が限られているケースも少なくありません。そうした環境下では、通信データのサイズや形式にも工夫が求められます。

今回は、スキーマを事前に定めてバイナリ形式にシリアライズすることでデータを軽量にできるProtocol Buffers(protobuf)形式のデータをMQTTでパブリッシュし、それをIoT Coreのルールエンジンを使って別のトピックに転送する、という流れを試してみたいと思います。

MQTTクライアントには、GUIで扱いやすいMQTTXを使用します。MQTTXでは、protobufスキーマを登録し、送受信するメッセージのシリアライズ/デコード(デシリアライズ)が可能です。

メッセージングプロトコルとシリアライズ形式の比較については、IoV(Internet of Vehicles)における内容ですが、以下の資料が参考になります。

事前準備

次の内容は事前に対応済みであることを想定しています。

試してみる

次のような流れで試します。

  • protobufのprotoファイルの作成(スキーマ定義)
  • protobufの記述子ファイルを生成
  • 記述子ファイルをS3バケットにアップロード
  • protobufデータをデコード&別のトピックに転送するルールを作成
  • S3バケットポリシーの設定
  • MQTTXにprotoファイルをimport
  • MQTTXでprotobufデータをパブリッシュ&確認

protobufのprotoファイルの作成(スキーマ定義)

センサーデータのスキーマを定義するためのprotoファイルを作成します。以下の内容でsensor.protoというファイルを作成します。

syntax = "proto3";

message Sensor {
  optional string id = 1;
  optional float temperature = 2;
  optional float humidity = 3;
  optional int64 timestamp = 4;
}

protobufの記述子ファイルを生成

次に、protocを使って、protoファイルから記述子ファイルを生成します。

protoc --descriptor_set_out=sensor.desc --include_imports sensor.proto

protocの使い方については、protoc --helpで確認できます。
また、protobufの記述子については以下の資料が参考になります。

記述子ファイルをS3バケットにアップロード

aws cliもしくはマネコンなどから生成した記述子ファイルをS3バケットにアップロードします。必要に応じてS3バケットを作成してください。

aws s3 cp sensor.desc s3://<your-bucket-name>/<object-key>

protobufデータをデコード&別のトピックに転送するルールを作成

マネジメントコンソールからAWS IoTルールを作成します。

Screenshot 2025-04-10 at 16.48.44

decode関数を使って、protobufをデコードするSQLを書きます。関数の引数には、デコード対象、protbuf(prot)であることや記述子があるS3バケットとオブジェクトキー、protoファイルの名前、メッセージ名を記載します。

また、以下のドキュメントに記載のベストプラクティスとして、記述子ファイルの中身を変更する場合は、オブジェクトキーの変更を推奨する旨の記載があります。オブジェクトキーを変更しない場合、記述子ファイルの中身の変更が反映されるまで15分程度かかるようなので、注意が必要です。

Screenshot 2025-04-10 at 17.47.59

アクションとしてdecodedトピック にパブリッシュするように設定し、ルールを作成します。
対象トピックにパブリッシュできるIAMロールを作成し、指定します。

Screenshot 2025-04-10 at 17.48.27

S3バケットポリシーの設定

マネジメントコンソールでS3バケットのアクセス許可ページを開き、以下のバケットポリシーを設定します。記述子を配置したS3バケットにAWS IoTがアクセスできるようにします。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Statement1",
      "Effect": "Allow",
      "Principal": {
        "Service": "iot.amazonaws.com"
      },
      "Action": "s3:Get*",
      "Resource": "arn:aws:s3:::<バケット名>/<オブジェクトキー>"
    }
  ]
}

MQTTXでprotoファイルのimport

MQTTXを開き、サイドバーの上から4つ目のアイコンを開きます。そうすると、script画面が開くので、上のタブでSchemaを開きます。

Screenshot 2025-04-10 at 9.25.26

右上にある、Import proto fileをクリックし、先ほど作成したprotoファイルを選択します。

Screenshot 2025-04-10 at 9.24.12

そうすると、先ほど表示されたprotobufスキーマが表示されます。

Screenshot 2025-04-10 at 9.24.39

下のInputのところにjsonデータをいれて、右側でmessage名を入力して、Testを押すことでシリアライズできるか確認できます。

MQTTXでprotobufデータをパブリッシュ&確認

サイドバーの一番上のアイコンからConnections画面に移動します。右側のConnectからIoT Coreのエンドポイントに接続します。
※ 接続の設定がまだの場合は、こちらの記事を参考に設定してください。

接続が完了したら、右上の...ボタンからRun Scriptを選択します。先ほど作成したスキーマを選択します。スキーマを選択することで、送ったデータを自動的にスキーマに応じてシリアライズしてくれます。今回は送信時に適用するようにします。

Screenshot 2025-04-10 at 10.35.09
Screenshot 2025-04-10 at 10.34.57

パブリッシュしたデータを確認するために、画面中央のNew Subscriptionから、 sensors/+decodedをサブスクライブします。

Screenshot 2025-04-10 at 17.59.33

画面下部に以下のJSONデータを入れて、右下のボタンからパブリッシュします。

{
  "id": "sensor1",
  "temperature": 25.5,
  "humidity": 60.0,
  "timestamp": 1625256000
}

画面中央の右側にはパブリッシュによって送信したデータが、左側にはサブスクライブによって受け取ったデータが表示されます。左側にはsensors/123abとしてパブリッシュしたバイナリデータがそのまま表示され、decodedとしてデコードされたデータが表示されています。データ量は少なくなっていそうなことがわかります。

Screenshot 2025-04-10 at 17.28.45

さいごに

今回は、Protocol Buffers形式のデータをMQTTでパブリッシュし、AWS IoTルールでデコードする流れを試してみました。AWS IoTルールでは、decode関数を呼ぶことでprotobufデータを簡単にデコードできました。where句でデコードし、デコード結果を元に絞り込むことなども可能なようなので、活用の幅は広そうです。

また、今回使ったMQTTクライアントとして用いたMQTTXは、protobufデータを扱うことができ、検証時に使いやすそうでした!

AWS IoTでProtocol Buffersを使う際には、以下の資料も参考になります。

Protocol Buffersについては、ベストプラクティスなども公式ドキュメントに記載があり、参考になります。

今回の検証には、以下のAWS公式ブログを参考にしました。where句でprotobufデータをデコードし、デコード結果を元に絞り込む例も記載されてます。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.