この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、せーのです。今日はBX1を使って取得したデータをクラウドに飛ばしてみたいと思います。
おさらい
まずは今までのおさらいです。初期設定、ネットワーク設定、センサーデータの取得方法は以下の記事を参考にしてください。
- OpenBlocks IoT BX1を触ってみた
- OpenBlocks IoT BX1:コンソールによるネットワーク設定と初期化の方法
- OpenBlocks IoT BX1でTI製センサータグ(CC2650)のセンサーデータ(10種類)を取得する
クラウドに飛ばしてみよう
さて、今回は前回の応用編です。前回書いたセンサーデータを取得するソースを改変してデータをAmazon Kinesisにputしてみたいと思います。またセキュリティを考えCognitoにて認証してみたいと思います。
やってみる
Kinesisストリームを作る
それではさっそくやってみます。まずはKinesisのストリームを作ります。マネージメントコンソールのAmazon Kinesisの画面からストリームを作成します。 センサーデータは大抵の場合1kbにも満たない小さいものです。今回はファンクションごとにputするので秒間7-8発叩かれるのでそれを計算してみるとシャードは1つでいいようなのでそれで設定致します。
SDKよりAmazon Kinesisオブジェクトを作成する
次にコードを書いていきます。Amazon Kinesisのオブジェクトを作成します。
var AWS = require("aws-sdk");
AWS.config.region = "ap-northeast-1";
var kinesis = new AWS.Kinesis({
region: 'ap-northeast-1'
});
console.info("Using Amazon Kinesis Stream: " + mySTREAM);
Amazon Kinesisのput部分を作成する
前回のコードでは取得したデータはconsole.logでただ表示しただけでした。今回はそこの部分を改変してAmazon kinesisにputするAPIを叩いてみます。例えばジャイロスコープであればこのような感じです。
var moment = require("moment-timezone");
function ti_gyroscope(conned_obj) {
var period = 1000; // ms
conned_obj.enableGyroscope(function() {
conned_obj.setGyroscopePeriod(period, function() {
conned_obj.notifyGyroscope(function() {
console.info("ready: notifyGyroscope");
console.info("notify period = " + period + "ms");
conned_obj.on('gyroscopeChange', function(x, y, z) {
var data = {
device_uuid: conned_obj.uuid,
time: moment().tz("Asia/Tokyo").format(),
payload: {gyro_X: x, gyro_Y: y, gyro_Z: z}
};
var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
kinesis.putRecord(kparams, function(err, data) {
if (err) {
console.log(err, err.stack);
} else {
console.log(kparams);
console.log(data);
}
});
});
});
});
});
}
実際のジャイロスコープのセンサーデータの他にもセンサータグのUUIDを識別子として、またタイムスタンプをつけて見ました。これを全センサーデータ分書き換えるとこうなります。
/*
* $ npm install sandeepmistry/node-sensortag aws-sdk moment-timezone
* $ TI_UUID=your_ti_sensor_tag_UUID AK_STREAM=your_kinesis_stream node this_file.js
*/
var mySTREAM = process.env["AK_STREAM"] || "hyakuyo";
var myAddress = process.env["TI_ADDRESS"] || "YOUR_TI_sensor_tag_ADDRESS";
var AWS = require("aws-sdk");
AWS.config.region = "ap-northeast-1";
var kinesis = new AWS.Kinesis({
region: 'ap-northeast-1'
});
console.info("Using AWS Kinesis Stream: " + mySTREAM);
var moment = require("moment-timezone");
function ti_simple_key(conned_obj) {
conned_obj.notifySimpleKey(function() {
console.info("ready: notifySimpleKey");
console.info("/* left right (true = pushed, false = released) */");
conned_obj.on("simpleKeyChange", function(left, right) { /* run per pushed button */
var data = {
device_uuid: conned_obj.uuid,
time: moment().tz("Asia/Tokyo").format(),
payload: {L: left, R: right}
};
var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
kinesis.putRecord(kparams, function(err, data) {
if (err) {
console.log(err, err.stack);
} else {
console.log(kparams);
console.log(data);
}
});
});
});
}
function ti_gyroscope(conned_obj) {
var period = 1000; // ms
conned_obj.enableGyroscope(function() {
conned_obj.setGyroscopePeriod(period, function() {
conned_obj.notifyGyroscope(function() {
console.info("ready: notifyGyroscope");
console.info("notify period = " + period + "ms");
conned_obj.on('gyroscopeChange', function(x, y, z) {
var data = {
device_uuid: conned_obj.uuid,
time: moment().tz("Asia/Tokyo").format(),
payload: {gyro_X: x, gyro_Y: y, gyro_Z: z}
};
var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
kinesis.putRecord(kparams, function(err, data) {
if (err) {
console.log(err, err.stack);
} else {
console.log(kparams);
console.log(data);
}
});
});
});
});
});
}
function ti_ir_temperature(conned_obj) {
var period = 1000; // ms
conned_obj.enableIrTemperature(function() {
conned_obj.setIrTemperaturePeriod(period, function() {
conned_obj.notifyIrTemperature(function() {
console.info("ready: notifyIrTemperature");
console.info("notify period = " + period + "ms");
conned_obj.on('irTemperatureChange', function(objectTemperature, ambientTemperature) {
var data = {
device_uuid: conned_obj.uuid,
time: moment().tz("Asia/Tokyo").format(),
payload: {objectTemperature: objectTemperature, ambientTemperature: ambientTemperature}
};
var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
kinesis.putRecord(kparams, function(err, data) {
if (err) {
console.log(err, err.stack);
} else {
console.log(kparams);
console.log(data);
}
});
});
});
});
});
}
function ti_accelerometer(conned_obj) {
var period = 1000; // ms
conned_obj.enableAccelerometer(function() {
conned_obj.setAccelerometerPeriod(period, function() {
conned_obj.notifyAccelerometer(function() {
console.info("ready: notifyAccelerometer");
console.info("notify period = " + period + "ms");
conned_obj.on('accelerometerChange', function(x, y, z) {
var data = {
device_uuid: conned_obj.uuid,
time: moment().tz("Asia/Tokyo").format(),
payload: {accel_X: x, accel_Y: y, accel_Z: z}
};
var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
kinesis.putRecord(kparams, function(err, data) {
if (err) {
console.log(err, err.stack);
} else {
console.log(kparams);
console.log(data);
}
});
});
});
});
});
}
function ti_humidity(conned_obj) {
var period = 1000; // ms
conned_obj.enableHumidity(function() {
conned_obj.setHumidityPeriod(period, function() {
conned_obj.notifyHumidity(function() {
console.info("ready: notifyHumidity");
console.info("notify period = " + period + "ms");
conned_obj.on('humidityChange', function(temperature, humidity) {
var data = {
device_uuid: conned_obj.uuid,
time: moment().tz("Asia/Tokyo").format(),
payload: {temperature: temperature, humidity: humidity}
};
var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
kinesis.putRecord(kparams, function(err, data) {
if (err) {
console.log(err, err.stack);
} else {
console.log(kparams);
console.log(data);
}
});
});
});
});
});
}
function ti_magnetometer(conned_obj) {
var period = 1000; // ms
conned_obj.enableMagnetometer(function() {
conned_obj.setMagnetometerPeriod(period, function() {
conned_obj.notifyMagnetometer(function() {
console.info("ready: notifyMagnetometer");
console.info("notify period = " + period + "ms");
conned_obj.on('magnetometerChange', function(x, y, z) {
var data = {
device_uuid: conned_obj.uuid,
time: moment().tz("Asia/Tokyo").format(),
payload: {magnet_X: x, magnet_Y: y, magnet_Z: z}
};
var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
kinesis.putRecord(kparams, function(err, data) {
if (err) {
console.log(err, err.stack);
} else {
console.log(kparams);
console.log(data);
}
});
});
});
});
});
}
function ti_barometric_pressure(conned_obj) {
var period = 1000; // ms
conned_obj.enableBarometricPressure(function() {
conned_obj.setBarometricPressurePeriod(period, function() {
conned_obj.notifyBarometricPressure(function() {
console.info("ready: notifyBarometricPressure");
console.info("notify period = " + period + "ms");
conned_obj.on('barometricPressureChange', function(pressure) {
var data = {
device_uuid: conned_obj.uuid,
time: moment().tz("Asia/Tokyo").format(),
payload: {pressure: pressure}
};
var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
kinesis.putRecord(kparams, function(err, data) {
if (err) {
console.log(err, err.stack);
} else {
console.log(kparams);
console.log(data);
}
});
console.log('\tpressure = %d mBar', pressure.toFixed(1));
});
});
});
});
}
function ti_luxometer(conned_obj) {
var period = 1000; // ms
conned_obj.enableLuxometer(function() {
conned_obj.setLuxometerPeriod(period, function() {
conned_obj.notifyLuxometer(function() {
console.info("ready: notifyLuxometer");
console.info("notify period = " + period + "ms");
conned_obj.on('luxometerChange', function(lux) {
var data = {
device_uuid: conned_obj.uuid,
time: moment().tz("Asia/Tokyo").format(),
payload: {lux: lux}
};
var kparams = {Data: JSON.stringify(data), PartitionKey: "any", StreamName: mySTREAM};
kinesis.putRecord(kparams, function(err, data) {
if (err) {
console.log(err, err.stack);
} else {
console.log(kparams);
console.log(data);
}
});
console.log('\tlux = %d', lux.toFixed(1));
});
});
});
});
}
var SensorTag = require('sensortag');
console.info(">> STOP: Ctrl+C or SensorTag power off");
console.info("start");
console.info("waiting for connect from " + myAddress);
SensorTag.discoverByAddress(myAddress, function(sensorTag) {
console.info("found: connect and setup ... (waiting 5~10 seconds)");
sensorTag.connectAndSetup(function() {
sensorTag.readDeviceName(function(error, deviceName) {
console.info("connect: " + deviceName);
ti_simple_key(sensorTag);
ti_gyroscope(sensorTag);
ti_ir_temperature(sensorTag);
ti_accelerometer(sensorTag);
ti_humidity(sensorTag);
ti_magnetometer(sensorTag);
ti_barometric_pressure(sensorTag);
ti_luxometer(sensorTag);
});
});
/* In case of SensorTag PowerOff or out of range when fired `onDisconnect` */
sensorTag.on("disconnect", function() {
console.info("disconnect and exit");
process.exit(0);
});
});
後はaws configureコマンドでAPI KEY, API SECRET, regionを登録するか、パラメータで指定しておけばkinesisにputされます。
AWS_ACCESS_KEY_ID=XXXXXXXXXXXXXX AWS_SECRET_ACCESS_KEY=XXXXXXXXXXXXXXX TI_ADDRESS=B0:B4:48:B9:59:05 AK_STREAM=bx1data node ti_all.js
Using AWS Kinesis Stream: bx1data
>> STOP: Ctrl+C or SensorTag power off
start
waiting for connect from B0:B4:48:B9:59:05
found: connect and setup ... (waiting 5~10 seconds)
connect: SensorTag 2.0
ready: notifySimpleKey
/* left right (true = pushed, false = released) */
ready: notifyAccelerometer
notify period = 1000ms
ready: notifyMagnetometer
notify period = 1000ms
ready: notifyGyroscope
notify period = 1000ms
ready: notifyIrTemperature
notify period = 1000ms
ready: notifyHumidity
notify period = 1000ms
ready: notifyBarometricPressure
notify period = 1000ms
ready: notifyLuxometer
notify period = 1000ms
pressure = 1014.9 mBar
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:34+09:00","payload":{"gyro_X":0,"gyro_Y":0,"gyro_Z":0}}',
PartitionKey: 'any',
StreamName: 'bx1data' }
{ SequenceNumber: '49554242436466365897581273724784425716267634993494228994',
ShardId: 'shardId-000000000000' }
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:34+09:00","payload":{"accel_X":0,"accel_Y":0,"accel_Z":0}}',
PartitionKey: 'any',
StreamName: 'bx1data' }
{ SequenceNumber: '49554242436466365897581273724785634642087249622668935170',
ShardId: 'shardId-000000000000' }
lux = 22.8
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:34+09:00","payload":{"magnet_X":0,"magnet_Y":0,"magnet_Z":0}}',
PartitionKey: 'any',
StreamName: 'bx1data' }
{ SequenceNumber: '49554242436466365897581273724786843567906864251843641346',
ShardId: 'shardId-000000000000' }
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"objectTemperature":24.8125,"ambientTemperature":30.6875}}',
PartitionKey: 'any',
StreamName: 'bx1data' }
{ SequenceNumber: '49554242436466365897581273724788052493726478881018347522',
ShardId: 'shardId-000000000000' }
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"temperature":30.7171630859375,"humidity":69.9462890625}}',
PartitionKey: 'any',
StreamName: 'bx1data' }
{ SequenceNumber: '49554242436466365897581273724789261419546093510193053698',
ShardId: 'shardId-000000000000' }
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"pressure":1014.87}}',
PartitionKey: 'any',
StreamName: 'bx1data' }
{ SequenceNumber: '49554242436466365897581273724790470345365708139367759874',
ShardId: 'shardId-000000000000' }
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"lux":22.83}}',
PartitionKey: 'any',
StreamName: 'bx1data' }
{ SequenceNumber: '49554242436466365897581273724791679271185322768542466050',
ShardId: 'shardId-000000000000' }
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"accel_X":-0.03607177734375,"accel_Y":-0.0791015625,"accel_Z":0.22222900390625}}',
PartitionKey: 'any',
StreamName: 'bx1data' }
{ SequenceNumber: '49554242436466365897581273724792888197004937397717172226',
ShardId: 'shardId-000000000000' }
{ Data: '{"device_uuid":"b0b448b95905","time":"2015-09-14T02:26:35+09:00","payload":{"gyro_X":-2.7008056640625,"gyro_Y":25.20751953125,"gyro_Z":23.97918701171875}}',
PartitionKey: 'any',
StreamName: 'bx1data' }
.......
あとはこのデータを使ってTableauやKibanaで可視化すればリアルタイムでグラフデータが見られるかと思います。ちなみにLogstashとKibana 4を使ってこのデータを可視化するとこんな感じになります。
ここらへんのデータの扱い方はこちらの記事等を参考にしてみてください。
Amazon Cognito対応をする
これでも目的は達成しているわけですが、IoTゲートウェイとはいえAPI KEYをむき出しで扱うのはセキュリティ的にあまり宜しくありません。ということでAmazon CognitoのUnAuthを使って認証部分を書き換えてみたいと思います。
まずはマネージメントコンソールからAmazon Cognitoを開き、新しいIdentity Poolを作成します。
後は出来上がったIdentity Pool IdとUnAuthのRole、アカウントIDを使ってAmazon kinesisのオブジェクト作成前に認証部分を書き換えます。
var AWS = require("aws-sdk");
//AWS.config.region = "ap-northeast-1";
AWS.config.region = "us-east-1";
AWS.config.credentials = new AWS.CognitoIdentityCredentials({
AccountId: "123456789012",
RoleArn: "arn:aws:iam::123456789012:role/cognito-unauth",
IdentityPoolId: 'us-east-1:xxxxxxxxxx-xxxx-xxxx-xxxxx-xxxxxxxxxx'
});
var kinesis = new AWS.Kinesis({
region: 'ap-northeast-1'
});
気をつけるのはAmazon Cognitoは東京リージョンでは使えないので認証は別のリージョンにて行い、Amazon kinesisのリージョンを明示することでデータは東京リージョンのAmazon kinesisに行くようにしてあげる事です。 これでnodeを叩く時にAPI KEYやAPI SECRETを指定しなくてもAWSにデータを飛ばすことができるようになりました。
まとめ
いかがでしたでしょうか。AWSのSDKに慣れていれば意外と簡単でしたね。ただこれですと7回/秒でAmazon kinesisのAPIが叩かれるのでBX1内にデータを貯めておいて、ひと通りのセンサーデータが溜まったらAPIを叩く、というようなリファクタリングをするとより効率のよいデータ送信ができるかと思います。