OpenBlocks IoT BX1でTI製センサータグ(CC2650)のセンサーデータ(10種類)をAmazon Kinesisに飛ばす

2015.09.14

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、せーのです。今日はBX1を使って取得したデータをクラウドに飛ばしてみたいと思います。

おさらい

まずは今までのおさらいです。初期設定、ネットワーク設定、センサーデータの取得方法は以下の記事を参考にしてください。

クラウドに飛ばしてみよう

さて、今回は前回の応用編です。前回書いたセンサーデータを取得するソースを改変してデータをAmazon Kinesisにputしてみたいと思います。またセキュリティを考えCognitoにて認証してみたいと思います。

やってみる

Kinesisストリームを作る

それではさっそくやってみます。まずはKinesisのストリームを作ります。マネージメントコンソールのAmazon Kinesisの画面からストリームを作成します。 センサーデータは大抵の場合1kbにも満たない小さいものです。今回はファンクションごとにputするので秒間7-8発叩かれるのでそれを計算してみるとシャードは1つでいいようなのでそれで設定致します。

bx1tokinesis1

SDKよりAmazon Kinesisオブジェクトを作成する

次にコードを書いていきます。Amazon Kinesisのオブジェクトを作成します。

var AWS = require("aws-sdk");
AWS.config.region = "ap-northeast-1";
var kinesis = new AWS.Kinesis({
  region: 'ap-northeast-1'
});
console.info("Using Amazon Kinesis Stream: " + mySTREAM);

Amazon Kinesisのput部分を作成する

前回のコードでは取得したデータはconsole.logでただ表示しただけでした。今回はそこの部分を改変してAmazon kinesisにputするAPIを叩いてみます。例えばジャイロスコープであればこのような感じです。

var moment = require("moment-timezone");

function ti_gyroscope(conned_obj) {
	var period = 1000; // ms
	conned_obj.enableGyroscope(function() {
		conned_obj.setGyroscopePeriod(period, function() {
			conned_obj.notifyGyroscope(function() {
				console.info("ready: notifyGyroscope");
				console.info("notify period = " + period + "ms");
				conned_obj.on('gyroscopeChange', function(x, y, z) {
          var data = {
    				device_uuid: conned_obj.uuid,
    				time: moment().tz("Asia/Tokyo").format(),
    				payload: {gyro_X: x, gyro_Y: y, gyro_Z: z}
    			};
    			var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
    			kinesis.putRecord(kparams, function(err, data) {
    				if (err) {
    					console.log(err, err.stack);
    				} else {
    					console.log(kparams);
    					console.log(data);
    				}
    			});
				});
			});
		});
	});
}

実際のジャイロスコープのセンサーデータの他にもセンサータグのUUIDを識別子として、またタイムスタンプをつけて見ました。これを全センサーデータ分書き換えるとこうなります。

/*
* $ npm install sandeepmistry/node-sensortag aws-sdk moment-timezone
* $ TI_UUID=your_ti_sensor_tag_UUID AK_STREAM=your_kinesis_stream node this_file.js
*/
var mySTREAM = process.env["AK_STREAM"] || "hyakuyo";
var myAddress = process.env["TI_ADDRESS"] || "YOUR_TI_sensor_tag_ADDRESS";

var AWS = require("aws-sdk");
AWS.config.region = "ap-northeast-1";
var kinesis = new AWS.Kinesis({
  region: 'ap-northeast-1'
});
console.info("Using AWS Kinesis Stream: " + mySTREAM);

var moment = require("moment-timezone");

function ti_simple_key(conned_obj) {
	conned_obj.notifySimpleKey(function() {
		console.info("ready: notifySimpleKey");
		console.info("/* left right (true = pushed, false = released) */");
		conned_obj.on("simpleKeyChange", function(left, right) { /* run per pushed button */
			var data = {
				device_uuid: conned_obj.uuid,
				time: moment().tz("Asia/Tokyo").format(),
				payload: {L: left, R: right}
			};
			var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
			kinesis.putRecord(kparams, function(err, data) {
				if (err) {
					console.log(err, err.stack);
				} else {
					console.log(kparams);
					console.log(data);
				}
			});
		});
	});
}

function ti_gyroscope(conned_obj) {
	var period = 1000; // ms
	conned_obj.enableGyroscope(function() {
		conned_obj.setGyroscopePeriod(period, function() {
			conned_obj.notifyGyroscope(function() {
				console.info("ready: notifyGyroscope");
				console.info("notify period = " + period + "ms");
				conned_obj.on('gyroscopeChange', function(x, y, z) {
          var data = {
    				device_uuid: conned_obj.uuid,
    				time: moment().tz("Asia/Tokyo").format(),
    				payload: {gyro_X: x, gyro_Y: y, gyro_Z: z}
    			};
    			var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
    			kinesis.putRecord(kparams, function(err, data) {
    				if (err) {
    					console.log(err, err.stack);
    				} else {
    					console.log(kparams);
    					console.log(data);
    				}
    			});
				});
			});
		});
	});
}

function ti_ir_temperature(conned_obj) {
	var period = 1000; // ms
	conned_obj.enableIrTemperature(function() {
		conned_obj.setIrTemperaturePeriod(period, function() {
			conned_obj.notifyIrTemperature(function() {
				console.info("ready: notifyIrTemperature");
				console.info("notify period = " + period + "ms");
				conned_obj.on('irTemperatureChange', function(objectTemperature, ambientTemperature) {
          var data = {
            device_uuid: conned_obj.uuid,
            time: moment().tz("Asia/Tokyo").format(),
            payload: {objectTemperature: objectTemperature, ambientTemperature: ambientTemperature}
          };
          var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
          kinesis.putRecord(kparams, function(err, data) {
            if (err) {
              console.log(err, err.stack);
            } else {
              console.log(kparams);
              console.log(data);
            }
          });
				});
			});
		});
	});
}

function ti_accelerometer(conned_obj) {
	var period = 1000; // ms
	conned_obj.enableAccelerometer(function() {
		conned_obj.setAccelerometerPeriod(period, function() {
			conned_obj.notifyAccelerometer(function() {
				console.info("ready: notifyAccelerometer");
				console.info("notify period = " + period + "ms");
				conned_obj.on('accelerometerChange', function(x, y, z) {
            var data = {
              device_uuid: conned_obj.uuid,
              time: moment().tz("Asia/Tokyo").format(),
              payload: {accel_X: x, accel_Y: y, accel_Z: z}
            };
            var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
            kinesis.putRecord(kparams, function(err, data) {
              if (err) {
                console.log(err, err.stack);
              } else {
                console.log(kparams);
                console.log(data);
              }
            });
				});
			});
		});
	});
}

function ti_humidity(conned_obj) {
	var period = 1000; // ms
	conned_obj.enableHumidity(function() {
		conned_obj.setHumidityPeriod(period, function() {
			conned_obj.notifyHumidity(function() {
				console.info("ready: notifyHumidity");
				console.info("notify period = " + period + "ms");
				conned_obj.on('humidityChange', function(temperature, humidity) {
            var data = {
              device_uuid: conned_obj.uuid,
              time: moment().tz("Asia/Tokyo").format(),
              payload: {temperature: temperature, humidity: humidity}
            };
            var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
            kinesis.putRecord(kparams, function(err, data) {
              if (err) {
                console.log(err, err.stack);
              } else {
                console.log(kparams);
                console.log(data);
              }
            });
				});
			});
		});
	});
}

function ti_magnetometer(conned_obj) {
	var period = 1000; // ms
	conned_obj.enableMagnetometer(function() {
		conned_obj.setMagnetometerPeriod(period, function() {
			conned_obj.notifyMagnetometer(function() {
				console.info("ready: notifyMagnetometer");
				console.info("notify period = " + period + "ms");
				conned_obj.on('magnetometerChange', function(x, y, z) {
          var data = {
            device_uuid: conned_obj.uuid,
            time: moment().tz("Asia/Tokyo").format(),
            payload: {magnet_X: x, magnet_Y: y, magnet_Z: z}
          };
          var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
          kinesis.putRecord(kparams, function(err, data) {
            if (err) {
              console.log(err, err.stack);
            } else {
              console.log(kparams);
              console.log(data);
            }
          });
				});
			});
		});
	});
}

function ti_barometric_pressure(conned_obj) {
	var period = 1000; // ms
	conned_obj.enableBarometricPressure(function() {
		conned_obj.setBarometricPressurePeriod(period, function() {
			conned_obj.notifyBarometricPressure(function() {
				console.info("ready: notifyBarometricPressure");
				console.info("notify period = " + period + "ms");
				conned_obj.on('barometricPressureChange', function(pressure) {
          var data = {
            device_uuid: conned_obj.uuid,
            time: moment().tz("Asia/Tokyo").format(),
            payload: {pressure: pressure}
          };
          var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
          kinesis.putRecord(kparams, function(err, data) {
            if (err) {
              console.log(err, err.stack);
            } else {
              console.log(kparams);
              console.log(data);
            }
          });
            console.log('\tpressure = %d mBar', pressure.toFixed(1));
				});
			});
		});
	});
}

function ti_luxometer(conned_obj) {
	var period = 1000; // ms
	conned_obj.enableLuxometer(function() {
		conned_obj.setLuxometerPeriod(period, function() {
			conned_obj.notifyLuxometer(function() {
				console.info("ready: notifyLuxometer");
				console.info("notify period = " + period + "ms");
				conned_obj.on('luxometerChange', function(lux) {
          var data = {
            device_uuid: conned_obj.uuid,
            time: moment().tz("Asia/Tokyo").format(),
            payload: {lux: lux}
          };
          var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
          kinesis.putRecord(kparams, function(err, data) {
            if (err) {
              console.log(err, err.stack);
            } else {
              console.log(kparams);
              console.log(data);
            }
          });
        	console.log('\tlux = %d', lux.toFixed(1));
				});
			});
		});
	});
}

var SensorTag = require('sensortag');
console.info(">> STOP: Ctrl+C or SensorTag power off");
console.info("start");
console.info("waiting for connect from " + myAddress);
SensorTag.discoverByAddress(myAddress, function(sensorTag) {
	console.info("found: connect and setup ... (waiting 5~10 seconds)");
	sensorTag.connectAndSetup(function() {
		sensorTag.readDeviceName(function(error, deviceName) {
			console.info("connect: " + deviceName);
			ti_simple_key(sensorTag);
      ti_gyroscope(sensorTag);
      ti_ir_temperature(sensorTag);
      ti_accelerometer(sensorTag);
      ti_humidity(sensorTag);
      ti_magnetometer(sensorTag);
      ti_barometric_pressure(sensorTag);
      ti_luxometer(sensorTag);
		});
	});
	/* In case of SensorTag PowerOff or out of range when fired `onDisconnect` */
	sensorTag.on("disconnect", function() {
		console.info("disconnect and exit");
		process.exit(0);
	});
});

後はaws configureコマンドでAPI KEY, API SECRET, regionを登録するか、パラメータで指定しておけばkinesisにputされます。

AWS_ACCESS_KEY_ID=XXXXXXXXXXXXXX AWS_SECRET_ACCESS_KEY=XXXXXXXXXXXXXXX TI_ADDRESS=B0:B4:48:B9:59:05 AK_STREAM=bx1data node ti_all.js
Using AWS Kinesis Stream: bx1data
>> STOP: Ctrl+C or SensorTag power off
start
waiting for connect from B0:B4:48:B9:59:05
found: connect and setup ... (waiting 5~10 seconds)
connect: SensorTag 2.0
ready: notifySimpleKey
/* left right (true = pushed, false = released) */
ready: notifyAccelerometer
notify period = 1000ms
ready: notifyMagnetometer
notify period = 1000ms
ready: notifyGyroscope
notify period = 1000ms
ready: notifyIrTemperature
notify period = 1000ms
ready: notifyHumidity
notify period = 1000ms
ready: notifyBarometricPressure
notify period = 1000ms
ready: notifyLuxometer
notify period = 1000ms
	pressure = 1014.9 mBar
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:34+09:00","payload":{"gyro_X":0,"gyro_Y":0,"gyro_Z":0}}',
  PartitionKey: 'any',
  StreamName: 'bx1data' }
{ SequenceNumber: '49554242436466365897581273724784425716267634993494228994',
  ShardId: 'shardId-000000000000' }
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:34+09:00","payload":{"accel_X":0,"accel_Y":0,"accel_Z":0}}',
  PartitionKey: 'any',
  StreamName: 'bx1data' }
{ SequenceNumber: '49554242436466365897581273724785634642087249622668935170',
  ShardId: 'shardId-000000000000' }
	lux = 22.8
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:34+09:00","payload":{"magnet_X":0,"magnet_Y":0,"magnet_Z":0}}',
  PartitionKey: 'any',
  StreamName: 'bx1data' }
{ SequenceNumber: '49554242436466365897581273724786843567906864251843641346',
  ShardId: 'shardId-000000000000' }
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"objectTemperature":24.8125,"ambientTemperature":30.6875}}',
  PartitionKey: 'any',
  StreamName: 'bx1data' }
{ SequenceNumber: '49554242436466365897581273724788052493726478881018347522',
  ShardId: 'shardId-000000000000' }
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"temperature":30.7171630859375,"humidity":69.9462890625}}',
  PartitionKey: 'any',
  StreamName: 'bx1data' }
{ SequenceNumber: '49554242436466365897581273724789261419546093510193053698',
  ShardId: 'shardId-000000000000' }
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"pressure":1014.87}}',
  PartitionKey: 'any',
  StreamName: 'bx1data' }
{ SequenceNumber: '49554242436466365897581273724790470345365708139367759874',
  ShardId: 'shardId-000000000000' }
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"lux":22.83}}',
  PartitionKey: 'any',
  StreamName: 'bx1data' }
{ SequenceNumber: '49554242436466365897581273724791679271185322768542466050',
  ShardId: 'shardId-000000000000' }
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"accel_X":-0.03607177734375,"accel_Y":-0.0791015625,"accel_Z":0.22222900390625}}',
  PartitionKey: 'any',
  StreamName: 'bx1data' }
{ SequenceNumber: '49554242436466365897581273724792888197004937397717172226',
  ShardId: 'shardId-000000000000' }
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"gyro_X":-2.7008056640625,"gyro_Y":25.20751953125,"gyro_Z":23.97918701171875}}',
  PartitionKey: 'any',
  StreamName: 'bx1data' }
  .......

あとはこのデータを使ってTableauやKibanaで可視化すればリアルタイムでグラフデータが見られるかと思います。ちなみにLogstashとKibana 4を使ってこのデータを可視化するとこんな感じになります。

bx1tokinesis2

ここらへんのデータの扱い方はこちらの記事等を参考にしてみてください。

Amazon Cognito対応をする

これでも目的は達成しているわけですが、IoTゲートウェイとはいえAPI KEYをむき出しで扱うのはセキュリティ的にあまり宜しくありません。ということでAmazon CognitoのUnAuthを使って認証部分を書き換えてみたいと思います。

まずはマネージメントコンソールからAmazon Cognitoを開き、新しいIdentity Poolを作成します。

bx1tokinesis3

後は出来上がったIdentity Pool IdとUnAuthのRole、アカウントIDを使ってAmazon kinesisのオブジェクト作成前に認証部分を書き換えます。

var AWS = require("aws-sdk");
//AWS.config.region = "ap-northeast-1";
AWS.config.region = "us-east-1";
AWS.config.credentials = new AWS.CognitoIdentityCredentials({
    AccountId: "123456789012",
    RoleArn: "arn:aws:iam::123456789012:role/cognito-unauth",
    IdentityPoolId: 'us-east-1:xxxxxxxxxx-xxxx-xxxx-xxxxx-xxxxxxxxxx'
});

var kinesis = new AWS.Kinesis({
  region: 'ap-northeast-1'
});

気をつけるのはAmazon Cognitoは東京リージョンでは使えないので認証は別のリージョンにて行い、Amazon kinesisのリージョンを明示することでデータは東京リージョンのAmazon kinesisに行くようにしてあげる事です。 これでnodeを叩く時にAPI KEYやAPI SECRETを指定しなくてもAWSにデータを飛ばすことができるようになりました。

まとめ

いかがでしたでしょうか。AWSのSDKに慣れていれば意外と簡単でしたね。ただこれですと7回/秒でAmazon kinesisのAPIが叩かれるのでBX1内にデータを貯めておいて、ひと通りのセンサーデータが溜まったらAPIを叩く、というようなリファクタリングをするとより効率のよいデータ送信ができるかと思います。

参考リンク