OpenBlocks IoT BX1でTI製センサータグ(CC2650)のセンサーデータ(10種類)をAmazon Kinesisに飛ばす
こんにちは、せーのです。今日はBX1を使って取得したデータをクラウドに飛ばしてみたいと思います。
おさらい
まずは今までのおさらいです。初期設定、ネットワーク設定、センサーデータの取得方法は以下の記事を参考にしてください。
- OpenBlocks IoT BX1を触ってみた
- OpenBlocks IoT BX1:コンソールによるネットワーク設定と初期化の方法
- OpenBlocks IoT BX1でTI製センサータグ(CC2650)のセンサーデータ(10種類)を取得する
クラウドに飛ばしてみよう
さて、今回は前回の応用編です。前回書いたセンサーデータを取得するソースを改変してデータをAmazon Kinesisにputしてみたいと思います。またセキュリティを考えCognitoにて認証してみたいと思います。
やってみる
Kinesisストリームを作る
それではさっそくやってみます。まずはKinesisのストリームを作ります。マネージメントコンソールのAmazon Kinesisの画面からストリームを作成します。 センサーデータは大抵の場合1kbにも満たない小さいものです。今回はファンクションごとにputするので秒間7-8発叩かれるのでそれを計算してみるとシャードは1つでいいようなのでそれで設定致します。
SDKよりAmazon Kinesisオブジェクトを作成する
次にコードを書いていきます。Amazon Kinesisのオブジェクトを作成します。
var AWS = require("aws-sdk"); AWS.config.region = "ap-northeast-1"; var kinesis = new AWS.Kinesis({ region: 'ap-northeast-1' }); console.info("Using Amazon Kinesis Stream: " + mySTREAM);
Amazon Kinesisのput部分を作成する
前回のコードでは取得したデータはconsole.logでただ表示しただけでした。今回はそこの部分を改変してAmazon kinesisにputするAPIを叩いてみます。例えばジャイロスコープであればこのような感じです。
var moment = require("moment-timezone"); function ti_gyroscope(conned_obj) { var period = 1000; // ms conned_obj.enableGyroscope(function() { conned_obj.setGyroscopePeriod(period, function() { conned_obj.notifyGyroscope(function() { console.info("ready: notifyGyroscope"); console.info("notify period = " + period + "ms"); conned_obj.on('gyroscopeChange', function(x, y, z) { var data = { device_uuid: conned_obj.uuid, time: moment().tz("Asia/Tokyo").format(), payload: {gyro_X: x, gyro_Y: y, gyro_Z: z} }; var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM}; kinesis.putRecord(kparams, function(err, data) { if (err) { console.log(err, err.stack); } else { console.log(kparams); console.log(data); } }); }); }); }); }); }
実際のジャイロスコープのセンサーデータの他にもセンサータグのUUIDを識別子として、またタイムスタンプをつけて見ました。これを全センサーデータ分書き換えるとこうなります。
/* * $ npm install sandeepmistry/node-sensortag aws-sdk moment-timezone * $ TI_UUID=your_ti_sensor_tag_UUID AK_STREAM=your_kinesis_stream node this_file.js */ var mySTREAM = process.env["AK_STREAM"] || "hyakuyo"; var myAddress = process.env["TI_ADDRESS"] || "YOUR_TI_sensor_tag_ADDRESS"; var AWS = require("aws-sdk"); AWS.config.region = "ap-northeast-1"; var kinesis = new AWS.Kinesis({ region: 'ap-northeast-1' }); console.info("Using AWS Kinesis Stream: " + mySTREAM); var moment = require("moment-timezone"); function ti_simple_key(conned_obj) { conned_obj.notifySimpleKey(function() { console.info("ready: notifySimpleKey"); console.info("/* left right (true = pushed, false = released) */"); conned_obj.on("simpleKeyChange", function(left, right) { /* run per pushed button */ var data = { device_uuid: conned_obj.uuid, time: moment().tz("Asia/Tokyo").format(), payload: {L: left, R: right} }; var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM}; kinesis.putRecord(kparams, function(err, data) { if (err) { console.log(err, err.stack); } else { console.log(kparams); console.log(data); } }); }); }); } function ti_gyroscope(conned_obj) { var period = 1000; // ms conned_obj.enableGyroscope(function() { conned_obj.setGyroscopePeriod(period, function() { conned_obj.notifyGyroscope(function() { console.info("ready: notifyGyroscope"); console.info("notify period = " + period + "ms"); conned_obj.on('gyroscopeChange', function(x, y, z) { var data = { device_uuid: conned_obj.uuid, time: moment().tz("Asia/Tokyo").format(), payload: {gyro_X: x, gyro_Y: y, gyro_Z: z} }; var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM}; kinesis.putRecord(kparams, function(err, data) { if (err) { console.log(err, err.stack); } else { console.log(kparams); console.log(data); } }); }); }); }); }); } function ti_ir_temperature(conned_obj) { var period = 1000; // ms conned_obj.enableIrTemperature(function() { conned_obj.setIrTemperaturePeriod(period, function() { conned_obj.notifyIrTemperature(function() { console.info("ready: notifyIrTemperature"); console.info("notify period = " + period + "ms"); conned_obj.on('irTemperatureChange', function(objectTemperature, ambientTemperature) { var data = { device_uuid: conned_obj.uuid, time: moment().tz("Asia/Tokyo").format(), payload: {objectTemperature: objectTemperature, ambientTemperature: ambientTemperature} }; var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM}; kinesis.putRecord(kparams, function(err, data) { if (err) { console.log(err, err.stack); } else { console.log(kparams); console.log(data); } }); }); }); }); }); } function ti_accelerometer(conned_obj) { var period = 1000; // ms conned_obj.enableAccelerometer(function() { conned_obj.setAccelerometerPeriod(period, function() { conned_obj.notifyAccelerometer(function() { console.info("ready: notifyAccelerometer"); console.info("notify period = " + period + "ms"); conned_obj.on('accelerometerChange', function(x, y, z) { var data = { device_uuid: conned_obj.uuid, time: moment().tz("Asia/Tokyo").format(), payload: {accel_X: x, accel_Y: y, accel_Z: z} }; var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM}; kinesis.putRecord(kparams, function(err, data) { if (err) { console.log(err, err.stack); } else { console.log(kparams); console.log(data); } }); }); }); }); }); } function ti_humidity(conned_obj) { var period = 1000; // ms conned_obj.enableHumidity(function() { conned_obj.setHumidityPeriod(period, function() { conned_obj.notifyHumidity(function() { console.info("ready: notifyHumidity"); console.info("notify period = " + period + "ms"); conned_obj.on('humidityChange', function(temperature, humidity) { var data = { device_uuid: conned_obj.uuid, time: moment().tz("Asia/Tokyo").format(), payload: {temperature: temperature, humidity: humidity} }; var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM}; kinesis.putRecord(kparams, function(err, data) { if (err) { console.log(err, err.stack); } else { console.log(kparams); console.log(data); } }); }); }); }); }); } function ti_magnetometer(conned_obj) { var period = 1000; // ms conned_obj.enableMagnetometer(function() { conned_obj.setMagnetometerPeriod(period, function() { conned_obj.notifyMagnetometer(function() { console.info("ready: notifyMagnetometer"); console.info("notify period = " + period + "ms"); conned_obj.on('magnetometerChange', function(x, y, z) { var data = { device_uuid: conned_obj.uuid, time: moment().tz("Asia/Tokyo").format(), payload: {magnet_X: x, magnet_Y: y, magnet_Z: z} }; var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM}; kinesis.putRecord(kparams, function(err, data) { if (err) { console.log(err, err.stack); } else { console.log(kparams); console.log(data); } }); }); }); }); }); } function ti_barometric_pressure(conned_obj) { var period = 1000; // ms conned_obj.enableBarometricPressure(function() { conned_obj.setBarometricPressurePeriod(period, function() { conned_obj.notifyBarometricPressure(function() { console.info("ready: notifyBarometricPressure"); console.info("notify period = " + period + "ms"); conned_obj.on('barometricPressureChange', function(pressure) { var data = { device_uuid: conned_obj.uuid, time: moment().tz("Asia/Tokyo").format(), payload: {pressure: pressure} }; var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM}; kinesis.putRecord(kparams, function(err, data) { if (err) { console.log(err, err.stack); } else { console.log(kparams); console.log(data); } }); console.log('\tpressure = %d mBar', pressure.toFixed(1)); }); }); }); }); } function ti_luxometer(conned_obj) { var period = 1000; // ms conned_obj.enableLuxometer(function() { conned_obj.setLuxometerPeriod(period, function() { conned_obj.notifyLuxometer(function() { console.info("ready: notifyLuxometer"); console.info("notify period = " + period + "ms"); conned_obj.on('luxometerChange', function(lux) { var data = { device_uuid: conned_obj.uuid, time: moment().tz("Asia/Tokyo").format(), payload: {lux: lux} }; var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM}; kinesis.putRecord(kparams, function(err, data) { if (err) { console.log(err, err.stack); } else { console.log(kparams); console.log(data); } }); console.log('\tlux = %d', lux.toFixed(1)); }); }); }); }); } var SensorTag = require('sensortag'); console.info(">> STOP: Ctrl+C or SensorTag power off"); console.info("start"); console.info("waiting for connect from " + myAddress); SensorTag.discoverByAddress(myAddress, function(sensorTag) { console.info("found: connect and setup ... (waiting 5~10 seconds)"); sensorTag.connectAndSetup(function() { sensorTag.readDeviceName(function(error, deviceName) { console.info("connect: " + deviceName); ti_simple_key(sensorTag); ti_gyroscope(sensorTag); ti_ir_temperature(sensorTag); ti_accelerometer(sensorTag); ti_humidity(sensorTag); ti_magnetometer(sensorTag); ti_barometric_pressure(sensorTag); ti_luxometer(sensorTag); }); }); /* In case of SensorTag PowerOff or out of range when fired `onDisconnect` */ sensorTag.on("disconnect", function() { console.info("disconnect and exit"); process.exit(0); }); });
後はaws configureコマンドでAPI KEY, API SECRET, regionを登録するか、パラメータで指定しておけばkinesisにputされます。
AWS_ACCESS_KEY_ID=XXXXXXXXXXXXXX AWS_SECRET_ACCESS_KEY=XXXXXXXXXXXXXXX TI_ADDRESS=B0:B4:48:B9:59:05 AK_STREAM=bx1data node ti_all.js Using AWS Kinesis Stream: bx1data >> STOP: Ctrl+C or SensorTag power off start waiting for connect from B0:B4:48:B9:59:05 found: connect and setup ... (waiting 5~10 seconds) connect: SensorTag 2.0 ready: notifySimpleKey /* left right (true = pushed, false = released) */ ready: notifyAccelerometer notify period = 1000ms ready: notifyMagnetometer notify period = 1000ms ready: notifyGyroscope notify period = 1000ms ready: notifyIrTemperature notify period = 1000ms ready: notifyHumidity notify period = 1000ms ready: notifyBarometricPressure notify period = 1000ms ready: notifyLuxometer notify period = 1000ms pressure = 1014.9 mBar { Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:34+09:00","payload":{"gyro_X":0,"gyro_Y":0,"gyro_Z":0}}', PartitionKey: 'any', StreamName: 'bx1data' } { SequenceNumber: '49554242436466365897581273724784425716267634993494228994', ShardId: 'shardId-000000000000' } { Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:34+09:00","payload":{"accel_X":0,"accel_Y":0,"accel_Z":0}}', PartitionKey: 'any', StreamName: 'bx1data' } { SequenceNumber: '49554242436466365897581273724785634642087249622668935170', ShardId: 'shardId-000000000000' } lux = 22.8 { Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:34+09:00","payload":{"magnet_X":0,"magnet_Y":0,"magnet_Z":0}}', PartitionKey: 'any', StreamName: 'bx1data' } { SequenceNumber: '49554242436466365897581273724786843567906864251843641346', ShardId: 'shardId-000000000000' } { Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"objectTemperature":24.8125,"ambientTemperature":30.6875}}', PartitionKey: 'any', StreamName: 'bx1data' } { SequenceNumber: '49554242436466365897581273724788052493726478881018347522', ShardId: 'shardId-000000000000' } { Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"temperature":30.7171630859375,"humidity":69.9462890625}}', PartitionKey: 'any', StreamName: 'bx1data' } { SequenceNumber: '49554242436466365897581273724789261419546093510193053698', ShardId: 'shardId-000000000000' } { Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"pressure":1014.87}}', PartitionKey: 'any', StreamName: 'bx1data' } { SequenceNumber: '49554242436466365897581273724790470345365708139367759874', ShardId: 'shardId-000000000000' } { Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"lux":22.83}}', PartitionKey: 'any', StreamName: 'bx1data' } { SequenceNumber: '49554242436466365897581273724791679271185322768542466050', ShardId: 'shardId-000000000000' } { Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"accel_X":-0.03607177734375,"accel_Y":-0.0791015625,"accel_Z":0.22222900390625}}', PartitionKey: 'any', StreamName: 'bx1data' } { SequenceNumber: '49554242436466365897581273724792888197004937397717172226', ShardId: 'shardId-000000000000' } { Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"gyro_X":-2.7008056640625,"gyro_Y":25.20751953125,"gyro_Z":23.97918701171875}}', PartitionKey: 'any', StreamName: 'bx1data' } .......
あとはこのデータを使ってTableauやKibanaで可視化すればリアルタイムでグラフデータが見られるかと思います。ちなみにLogstashとKibana 4を使ってこのデータを可視化するとこんな感じになります。
ここらへんのデータの扱い方はこちらの記事等を参考にしてみてください。
Amazon Cognito対応をする
これでも目的は達成しているわけですが、IoTゲートウェイとはいえAPI KEYをむき出しで扱うのはセキュリティ的にあまり宜しくありません。ということでAmazon CognitoのUnAuthを使って認証部分を書き換えてみたいと思います。
まずはマネージメントコンソールからAmazon Cognitoを開き、新しいIdentity Poolを作成します。
後は出来上がったIdentity Pool IdとUnAuthのRole、アカウントIDを使ってAmazon kinesisのオブジェクト作成前に認証部分を書き換えます。
var AWS = require("aws-sdk"); //AWS.config.region = "ap-northeast-1"; AWS.config.region = "us-east-1"; AWS.config.credentials = new AWS.CognitoIdentityCredentials({ AccountId: "123456789012", RoleArn: "arn:aws:iam::123456789012:role/cognito-unauth", IdentityPoolId: 'us-east-1:xxxxxxxxxx-xxxx-xxxx-xxxxx-xxxxxxxxxx' }); var kinesis = new AWS.Kinesis({ region: 'ap-northeast-1' });
気をつけるのはAmazon Cognitoは東京リージョンでは使えないので認証は別のリージョンにて行い、Amazon kinesisのリージョンを明示することでデータは東京リージョンのAmazon kinesisに行くようにしてあげる事です。 これでnodeを叩く時にAPI KEYやAPI SECRETを指定しなくてもAWSにデータを飛ばすことができるようになりました。
まとめ
いかがでしたでしょうか。AWSのSDKに慣れていれば意外と簡単でしたね。ただこれですと7回/秒でAmazon kinesisのAPIが叩かれるのでBX1内にデータを貯めておいて、ひと通りのセンサーデータが溜まったらAPIを叩く、というようなリファクタリングをするとより効率のよいデータ送信ができるかと思います。