AWS IoTルールでProtocol Buffersをデコードする with MQTTX
IoTデバイスとクラウド間でデータをやり取りする際、通信手段としてMQTTは広く使われています。
工場内の機器や車両といったユースケースを考えると、ネットワークの帯域幅が限られているケースも少なくありません。そうした環境下では、通信データのサイズや形式にも工夫が求められます。
今回は、スキーマを事前に定めてバイナリ形式にシリアライズすることでデータを軽量にできるProtocol Buffers(protobuf)形式のデータをMQTTでパブリッシュし、それをIoT Coreのルールエンジンを使って別のトピックに転送する、という流れを試してみたいと思います。
MQTTクライアントには、GUIで扱いやすいMQTTXを使用します。MQTTXでは、protobufスキーマを登録し、送受信するメッセージのシリアライズ/デコード(デシリアライズ)が可能です。
メッセージングプロトコルとシリアライズ形式の比較については、IoV(Internet of Vehicles)における内容ですが、以下の資料が参考になります。
事前準備
次の内容は事前に対応済みであることを想定しています。
- protobufコンパイラのprotocをインストール
- IoT Coreのモノの登録、エンドポイントの登録、MQTTXでの接続の設定
試してみる
次のような流れで試します。
- protobufのprotoファイルの作成(スキーマ定義)
- protobufの記述子ファイルを生成
- 記述子ファイルをS3バケットにアップロード
- protobufデータをデコード&別のトピックに転送するルールを作成
- S3バケットポリシーの設定
- MQTTXにprotoファイルをimport
- MQTTXでprotobufデータをパブリッシュ&確認
protobufのprotoファイルの作成(スキーマ定義)
センサーデータのスキーマを定義するためのprotoファイルを作成します。以下の内容でsensor.proto
というファイルを作成します。
syntax = "proto3";
message Sensor {
optional string id = 1;
optional float temperature = 2;
optional float humidity = 3;
optional int64 timestamp = 4;
}
protobufの記述子ファイルを生成
次に、protocを使って、protoファイルから記述子ファイルを生成します。
protoc --descriptor_set_out=sensor.desc --include_imports sensor.proto
protocの使い方については、protoc --help
で確認できます。
また、protobufの記述子については以下の資料が参考になります。
記述子ファイルをS3バケットにアップロード
aws cliもしくはマネコンなどから生成した記述子ファイルをS3バケットにアップロードします。必要に応じてS3バケットを作成してください。
aws s3 cp sensor.desc s3://<your-bucket-name>/<object-key>
protobufデータをデコード&別のトピックに転送するルールを作成
マネジメントコンソールからAWS IoTルールを作成します。
decode関数を使って、protobufをデコードするSQLを書きます。関数の引数には、デコード対象、protbuf(prot)であることや記述子があるS3バケットとオブジェクトキー、protoファイルの名前、メッセージ名を記載します。
また、以下のドキュメントに記載のベストプラクティスとして、記述子ファイルの中身を変更する場合は、オブジェクトキーの変更を推奨する旨の記載があります。オブジェクトキーを変更しない場合、記述子ファイルの中身の変更が反映されるまで15分程度かかるようなので、注意が必要です。
アクションとしてdecoded
トピック にパブリッシュするように設定し、ルールを作成します。
対象トピックにパブリッシュできるIAMロールを作成し、指定します。
S3バケットポリシーの設定
マネジメントコンソールでS3バケットのアクセス許可ページを開き、以下のバケットポリシーを設定します。記述子を配置したS3バケットにAWS IoTがアクセスできるようにします。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Statement1",
"Effect": "Allow",
"Principal": {
"Service": "iot.amazonaws.com"
},
"Action": "s3:Get*",
"Resource": "arn:aws:s3:::<バケット名>/<オブジェクトキー>"
}
]
}
MQTTXでprotoファイルのimport
MQTTXを開き、サイドバーの上から4つ目のアイコンを開きます。そうすると、script画面が開くので、上のタブでSchemaを開きます。
右上にある、Import proto fileをクリックし、先ほど作成したprotoファイルを選択します。
そうすると、先ほど表示されたprotobufスキーマが表示されます。
下のInputのところにjsonデータをいれて、右側でmessage名を入力して、Testを押すことでシリアライズできるか確認できます。
MQTTXでprotobufデータをパブリッシュ&確認
サイドバーの一番上のアイコンからConnections画面に移動します。右側のConnectからIoT Coreのエンドポイントに接続します。
※ 接続の設定がまだの場合は、こちらの記事を参考に設定してください。
接続が完了したら、右上の...ボタンからRun Scriptを選択します。先ほど作成したスキーマを選択します。スキーマを選択することで、送ったデータを自動的にスキーマに応じてシリアライズしてくれます。今回は送信時に適用するようにします。
パブリッシュしたデータを確認するために、画面中央のNew Subscriptionから、 sensors/+
とdecoded
をサブスクライブします。
画面下部に以下のJSONデータを入れて、右下のボタンからパブリッシュします。
{
"id": "sensor1",
"temperature": 25.5,
"humidity": 60.0,
"timestamp": 1625256000
}
画面中央の右側にはパブリッシュによって送信したデータが、左側にはサブスクライブによって受け取ったデータが表示されます。左側にはsensors/123abとしてパブリッシュしたバイナリデータがそのまま表示され、decodedとしてデコードされたデータが表示されています。データ量は少なくなっていそうなことがわかります。
さいごに
今回は、Protocol Buffers形式のデータをMQTTでパブリッシュし、AWS IoTルールでデコードする流れを試してみました。AWS IoTルールでは、decode関数を呼ぶことでprotobufデータを簡単にデコードできました。where句でデコードし、デコード結果を元に絞り込むことなども可能なようなので、活用の幅は広そうです。
また、今回使ったMQTTクライアントとして用いたMQTTXは、protobufデータを扱うことができ、検証時に使いやすそうでした!
AWS IoTでProtocol Buffersを使う際には、以下の資料も参考になります。
Protocol Buffersについては、ベストプラクティスなども公式ドキュメントに記載があり、参考になります。
- Programming Guides | Protocol Buffers Documentation
- Proto Best Practices | Protocol Buffers Documentation
今回の検証には、以下のAWS公式ブログを参考にしました。where句でprotobufデータをデコードし、デコード結果を元に絞り込む例も記載されてます。