[前編] Raspberry Piで取得したセンサーのデータをAWS IoTからKinesis Data Firehoseに流してS3に保存する

2023.05.25

以前、以下のブログでRaspberry Piを使って取得した温度、湿度、気圧のデータをAWS IoTに送ってみました。

今回はAWS IoTに送信したデータをメッセージのルーティングを使ってKinesis Data Firehoseにデータを流し、その後にS3に保存する構成をCDKも使って構築してみましたので、その際のコードや設定などをまとめてみました。

本ブログは前半としてRaspberry Piのセンサーデータ取得とMQTT送信用のコードの解説を行い、後半ではAWS IoT,Kinesis Data Firehose,S3のAWSインフラを構築するAWS CDKによるコードの解説を行います。

システム構成

今回構築したシステム構成の概要図は以下のようになります。


使用するセンサーはBME280というBOSH社製の非常にコンパクトなものです。このセンサーは温度、湿度、気圧を取得できます。

Raspberry Piとセンサーの接続や設定方法

先程掲載したブログにセンサーとRaspberry Piの配線方法センサーをRaspberry Piで使うに当たっての設定について記載していますので、こちらでは割愛します。 以下のリンク先に説明を記載しています。

実行環境

項目名 バージョン
mac OS Ventura 13.2
AWS CDK 2.81.0
AWS IoT Device SDK for Python 1.4.9

Raspberry Piのモデル

$ cat /proc/cpuinfo | grep Model
Model  : Raspberry Pi 3 Model B Plus Rev 1.3

コードの紹介

Raspberry Pi側のコード

Raspberry Piに接続したセンサーからデータを取得してAWS IoTにデータをMQTTで送信するコードとなります。
スイッチサイエンスさんのリポジトリに公開されているセンサーデータを取得するコードを編集して、取得したデータをMQTTプロトコル通信でAWS IoTに送信できるようにしました。

リポジトリからクローンしてきたコードの内のbme280_sample.pyを編集して使用しました。

編集後のコードが以下のようになります。ハイライトした後半部分がMQTTでAWS IoTにデータを送信するために追加編集した部分となります。

  import MqttBasicPubSub
  import json
  from smbus2 import SMBus
  from datetime import datetime


  bus_number = 1
  i2c_address = 0x76

  bus = SMBus(bus_number)

  digT = []
  digP = []
  digH = []

  t_fine = 0.0


  def get_calib_param():
      calib = []

      for i in range(0x88, 0x88 + 24):
          calib.append(bus.read_byte_data(i2c_address, i))
      calib.append(bus.read_byte_data(i2c_address, 0xA1))
      for i in range(0xE1, 0xE1 + 7):
          calib.append(bus.read_byte_data(i2c_address, i))

      digT.append((calib[1] << 8) | calib[0])
      digT.append((calib[3] << 8) | calib[2])
      digT.append((calib[5] << 8) | calib[4])
      digP.append((calib[7] << 8) | calib[6])
      digP.append((calib[9] << 8) | calib[8])
      digP.append((calib[11] << 8) | calib[10])
      digP.append((calib[13] << 8) | calib[12])
      digP.append((calib[15] << 8) | calib[14])
      digP.append((calib[17] << 8) | calib[16])
      digP.append((calib[19] << 8) | calib[18])
      digP.append((calib[21] << 8) | calib[20])
      digP.append((calib[23] << 8) | calib[22])
      digH.append(calib[24])
      digH.append((calib[26] << 8) | calib[25])
      digH.append(calib[27])
      digH.append((calib[28] << 4) | (0x0F & calib[29]))
      digH.append((calib[30] <> 4) & 0x0F))
      digH.append(calib[31])

      for i in range(1, 2):
          if digT[i] & 0x8000:
              digT[i] = (-digT[i] ^ 0xFFFF) + 1

      for i in range(1, 8):
          if digP[i] & 0x8000:
              digP[i] = (-digP[i] ^ 0xFFFF) + 1

      for i in range(0, 6):
          if digH[i] & 0x8000:
              digH[i] = (-digH[i] ^ 0xFFFF) + 1


  def readData():
      data = []
      for i in range(0xF7, 0xF7 + 8):
          data.append(bus.read_byte_data(i2c_address, i))
      pres_raw = (data[0] << 12) | (data[1] <> 4)
      temp_raw = (data[3] << 12) | (data[4] <> 4)
      hum_raw = (data[6] << 8) | data[7]

      return {
          "temperature": compensate_T(temp_raw),
          "pressure": compensate_P(pres_raw),
          "humidity": compensate_H(hum_raw),
      }


  def compensate_P(adc_P):
      global t_fine
      pressure = 0.0

      v1 = (t_fine / 2.0) - 64000.0
      v2 = (((v1 / 4.0) * (v1 / 4.0)) / 2048) * digP[5]
      v2 = v2 + ((v1 * digP[4]) * 2.0)
      v2 = (v2 / 4.0) + (digP[3] * 65536.0)
      v1 = (
          ((digP[2] * (((v1 / 4.0) * (v1 / 4.0)) / 8192)) / 8) + ((digP[1] * v1) / 2.0)
      ) / 262144
      v1 = ((32768 + v1) * digP[0]) / 32768

      if v1 == 0:
          return 0
      pressure = ((1048576 - adc_P) - (v2 / 4096)) * 3125
      if pressure  100.0:
          var_h = 100.0
      elif var_h < 0.0:
          var_h = 0.0
      return var_h


  def compensate_T(adc_T):
      global t_fine
      v1 = (adc_T / 16384.0 - digT[0] / 1024.0) * digT[1]
      v2 = (
          (adc_T / 131072.0 - digT[0] / 8192.0)
          * (adc_T / 131072.0 - digT[0] / 8192.0)
          * digT[2]
      )
      t_fine = v1 + v2
      temperature = t_fine / 5120.0
      return temperature


  def compensate_H(adc_H):
      global t_fine
      var_h = t_fine - 76800.0
      if var_h != 0:
          var_h = (adc_H - (digH[3] * 64.0 + digH[4] / 16384.0 * var_h)) * (
              digH[1]
              / 65536.0
              * (
                  1.0
                  + digH[5] / 67108864.0 * var_h * (1.0 + digH[2] / 67108864.0 * var_h)
              )
          )
      else:
          return 0
      var_h = var_h * (1.0 - digH[0] * var_h / 524288.0)
      if var_h > 100.0:
          var_h = 100.0
      elif var_h < 0.0:
          var_h = 0.0
      # print("湿度 : %6.2f %" % (var_h))
      return var_h


  get_calib_param()

  # IoTCoreのエンドポイント
  endPoint = "**************.iot.ap-northeast-1.amazonaws.com"
  # MQTTプロトコルのポート
  port = 8883
  # ルート認証局の証明書のパス
  rootCA = "/home/pi/Documents/code/send_temperature_data_to_iot_core/cert/AmazonRootCA1.pem"  # 絶対パス
  # クライアント証明書を作成元の秘密鍵のパス
  privateKey = "/home/pi/Documents/code/send_temperature_data_to_iot_core/cert/*******-privatekey.pem"  # 絶対パス
  # クライアント証明書のパス
  certificate = "/home/pi/Documents/code/send_temperature_data_to_iot_core/cert/*******-certificate.pem.crt"  # 絶対パス
  # クライアントID
  clientId = "directput-data-firehose-from-iotcore"
  # IoTルールのトピック
  topic = "send/data/firehose/"

  # センサーで取得したデータをAWS IoTに送信する関数
  def main():
      mqtt = MqttBasicPubSub.Mqtt(
          endPoint, port, rootCA, privateKey, certificate, clientId
      )

      returnValue = readData()
      now = datetime.now()
      dateTime = now.strftime("%Y%m%d%H%M%S")

      roundValueTemp = round(returnValue["temperature"], 2)
      roundValuePress = round(returnValue["pressure"], 2)
      roundValueHum = round(returnValue["humidity"], 2)

      # データ送信
      mqtt.publish(
          topic,
          payload=json.dumps(
              {
                  "temperature": roundValueTemp,
                  "pressure": roundValuePress,
                  "humidity": roundValueHum,
                  "timestamp": dateTime,
              }
          ),
      )

      # 送ったデータをターミナルに表示
      print(
          "温度:{}、気圧:{}、湿度:{}、送信日:{}".format(
              roundValueTemp, roundValuePress, roundValueHum, dateTime
          )
      )


  main()

コードの解説

ライブラリのインポート

MQTTでデータを送信するためにMqttBasicPubSubをインポートしています。

import MqttBasicPubSub

AWS IoTの情報や各証明書や鍵のパスを変数に代入

AWS IoT にMQTTでデータを送信するために必要な情報が以下の通りで、それぞれ変数に格納して行きます。

  # IoTCoreのエンドポイント
  endPoint = "**************.iot.ap-northeast-1.amazonaws.com"
  # MQTTプロトコルのポート
  port = 8883
  # ルート認証局の証明書のパス
  rootCA = "/home/pi/Documents/code/send_temperature_data_to_iot_core/cert/AmazonRootCA1.pem"  # 絶対パス
  # クライアント証明書作成の秘密鍵のパス
  privateKey = "/home/pi/Documents/code/send_temperature_data_to_iot_core/cert/***********-privatekey.pem"  # 絶対パス
  # クライアント証明書のパス
  certificate = "/home/pi/Documents/code/send_temperature_data_to_iot_core/cert/***********-certificate.pem.crt"  # 絶対パス
  # クライアントID
  clientId = "directput-data-firehose-from-iotcore"
  # IoTルールのトピック
  topic = "send/data/firehose/"
  • endPoint
    • AWS IoT のアカウント固有のエンドポイントになります。
    • AWS IoTのマネジメントコンソールの設定の画面から確認できます。
  • port
    • MQTTプロトコルのポート番号になります。
  • rootCA
    • AWSが発行するルート証明書です。Raspberry Piから見て、接続先(AWS)が正しいかを判断するための認証に使う証明書になります。
    • こちらは以下のURLからcurlで取得できます。
    • wget https://www.amazontrust.com/repository/AmazonRootCA1.pem
    • wgetをインストールしていない場合は$ brew install wgetなどで適宜インスト-ルしてください。
    • こちらの保存先を絶対パスで指定します。
  • privateKey
    • 今回はCSRによりクライアント側で発行した秘密鍵を使用して証明書を作成します。AWS IoTとの接続で使うクライアント証明書は、1-Click証明書作成によりAmazonルート認証局 (CA) によって署名された証明書が簡単に作成できます。しかしながらAWSからデバイスへの秘密鍵のネットワークを通した受け渡しがセキュリティ上の懸念となる場合は、今回のようにクライアント側で作成した秘密鍵を使用する方法もあります。後編で鍵の作成方法を解説します。
    • こちらも絶対パスで指定します。
  • certificate
    • AWSへのリソースのデプロイのタイミングで作成されるクライアント証明書のパスです。後編で作成と取得の方法を解説します。
    • こちらも絶対パスで指定します。
  • clientId
    • MQTTでデータを送信する際に必要になるクライアントIDになります。iotポリシーを定義する際にiot:Connect アクションに指定するリソース情報に clientId を記載することになります。後編でCDKのコードの解説の際にも登場します。
  • topic
    • MQTTでデータを送信する際に必要になるトピックになります。iotポリシーを定義する際にiot:Publish アクションに指定するリソース情報に topic を記載します。後編でCDKのコードで実際のポリシーの解説の際に説明します。

MQTTでデータ送信のためのコード

  def main():
      mqtt = MqttBasicPubSub.Mqtt(
          endPoint, port, rootCA, privateKey, certificate, clientId
      )

      returnValue = readData()
      now = datetime.now()
      dateTime = now.strftime("%Y%m%d%H%M%S")

      roundValueTemp = round(returnValue["temperature"], 2)
      roundValuePress = round(returnValue["pressure"], 2)
      roundValueHum = round(returnValue["humidity"], 2)

      # データ送信
      mqtt.publish(
          topic,
          payload=json.dumps(
              {
                  "temperature": roundValueTemp,
                  "pressure": roundValuePress,
                  "humidity": roundValueHum,
                  "timestamp": dateTime,
              }
          ),
      )

      # 送ったデータをターミナルに表示
      print(
          "温度:{}、気圧:{}、湿度:{}、送信日:{}".format(
              roundValueTemp, roundValuePress, roundValueHum, dateTime
          )
      )
  • 代入したAWS IoTの情報やパスを使って、MqttBasicPubSubモジュールのMqttクラスをインスタンス化
mqtt = MqttBasicPubSub.Mqtt(endPoint, port, rootCA, privateKey, certificate, clientId)
  • センサーから取得したデータをJSON形式でMQTTでAWS IoTに送信
mqtt.publish(
          topic,
          payload=json.dumps(
              {
                  "temperature": roundValueTemp,
                  "pressure": roundValuePress,
                  "humidity": roundValueHum,
                  "timestamp": dateTime,
              }
          ),
      )

先程インスタンス化したmqttのpublishメソッドを使用してセンサーで取得したデータをAWS Iotに送信する部分コードとなります。データはJSON形式にしたデータをpayloadに入れて送信します。

前編のRaspberry Pi側のコード解説は以上です。

後編で紹介する内容

次回の後編では以下を紹介いたします。

  • CDKによるAWS IoTのルール作成とKinesis Data Firehoseへのデータ流し込みのコード解説
  • 動作確認

以上