MQTTに送ったメッセージをAmazon Kinesisにプロキシさせる

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

MQTT(MQ Telemetry Transport) というメッセージングプロトコルが IoT/M2M 界隈で注目されてきています。MQTTには

  • HTTPに比べてプロトコルオーバーヘッドが小さい
  • 到達可能性を細かく制御できる
  • Pub/Sub メッセージが手軽に扱える

といった特徴があります。 昨年末から販売されている超小型コンピュータ Intel Edison では初期状態から MQTT のライブラリがインストールされているなど、MQTT を利用する敷居が下がりつつあります。

IoTデバイスがMQTTブローカーにデータ送信し、Amazon Kinesis でリアルタイムでデータ処理する、というようなユースケースは今後どんどん増えてくるのではないでしょうか。

MQTT-Amazon Kinesis bridge

 

本ブログの狙い

本ブログではmqtt-kinesis-bridge を利用し、MQTT ブローカーに送信されたメッセージを Amazon Kinesis にプロキシして取り込む方法を紹介します。

AWS Labas が公開している mqtt-kinesis-bridge は MQTT と Amazon Kinesis を連携する IoT の手段としてまれに紹介されますが、どういうわけか、ちゃんとインストールしたブログをあまり見かけないので、人柱になって動かしてみた次第です。

以下の流れでデータ連携の動作確認をします。

  1. MQTT ブローカー mosquitto をインストールし、pub/sub を確認
  2. mqtt-kinesis-bridge のベースになっている MQTT ライブラリの Paho を利用して pub/sub を確認
  3. MQTT ブローカーへ publish したメッセージを mqtt-kinesis-bridge で Kinesis にプロキシし、AWS CLI で Kinesis からレコードを受信

検証環境は Amazon Linux AMI 2015.03 とします。

 

MQTT mosquitto で pub/sub してみる

まずは MQTT の OSS 実装である mosquitto をインストールし、pub/sub してみます。

MQTT mosquitto のインストール

CentOS 6 向けのパッケージが用意されているため、CentOS 6 向けと同じく yum レポジトリを追加して Amazon Linux にインストールします。

$ sudo curl http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-6/home:oojah:mqtt.repo -o /etc/yum.repos.d/mqtt.repo
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   271  100   271    0     0    476      0 --:--:-- --:--:-- --:--:--   476
$ cat /etc/yum.repos.d/mqtt.repo
[home_oojah_mqtt]
name=mqtt (CentOS_CentOS-6)
type=rpm-md
baseurl=http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-6/
gpgcheck=1
gpgkey=http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-6/repodata/repomd.xml.key
enabled=1

$ yum search mosquitto
Loaded plugins: priorities, update-motd, upgrade-helper
================================================ N/S matched: mosquitto ======================================================================
mosquitto-clients.i686 : Mosquitto command line publish/subscribe clients
mosquitto-clients.x86_64 : Mosquitto command line publish/subscribe clients
mosquitto-debuginfo.i686 : Debug information for package mosquitto
mosquitto-debuginfo.x86_64 : Debug information for package mosquitto
libmosquitto-devel.i686 : MQTT C client library development files
libmosquitto-devel.x86_64 : MQTT C client library development files
libmosquitto1.i686 : MQTT C client library
libmosquitto1.x86_64 : MQTT C client library
libmosquittopp-devel.i686 : MQTT C++ client library development files
libmosquittopp-devel.x86_64 : MQTT C++ client library development files
libmosquittopp1.i686 : MQTT C++ client library
libmosquittopp1.x86_64 : MQTT C++ client library
mosquitto.i686 : MQTT version 3.1/3.1.1 compatible message broker
mosquitto.x86_64 : MQTT version 3.1/3.1.1 compatible message broker

  Name and summary matches only, use "search all" for everything.

$ yum search の実行結果からわかるように、mosquitto はサーバーmosquittoとクライアントmosquitto-clientの両方のアプリケーションを提供しているので、これらをインストールします。

$ sudo yum install -y mosquitto-clients mosquitto

mosquitto サーバ(MQTT ブローカー)の起動

mosquitto は自動起動する設定になっていないため chkconfig --add します。

$ sudo chkconfig --add mosquitto
$ sudo chkconfig mosquitto on
$ sudo chkconfig mosquitto --list
mosquitto       0:off   1:off   2:on    3:on    4:on    5:on    6:off

mosquitto サーバを起動します。

$ sudo service mosquitto start
Starting Mosquitto MQTT broker                             [  OK  ]
# 1439606245: mosquitto version 1.4.2 (build date 2015-06-10 13:52:20+0000) starting
1439606245: Config loaded from /etc/mosquitto/mosquitto.conf.
1439606245: Opening ipv4 listen socket on port 1883.
1439606245: Opening ipv6 listen socket on port 1883.

mosquitto はデフォルトでは 1883 番ポートで LISTEN します。

mosquitto クライアントから MQTT をしゃべってみる

MQTT ブローカーとの通信には、クライアントプログラム

  • publisher(mosquitto_pub)
  • subscribe(mosquitto_sub)

を利用します。

MQTT ブローカーに subscribe する

mosquitto_sub で MQTT ブローカーに subscribe します。 subscribe するトピックは -t オプションで指定します。 検証目的のため、デバッグオプション(-d)もつけて接続します。

$ mosquitto_sub -d -t sensors/temperature
Client mosqsub/7721-ip-172-31- sending CONNECT
Client mosqsub/7721-ip-172-31- received CONNACK
Client mosqsub/7721-ip-172-31- sending SUBSCRIBE (Mid: 1, Topic: sensors/temperature, QoS: 0)
Client mosqsub/7721-ip-172-31- received SUBACK
Subscribed (mid: 1): 0

MQTT ブローカーに publish する

次に mosquitto_pub で MQTT ブローカーにメッセージを publish します。 新規ターミナルを開き、トピック(-t)とメッセージ(-m)を指定します。

気温が32.5℃であれば、センサーからは以下の様なメッセージを送りたくなります。

$ mosquitto_pub -t sensors/temperature -m "32.5"
1439606534: New connection from 127.0.0.1 on port 1883.
1439606534: New client connected from 127.0.0.1 as mosqpub/7722-ip-172-31- (c1, k60).
1439606534: Client mosqpub/7722-ip-172-31- disconnected.

このとき mosquitto_sub のターミナルには以下のようなメッセージが表示されるはずです。

Client mosqsub/7721-ip-172-31- received PUBLISH (d0, q0, r0, m0, 'sensors/temperature', ... (4 bytes))
32.5

publish されたメッセージ 32.5 を受け取れました。

トピックをまとめて subscribe

今回は subscribe するメッセージを sensors/temperature と固定しましたが、 sensors/ で始まるトピック全般を subscribe したいときは sensors/+ のように + を接尾します。

$ mosquitto_sub -d -t sensors/+

この状態で

$ mosquitto_pub -t sensors/temperature -m "33.5"
$ mosquitto_pub -t sensors/humidity -m "78.9"

と publish すると subscriber には

...
Client mosqsub/30662-ip-172-31 received PUBLISH (d0, q0, r0, m0, 'sensors/temperature', ... (4 bytes))
33.5
...
Client mosqsub/30662-ip-172-31 received PUBLISH (d0, q0, r0, m0, 'sensors/humidity', ... (4 bytes))
78.9

というように sensors/temperaturesensors/humidity という sensors/ 系トピックのメッセージがまるっと送信されます。

Paho ライブラリで pub/sub してみる

Paho は Eclipse Foundation が中心になって開発する OSS の MQTT ライブラリです。 本ブログの最終ゴールである mqtt-kinesis-bridge はこの Python バインディングをベースにしています。 Paho の Python 実装を利用して pub/sub してみます。

MQTT ブローカーには引き続き mosquitto を利用します。

paho のインストール

Python ライブラリのためパッケージ管理ツール pip でパッケージ paho-mqtt をインストールします。

$ sudo pip install --upgrade paho-mqtt

paho-mqtt ライブラリの使い方はドキュメントやサンプロコードレポジトリを参考にしてください。 ライブラリは素直な作りで、 on_xxx とコールバックをフックする仕組みが備わっています。

MQTT サーバに subscribe する

プログラムを最小にするために、トピックやMQTT ブローカーはプログラム決め打ちとします。

まずは subscriber(sub.py) です。

# mqtt subscriber
# sub.py
import paho.mqtt.client as mqtt

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe(topic="sensors/temperature")

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print("on_message:" + msg.topic+" "+str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect(host="localhost", port=1883)

client.loop_forever()

MQTT サーバに publish する

次に publisher(pub.py) です。 こちらもトピック、メッセージ、MQTT ブローカーともにプログラム内に決め打ちします。

# mqtt publisher
# pub.py
import paho.mqtt.publish as publish
publish.single(topic="sensors/temperature", payload="28.5", hostname="localhost", port=1883)

Paho から pub/sub の実行

次のコマンドで subscribe します。

$ python sub.py
Connected with result code 0

別ターミナルから次のコマンドで publish します。

$ python pub.py

subscriber のターミナルには次のようにメッセージ表示されれば成功です。

$ python sub.py
Connected with result code 0
on_message:sensors/temperature 28.5

mqtt-kinesis-bridge で MQTT メッセージを Kinesis にプロキシさせる

最後に MQTT-Kinesis ブリッジ mqtt-kinesis-bridge を使って MQTT に publish したメッセージを Amazon kinesis にプロキシします。

Amazon Kinesis Stream の用意

まずは Kinesis Stream を作成します。

$ aws kinesis create-stream --stream-name Foo --shard-count 1

mqtt-kinesis-bridge の起動

mqtt-kinesis-bridge はパッケージとして綺麗に切りだされていないため git でソースコードを持ってきます。

$ sudo yum install -y git jq
$ git clone https://github.com/awslabs/mqtt-kinesis-bridge.git
$ cd mqtt-kinesis-bridge

Kinesis との通信(PutRecord)には Python の AAWS SDK boto を利用しているため pip でインストールします。

$ sudo pip install --upgrade boto

プロキシープログラムである bridge.py に

Kinesis Stream名 リージョン subscribe するトピックス

を指定して MQTT-Kinesis プロキシを起動します。

$ export STREAM=Foo
$ python bridge.py $STREAM --region ap-northeast-1 --topic_name sensors/+
{
  "StreamDescription": {
    "HasMoreShards": false,
    "Shards": [
      {
        "HashKeyRange": {
          "EndingHashKey": "34",
          "StartingHashKey": "0"
        },
        "SequenceNumberRange": {
          "StartingSequenceNumber": "49"
        },
        "ShardId": "shardId-000000000000"
      }
    ],
    "StreamARN": "arn:aws:kinesis:ap-northeast-1:000000000000:stream/Foo",
    "StreamName": "Foo",
    "StreamStatus": "ACTIVE"
  }
}
Starting MQTT-to-Kinesis bridge
Bridge Connected, looping...
1439640509: New connection from 127.0.0.1 on port 1883.
1439640509: New client connected from 127.0.0.1 as paho/4DF21AE636529C7E2F (c1, k60).
Connection Msg:
Subscribe topic: sensors/+ RC: (0, 1)

MQTT に publish

次に、別ターミナルからトピック sensors/humidity にメッセージを publish します。

$ mosquitto_pub -d -t sensors/humidity -m "78.9"
Client mosqpub/29001-ip-172-31 sending CONNECT
Client mosqpub/29001-ip-172-31 received CONNACK
Client mosqpub/29001-ip-172-31 sending PUBLISH (d0, q0, r0, m1, 'sensors/humidity', ... (4 bytes))
Client mosqpub/29001-ip-172-31 sending DISCONNECT

プロキシサーバに以下のようなメッセージが表示されていれば成功です。

on_message topic: "sensors/humidity" msg.payload: "78.9"
-= put seqNum: 49553524959787167203100577550735447012172782095451553810

ログ出力されている seqNum は Amazon Kinesis の SequenceNumber です。

Kinesis からレコードを取得

AWS CLI を使って Amazon Kinesis からレコードを取得してみましょう。 SequenceNumber を元に ShardIterator を取得し、レコードを取得します。

$ aws kinesis get-shard-iterator \
  --stream-name Foo \
  --shard-id shardId-000000000000 \
  --shard-iterator-type AT_SEQUENCE_NUMBER \
  --starting-sequence-number 49553524959787167203100577550735447012172782095451553810
{
    "ShardIterator": "012345"
}
$ aws kinesis get-records --shard-iterator 012345
{
    "Records": [
        {
            "PartitionKey": "sensors/humidity",
            "Data": "NzguOQ==",
            "SequenceNumber": "49553524959787167203100577550735447012172782095451553810"
        }
    ],
    "NextShardIterator": "...",
    "MillisBehindLatest": 199000
}

レコードの payload(上のレスポンスでは NzguOQ==) は base64 エンコードされています。 このデータをデコードし、 publish したメッセージと同じか確認します。

$ echo -n "NzguOQ==" | base64 -d
78.9

期待通りの値にデコードされました。

まとめ

今回は mqtt-kinesis-bridge を使って MQTT から Amazon Kinesis にデータ連携してみました。 MQTT やMQTT ライブラリの動作確認も行ったため、ブログは長くなりましたが、手元でさくっと MQTT-Kinesis 連携させる上ではお手軽なソリューションの一つです。

mqtt-kinesis-bridge は README ファイルに "A simple Python-based MQTT-to-Kinesis Bridge example" と書かれているように、あくまでモック的な位置づけであり、 実態は MQTT からのメッセージ受信のイベントに対して、実装を最大限サボって Kinesis に PutRecord しているだけであり、MQTTのQoS保証を活かして取りこぼしなく Kinesis に流すことなどは1インデントも考慮されていません。

ちょっと動かしてみる分には便利ですが、その辺りを理解した上で利用しましょう。

補足

  • Kinesis ストリームの作成と EC2(作成したストリームへの操作権限ロール付き)の起動をする cloudformation
  • EC2 へもろもろのインストールをする Ansible Playbook

を用意しました。

mqtt-kinesis-bridge を動かす手間を減らしたいという方がいらっしゃいましたら、お使いください。

レポジトリ https://github.com/quiver/mqtt-kinesis-bridge-ansible-playbook

cloudformation で EC2/Kinesis を起動

$ git clone git@github.com:quiver/mqtt-kinesis-bridge-ansible-playbook.git
$ cd mqtt-kinesis-bridge-ansible-playbook
$ vi cloudformation/parameters.json # SSH の鍵の名前やインスタンスタイプを修正
$ aws cloudformation create-stack \
  --stack-name mqtt2kinesis \
  --template-body file://cloudformation/kinesis-data-vis-sample-app.template.json \
  --parameters file://cloudformation/parameters.json \
  --capabilities=CAPABILITY_IAM

Ansible Playbook で MQTT 関連プログラムをインストール

$ cat hosts.ini
; IP address of the target EC2 instance
1.2.3.4
$ vi hosts.ini # EC2 の IP アドレスを編集
$ ansible-playbook -i hosts.ini site.yml

参考