[前編] Raspberry Piで取得したセンサーのデータをAWS IoTからKinesis Data Firehoseに流してS3に保存する
以前、以下のブログでRaspberry Piを使って取得した温度、湿度、気圧のデータをAWS IoTに送ってみました。
今回はAWS IoTに送信したデータをメッセージのルーティング
を使ってKinesis Data Firehoseにデータを流し、その後にS3に保存する構成をCDKも使って構築してみましたので、その際のコードや設定などをまとめてみました。
本ブログは前半としてRaspberry Piのセンサーデータ取得とMQTT送信用のコードの解説を行い、後半ではAWS IoT,Kinesis Data Firehose,S3のAWSインフラを構築するAWS CDKによるコードの解説を行います。
システム構成
今回構築したシステム構成の概要図は以下のようになります。
使用するセンサーはBME280というBOSH社製の非常にコンパクトなものです。このセンサーは温度、湿度、気圧を取得できます。
Raspberry Piとセンサーの接続や設定方法
先程掲載したブログにセンサーとRaspberry Piの配線方法
やセンサーをRaspberry Piで使うに当たっての設定
について記載していますので、こちらでは割愛します。
以下のリンク先に説明を記載しています。
実行環境
項目名 | バージョン |
---|---|
mac OS | Ventura 13.2 |
AWS CDK | 2.81.0 |
AWS IoT Device SDK for Python | 1.4.9 |
Raspberry Piのモデル
$ cat /proc/cpuinfo | grep Model Model : Raspberry Pi 3 Model B Plus Rev 1.3
コードの紹介
Raspberry Pi側のコード
Raspberry Piに接続したセンサーからデータを取得してAWS IoTにデータをMQTTで送信するコードとなります。
スイッチサイエンスさんのリポジトリに公開されているセンサーデータを取得するコード
を編集して、取得したデータをMQTTプロトコル通信でAWS IoTに送信できるようにしました。
リポジトリからクローンしてきたコードの内のbme280_sample.py
を編集して使用しました。
編集後のコードが以下のようになります。ハイライトした後半部分がMQTTでAWS IoTにデータを送信するために追加編集した部分となります。
import MqttBasicPubSub import json from smbus2 import SMBus from datetime import datetime bus_number = 1 i2c_address = 0x76 bus = SMBus(bus_number) digT = [] digP = [] digH = [] t_fine = 0.0 def get_calib_param(): calib = [] for i in range(0x88, 0x88 + 24): calib.append(bus.read_byte_data(i2c_address, i)) calib.append(bus.read_byte_data(i2c_address, 0xA1)) for i in range(0xE1, 0xE1 + 7): calib.append(bus.read_byte_data(i2c_address, i)) digT.append((calib[1] << 8) | calib[0]) digT.append((calib[3] << 8) | calib[2]) digT.append((calib[5] << 8) | calib[4]) digP.append((calib[7] << 8) | calib[6]) digP.append((calib[9] << 8) | calib[8]) digP.append((calib[11] << 8) | calib[10]) digP.append((calib[13] << 8) | calib[12]) digP.append((calib[15] << 8) | calib[14]) digP.append((calib[17] << 8) | calib[16]) digP.append((calib[19] << 8) | calib[18]) digP.append((calib[21] << 8) | calib[20]) digP.append((calib[23] << 8) | calib[22]) digH.append(calib[24]) digH.append((calib[26] << 8) | calib[25]) digH.append(calib[27]) digH.append((calib[28] << 4) | (0x0F & calib[29])) digH.append((calib[30] <> 4) & 0x0F)) digH.append(calib[31]) for i in range(1, 2): if digT[i] & 0x8000: digT[i] = (-digT[i] ^ 0xFFFF) + 1 for i in range(1, 8): if digP[i] & 0x8000: digP[i] = (-digP[i] ^ 0xFFFF) + 1 for i in range(0, 6): if digH[i] & 0x8000: digH[i] = (-digH[i] ^ 0xFFFF) + 1 def readData(): data = [] for i in range(0xF7, 0xF7 + 8): data.append(bus.read_byte_data(i2c_address, i)) pres_raw = (data[0] << 12) | (data[1] <> 4) temp_raw = (data[3] << 12) | (data[4] <> 4) hum_raw = (data[6] << 8) | data[7] return { "temperature": compensate_T(temp_raw), "pressure": compensate_P(pres_raw), "humidity": compensate_H(hum_raw), } def compensate_P(adc_P): global t_fine pressure = 0.0 v1 = (t_fine / 2.0) - 64000.0 v2 = (((v1 / 4.0) * (v1 / 4.0)) / 2048) * digP[5] v2 = v2 + ((v1 * digP[4]) * 2.0) v2 = (v2 / 4.0) + (digP[3] * 65536.0) v1 = ( ((digP[2] * (((v1 / 4.0) * (v1 / 4.0)) / 8192)) / 8) + ((digP[1] * v1) / 2.0) ) / 262144 v1 = ((32768 + v1) * digP[0]) / 32768 if v1 == 0: return 0 pressure = ((1048576 - adc_P) - (v2 / 4096)) * 3125 if pressure 100.0: var_h = 100.0 elif var_h < 0.0: var_h = 0.0 return var_h def compensate_T(adc_T): global t_fine v1 = (adc_T / 16384.0 - digT[0] / 1024.0) * digT[1] v2 = ( (adc_T / 131072.0 - digT[0] / 8192.0) * (adc_T / 131072.0 - digT[0] / 8192.0) * digT[2] ) t_fine = v1 + v2 temperature = t_fine / 5120.0 return temperature def compensate_H(adc_H): global t_fine var_h = t_fine - 76800.0 if var_h != 0: var_h = (adc_H - (digH[3] * 64.0 + digH[4] / 16384.0 * var_h)) * ( digH[1] / 65536.0 * ( 1.0 + digH[5] / 67108864.0 * var_h * (1.0 + digH[2] / 67108864.0 * var_h) ) ) else: return 0 var_h = var_h * (1.0 - digH[0] * var_h / 524288.0) if var_h > 100.0: var_h = 100.0 elif var_h < 0.0: var_h = 0.0 # print("湿度 : %6.2f %" % (var_h)) return var_h get_calib_param() # IoTCoreのエンドポイント endPoint = "**************.iot.ap-northeast-1.amazonaws.com" # MQTTプロトコルのポート port = 8883 # ルート認証局の証明書のパス rootCA = "/home/pi/Documents/code/send_temperature_data_to_iot_core/cert/AmazonRootCA1.pem" # 絶対パス # クライアント証明書を作成元の秘密鍵のパス privateKey = "/home/pi/Documents/code/send_temperature_data_to_iot_core/cert/*******-privatekey.pem" # 絶対パス # クライアント証明書のパス certificate = "/home/pi/Documents/code/send_temperature_data_to_iot_core/cert/*******-certificate.pem.crt" # 絶対パス # クライアントID clientId = "directput-data-firehose-from-iotcore" # IoTルールのトピック topic = "send/data/firehose/" # センサーで取得したデータをAWS IoTに送信する関数 def main(): mqtt = MqttBasicPubSub.Mqtt( endPoint, port, rootCA, privateKey, certificate, clientId ) returnValue = readData() now = datetime.now() dateTime = now.strftime("%Y%m%d%H%M%S") roundValueTemp = round(returnValue["temperature"], 2) roundValuePress = round(returnValue["pressure"], 2) roundValueHum = round(returnValue["humidity"], 2) # データ送信 mqtt.publish( topic, payload=json.dumps( { "temperature": roundValueTemp, "pressure": roundValuePress, "humidity": roundValueHum, "timestamp": dateTime, } ), ) # 送ったデータをターミナルに表示 print( "温度:{}、気圧:{}、湿度:{}、送信日:{}".format( roundValueTemp, roundValuePress, roundValueHum, dateTime ) ) main()
コードの解説
ライブラリのインポート
MQTTでデータを送信するためにMqttBasicPubSub
をインポートしています。
import MqttBasicPubSub
AWS IoTの情報や各証明書や鍵のパスを変数に代入
AWS IoT にMQTTでデータを送信するために必要な情報が以下の通りで、それぞれ変数に格納して行きます。
# IoTCoreのエンドポイント endPoint = "**************.iot.ap-northeast-1.amazonaws.com" # MQTTプロトコルのポート port = 8883 # ルート認証局の証明書のパス rootCA = "/home/pi/Documents/code/send_temperature_data_to_iot_core/cert/AmazonRootCA1.pem" # 絶対パス # クライアント証明書作成の秘密鍵のパス privateKey = "/home/pi/Documents/code/send_temperature_data_to_iot_core/cert/***********-privatekey.pem" # 絶対パス # クライアント証明書のパス certificate = "/home/pi/Documents/code/send_temperature_data_to_iot_core/cert/***********-certificate.pem.crt" # 絶対パス # クライアントID clientId = "directput-data-firehose-from-iotcore" # IoTルールのトピック topic = "send/data/firehose/"
- endPoint
- AWS IoT のアカウント固有のエンドポイントになります。
- AWS IoTのマネジメントコンソールの
設定
の画面から確認できます。
- port
- MQTTプロトコルのポート番号になります。
- rootCA
- AWSが発行するルート証明書です。Raspberry Piから見て、接続先(AWS)が正しいかを判断するための認証に使う証明書になります。
- こちらは以下のURLからcurlで取得できます。
wget https://www.amazontrust.com/repository/AmazonRootCA1.pem
- wgetをインストールしていない場合は
$ brew install wget
などで適宜インスト-ルしてください。 - こちらの保存先を絶対パスで指定します。
- privateKey
- 今回はCSRによりクライアント側で発行した秘密鍵を使用して証明書を作成します。AWS IoTとの接続で使うクライアント証明書は、1-Click証明書作成によりAmazonルート認証局 (CA) によって署名された証明書が簡単に作成できます。しかしながらAWSからデバイスへの秘密鍵のネットワークを通した受け渡しがセキュリティ上の懸念となる場合は、今回のようにクライアント側で作成した秘密鍵を使用する方法もあります。後編で鍵の作成方法を解説します。
- こちらも絶対パスで指定します。
- certificate
- AWSへのリソースのデプロイのタイミングで作成される
クライアント証明書
のパスです。後編で作成と取得の方法を解説します。 - こちらも絶対パスで指定します。
- AWSへのリソースのデプロイのタイミングで作成される
- clientId
- MQTTでデータを送信する際に必要になるクライアントIDになります。iotポリシーを定義する際にiot:Connect アクションに指定するリソース情報に clientId を記載することになります。後編でCDKのコードの解説の際にも登場します。
- topic
- MQTTでデータを送信する際に必要になるトピックになります。iotポリシーを定義する際にiot:Publish アクションに指定するリソース情報に topic を記載します。後編でCDKのコードで実際のポリシーの解説の際に説明します。
MQTTでデータ送信のためのコード
def main(): mqtt = MqttBasicPubSub.Mqtt( endPoint, port, rootCA, privateKey, certificate, clientId ) returnValue = readData() now = datetime.now() dateTime = now.strftime("%Y%m%d%H%M%S") roundValueTemp = round(returnValue["temperature"], 2) roundValuePress = round(returnValue["pressure"], 2) roundValueHum = round(returnValue["humidity"], 2) # データ送信 mqtt.publish( topic, payload=json.dumps( { "temperature": roundValueTemp, "pressure": roundValuePress, "humidity": roundValueHum, "timestamp": dateTime, } ), ) # 送ったデータをターミナルに表示 print( "温度:{}、気圧:{}、湿度:{}、送信日:{}".format( roundValueTemp, roundValuePress, roundValueHum, dateTime ) )
- 代入したAWS IoTの情報やパスを使って、
MqttBasicPubSub
モジュールのMqttクラス
をインスタンス化
mqtt = MqttBasicPubSub.Mqtt(endPoint, port, rootCA, privateKey, certificate, clientId)
- センサーから取得したデータをJSON形式でMQTTでAWS IoTに送信
mqtt.publish( topic, payload=json.dumps( { "temperature": roundValueTemp, "pressure": roundValuePress, "humidity": roundValueHum, "timestamp": dateTime, } ), )
先程インスタンス化したmqttのpublish
メソッドを使用してセンサーで取得したデータをAWS Iotに送信する部分コードとなります。データはJSON形式にしたデータをpayloadに入れて送信します。
前編のRaspberry Pi側のコード解説は以上です。
後編で紹介する内容
次回の後編では以下を紹介いたします。
- CDKによるAWS IoTのルール作成とKinesis Data Firehoseへのデータ流し込みのコード解説
- 動作確認