この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
MQTT(MQ Telemetry Transport) というメッセージングプロトコルが IoT/M2M 界隈で注目されてきています。MQTTには
- HTTPに比べてプロトコルオーバーヘッドが小さい
- 到達可能性を細かく制御できる
- Pub/Sub メッセージが手軽に扱える
といった特徴があります。 昨年末から販売されている超小型コンピュータ Intel Edison では初期状態から MQTT のライブラリがインストールされているなど、MQTT を利用する敷居が下がりつつあります。
IoTデバイスがMQTTブローカーにデータ送信し、Amazon Kinesis でリアルタイムでデータ処理する、というようなユースケースは今後どんどん増えてくるのではないでしょうか。
本ブログの狙い
本ブログではmqtt-kinesis-bridge を利用し、MQTT ブローカーに送信されたメッセージを Amazon Kinesis にプロキシして取り込む方法を紹介します。
AWS Labas が公開している mqtt-kinesis-bridge
は MQTT と Amazon Kinesis を連携する IoT の手段としてまれに紹介されますが、どういうわけか、ちゃんとインストールしたブログをあまり見かけないので、人柱になって動かしてみた次第です。
以下の流れでデータ連携の動作確認をします。
- MQTT ブローカー mosquitto をインストールし、pub/sub を確認
mqtt-kinesis-bridge
のベースになっている MQTT ライブラリの Paho を利用して pub/sub を確認- MQTT ブローカーへ publish したメッセージを
mqtt-kinesis-bridge
で Kinesis にプロキシし、AWS CLI で Kinesis からレコードを受信
検証環境は Amazon Linux AMI 2015.03 とします。
MQTT mosquitto で pub/sub してみる
まずは MQTT の OSS 実装である mosquitto
をインストールし、pub/sub してみます。
MQTT mosquitto のインストール
CentOS 6 向けのパッケージが用意されているため、CentOS 6 向けと同じく yum レポジトリを追加して Amazon Linux にインストールします。
$ sudo curl http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-6/home:oojah:mqtt.repo -o /etc/yum.repos.d/mqtt.repo
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 271 100 271 0 0 476 0 --:--:-- --:--:-- --:--:-- 476
$ cat /etc/yum.repos.d/mqtt.repo
[home_oojah_mqtt]
name=mqtt (CentOS_CentOS-6)
type=rpm-md
baseurl=http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-6/
gpgcheck=1
gpgkey=http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-6/repodata/repomd.xml.key
enabled=1
$ yum search mosquitto
Loaded plugins: priorities, update-motd, upgrade-helper
================================================ N/S matched: mosquitto ======================================================================
mosquitto-clients.i686 : Mosquitto command line publish/subscribe clients
mosquitto-clients.x86_64 : Mosquitto command line publish/subscribe clients
mosquitto-debuginfo.i686 : Debug information for package mosquitto
mosquitto-debuginfo.x86_64 : Debug information for package mosquitto
libmosquitto-devel.i686 : MQTT C client library development files
libmosquitto-devel.x86_64 : MQTT C client library development files
libmosquitto1.i686 : MQTT C client library
libmosquitto1.x86_64 : MQTT C client library
libmosquittopp-devel.i686 : MQTT C++ client library development files
libmosquittopp-devel.x86_64 : MQTT C++ client library development files
libmosquittopp1.i686 : MQTT C++ client library
libmosquittopp1.x86_64 : MQTT C++ client library
mosquitto.i686 : MQTT version 3.1/3.1.1 compatible message broker
mosquitto.x86_64 : MQTT version 3.1/3.1.1 compatible message broker
Name and summary matches only, use "search all" for everything.
$ yum search
の実行結果からわかるように、mosquitto はサーバーmosquitto
とクライアントmosquitto-client
の両方のアプリケーションを提供しているので、これらをインストールします。
$ sudo yum install -y mosquitto-clients mosquitto
mosquitto サーバ(MQTT ブローカー)の起動
mosquitto は自動起動する設定になっていないため chkconfig --add
します。
$ sudo chkconfig --add mosquitto
$ sudo chkconfig mosquitto on
$ sudo chkconfig mosquitto --list
mosquitto 0:off 1:off 2:on 3:on 4:on 5:on 6:off
mosquitto サーバを起動します。
$ sudo service mosquitto start
Starting Mosquitto MQTT broker [ OK ]
# 1439606245: mosquitto version 1.4.2 (build date 2015-06-10 13:52:20+0000) starting
1439606245: Config loaded from /etc/mosquitto/mosquitto.conf.
1439606245: Opening ipv4 listen socket on port 1883.
1439606245: Opening ipv6 listen socket on port 1883.
mosquitto はデフォルトでは 1883 番ポートで LISTEN
します。
mosquitto クライアントから MQTT をしゃべってみる
MQTT ブローカーとの通信には、クライアントプログラム
- publisher(
mosquitto_pub
) - subscribe(
mosquitto_sub
)
を利用します。
MQTT ブローカーに subscribe する
mosquitto_sub
で MQTT ブローカーに subscribe します。
subscribe するトピックは -t
オプションで指定します。
検証目的のため、デバッグオプション(-d
)もつけて接続します。
$ mosquitto_sub -d -t sensors/temperature
Client mosqsub/7721-ip-172-31- sending CONNECT
Client mosqsub/7721-ip-172-31- received CONNACK
Client mosqsub/7721-ip-172-31- sending SUBSCRIBE (Mid: 1, Topic: sensors/temperature, QoS: 0)
Client mosqsub/7721-ip-172-31- received SUBACK
Subscribed (mid: 1): 0
MQTT ブローカーに publish する
次に mosquitto_pub
で MQTT ブローカーにメッセージを publish します。
新規ターミナルを開き、トピック(-t
)とメッセージ(-m
)を指定します。
気温が32.5℃であれば、センサーからは以下の様なメッセージを送りたくなります。
$ mosquitto_pub -t sensors/temperature -m "32.5"
1439606534: New connection from 127.0.0.1 on port 1883.
1439606534: New client connected from 127.0.0.1 as mosqpub/7722-ip-172-31- (c1, k60).
1439606534: Client mosqpub/7722-ip-172-31- disconnected.
このとき mosquitto_sub
のターミナルには以下のようなメッセージが表示されるはずです。
Client mosqsub/7721-ip-172-31- received PUBLISH (d0, q0, r0, m0, 'sensors/temperature', ... (4 bytes))
32.5
publish されたメッセージ 32.5
を受け取れました。
トピックをまとめて subscribe
今回は subscribe するメッセージを sensors/temperature
と固定しましたが、 sensors/
で始まるトピック全般を subscribe したいときは sensors/+
のように +
を接尾します。
$ mosquitto_sub -d -t sensors/+
この状態で
$ mosquitto_pub -t sensors/temperature -m "33.5"
$ mosquitto_pub -t sensors/humidity -m "78.9"
と publish すると subscriber には
...
Client mosqsub/30662-ip-172-31 received PUBLISH (d0, q0, r0, m0, 'sensors/temperature', ... (4 bytes))
33.5
...
Client mosqsub/30662-ip-172-31 received PUBLISH (d0, q0, r0, m0, 'sensors/humidity', ... (4 bytes))
78.9
というように sensors/temperature
と sensors/humidity
という sensors/
系トピックのメッセージがまるっと送信されます。
Paho ライブラリで pub/sub してみる
Paho は Eclipse Foundation が中心になって開発する OSS の MQTT ライブラリです。 本ブログの最終ゴールである mqtt-kinesis-bridge はこの Python バインディングをベースにしています。 Paho の Python 実装を利用して pub/sub してみます。
MQTT ブローカーには引き続き mosquitto を利用します。
paho のインストール
Python ライブラリのためパッケージ管理ツール pip
でパッケージ paho-mqtt
をインストールします。
$ sudo pip install --upgrade paho-mqtt
paho-mqtt
ライブラリの使い方はドキュメントやサンプロコードレポジトリを参考にしてください。
ライブラリは素直な作りで、 on_xxx
とコールバックをフックする仕組みが備わっています。
MQTT サーバに subscribe する
プログラムを最小にするために、トピックやMQTT ブローカーはプログラム決め打ちとします。
まずは subscriber(sub.py) です。
# mqtt subscriber
# sub.py
import paho.mqtt.client as mqtt
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe(topic="sensors/temperature")
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
print("on_message:" + msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(host="localhost", port=1883)
client.loop_forever()
MQTT サーバに publish する
次に publisher(pub.py) です。 こちらもトピック、メッセージ、MQTT ブローカーともにプログラム内に決め打ちします。
# mqtt publisher
# pub.py
import paho.mqtt.publish as publish
publish.single(topic="sensors/temperature", payload="28.5", hostname="localhost", port=1883)
Paho から pub/sub の実行
次のコマンドで subscribe します。
$ python sub.py
Connected with result code 0
別ターミナルから次のコマンドで publish します。
$ python pub.py
subscriber のターミナルには次のようにメッセージ表示されれば成功です。
$ python sub.py
Connected with result code 0
on_message:sensors/temperature 28.5
mqtt-kinesis-bridge で MQTT メッセージを Kinesis にプロキシさせる
最後に MQTT-Kinesis ブリッジ mqtt-kinesis-bridge
を使って MQTT に publish したメッセージを Amazon kinesis にプロキシします。
Amazon Kinesis Stream の用意
まずは Kinesis Stream を作成します。
$ aws kinesis create-stream --stream-name Foo --shard-count 1
mqtt-kinesis-bridge の起動
mqtt-kinesis-bridge
はパッケージとして綺麗に切りだされていないため git でソースコードを持ってきます。
$ sudo yum install -y git jq
$ git clone https://github.com/awslabs/mqtt-kinesis-bridge.git
$ cd mqtt-kinesis-bridge
Kinesis との通信(PutRecord
)には Python の AAWS SDK boto を利用しているため pip でインストールします。
$ sudo pip install --upgrade boto
プロキシープログラムである bridge.py に
Kinesis Stream名 リージョン subscribe するトピックス
を指定して MQTT-Kinesis プロキシを起動します。
$ export STREAM=Foo
$ python bridge.py $STREAM --region ap-northeast-1 --topic_name sensors/+
{
"StreamDescription": {
"HasMoreShards": false,
"Shards": [
{
"HashKeyRange": {
"EndingHashKey": "34",
"StartingHashKey": "0"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49"
},
"ShardId": "shardId-000000000000"
}
],
"StreamARN": "arn:aws:kinesis:ap-northeast-1:000000000000:stream/Foo",
"StreamName": "Foo",
"StreamStatus": "ACTIVE"
}
}
Starting MQTT-to-Kinesis bridge
Bridge Connected, looping...
1439640509: New connection from 127.0.0.1 on port 1883.
1439640509: New client connected from 127.0.0.1 as paho/4DF21AE636529C7E2F (c1, k60).
Connection Msg:
Subscribe topic: sensors/+ RC: (0, 1)
MQTT に publish
次に、別ターミナルからトピック sensors/humidity
にメッセージを publish します。
$ mosquitto_pub -d -t sensors/humidity -m "78.9"
Client mosqpub/29001-ip-172-31 sending CONNECT
Client mosqpub/29001-ip-172-31 received CONNACK
Client mosqpub/29001-ip-172-31 sending PUBLISH (d0, q0, r0, m1, 'sensors/humidity', ... (4 bytes))
Client mosqpub/29001-ip-172-31 sending DISCONNECT
プロキシサーバに以下のようなメッセージが表示されていれば成功です。
on_message topic: "sensors/humidity" msg.payload: "78.9"
-= put seqNum: 49553524959787167203100577550735447012172782095451553810
ログ出力されている seqNum は Amazon Kinesis の SequenceNumber
です。
Kinesis からレコードを取得
AWS CLI を使って Amazon Kinesis からレコードを取得してみましょう。
SequenceNumber
を元に ShardIterator
を取得し、レコードを取得します。
$ aws kinesis get-shard-iterator \
--stream-name Foo \
--shard-id shardId-000000000000 \
--shard-iterator-type AT_SEQUENCE_NUMBER \
--starting-sequence-number 49553524959787167203100577550735447012172782095451553810
{
"ShardIterator": "012345"
}
$ aws kinesis get-records --shard-iterator 012345
{
"Records": [
{
"PartitionKey": "sensors/humidity",
"Data": "NzguOQ==",
"SequenceNumber": "49553524959787167203100577550735447012172782095451553810"
}
],
"NextShardIterator": "...",
"MillisBehindLatest": 199000
}
レコードの payload(上のレスポンスでは NzguOQ==
) は base64 エンコードされています。
このデータをデコードし、 publish したメッセージと同じか確認します。
$ echo -n "NzguOQ==" | base64 -d
78.9
期待通りの値にデコードされました。
まとめ
今回は mqtt-kinesis-bridge を使って MQTT から Amazon Kinesis にデータ連携してみました。 MQTT やMQTT ライブラリの動作確認も行ったため、ブログは長くなりましたが、手元でさくっと MQTT-Kinesis 連携させる上ではお手軽なソリューションの一つです。
mqtt-kinesis-bridge は README ファイルに "A simple Python-based MQTT-to-Kinesis Bridge example" と書かれているように、あくまでモック的な位置づけであり、 実態は MQTT からのメッセージ受信のイベントに対して、実装を最大限サボって Kinesis に PutRecord しているだけであり、MQTTのQoS保証を活かして取りこぼしなく Kinesis に流すことなどは1インデントも考慮されていません。
ちょっと動かしてみる分には便利ですが、その辺りを理解した上で利用しましょう。
補足
- Kinesis ストリームの作成と EC2(作成したストリームへの操作権限ロール付き)の起動をする cloudformation
- EC2 へもろもろのインストールをする Ansible Playbook
を用意しました。
mqtt-kinesis-bridge を動かす手間を減らしたいという方がいらっしゃいましたら、お使いください。
レポジトリ https://github.com/quiver/mqtt-kinesis-bridge-ansible-playbook
cloudformation で EC2/Kinesis を起動
$ git clone git@github.com:quiver/mqtt-kinesis-bridge-ansible-playbook.git
$ cd mqtt-kinesis-bridge-ansible-playbook
$ vi cloudformation/parameters.json # SSH の鍵の名前やインスタンスタイプを修正
$ aws cloudformation create-stack \
--stack-name mqtt2kinesis \
--template-body file://cloudformation/kinesis-data-vis-sample-app.template.json \
--parameters file://cloudformation/parameters.json \
--capabilities=CAPABILITY_IAM
Ansible Playbook で MQTT 関連プログラムをインストール
$ cat hosts.ini
; IP address of the target EC2 instance
1.2.3.4
$ vi hosts.ini # EC2 の IP アドレスを編集
$ ansible-playbook -i hosts.ini site.yml
参考
- AWS Black Belt Techシリーズ Amazon Kinesis http://www.slideshare.net/AmazonWebServicesJapan/aws-black-belt-tech-amazon-kinesis
- GitHub の mqtt-kinesis-bridge レポジトリ https://github.com/awslabs/mqtt-kinesis-bridge
- MQTT Mosquitto http://mosquitto.org/
- MQTT Paho http://www.eclipse.org/paho/