【Contacts IoT】コンタクトレンズの着脱時間をデータベースに保存する

はじめに

テントの中から失礼します、CX事業本部のてんとタカハシです!

コンタクトレンズ使用者を眼障害から守ることを目的として、コンタクトレンズの着脱管理を行う IoT プロジェクトを進めています。 プロジェクトの概要については、下記の記事をご参照ください。

本プロジェクトに関する記事の一覧は下記のページにまとまっています。

概要

下記の記事で作成したデバイス(コンタクトレンズの着脱検知用ケース)を AWS IoT Core に繋げて、着脱した時間を DynamoDB に登録します。

今回の記事では、バックエンド側の環境・実装が既に作成されていることを前提として、デバイス側の実装を記載していきます。

システム構成図

デバイスから MQTT で AWS IoT Core にリクエストを投げると、Lambda を経由して DynamoDB に着脱時間が記録されます。Lambda の実装では、DynamoDB 上で既に着用開始時間が記録されているレコードが存在するかの確認を行い、既存のレコードを上書きする or 新規でレコードを作成します。

今回の記事では、上記の図で赤く囲った部分の実装 & デバイスを AWS IoT Core に繋ぐために必要なセットアップ部分の実装を記載していきます。

環境

Raspberry Pi 上の環境は下記の通りです。

$ lsb_release -a
No LSB modules are available.
Distributor ID:	Raspbian
Description:	Raspbian GNU/Linux 10 (buster)
Release:	10
Codename:	buster

$ python3 --version
Python 3.7.3

$ pip3 --version
pip 18.1 from /usr/lib/python3/dist-packages/pip (python 3.7)

$ aws --version
aws-cli/1.18.223 Python/3.7.3 Linux/5.10.17-v7+ botocore/1.19.63

$ jq --version
jq-1.5-1-a5b5cbe

デバイスのセットアップ

実装

下記を順に実行して、デバイスを AWS IoT Core に接続できるようにします。

  • Amazon ルート CA 証明書ファイルをダウンロードする
  • Amazon ルート認証局によって署名されたクライアント証明書、鍵を作成する
  • クライアント証明書にポリシーを紐付ける
  • モノを作成する
  • モノにクライアント証明書を紐付ける
  • ATS エンドポイントを取得する
  • モノの名前と ATS エンドポイントを JSON で吐く

device-setup.sh

#!/bin/bash

set -euo pipefail

PROJECT_NAME="contact-iot-device"
IOT_POLICY_NAME="${PROJECT_NAME}-iot-policy"
IOT_THING_NAME="${PROJECT_NAME}-iot-thing-$(cat /proc/sys/kernel/random/uuid)"
CERTIFICATE_DIR="certificates"
ROOT_CA_URL="https://www.amazontrust.com/repository/AmazonRootCA1.pem"
PROFILE_PATH="${HOME}/.profile"

mkdir -p ${CERTIFICATE_DIR}

curl -o "${CERTIFICATE_DIR}/root.pem" ${ROOT_CA_URL} > /dev/null 2>&1

CERTIFICATE_ARN=$(aws iot create-keys-and-certificate \
  --set-as-active \
  --certificate-pem-outfile "${CERTIFICATE_DIR}/certificate.pem.crt" \
  --public-key-outfile "${CERTIFICATE_DIR}/public.pem.key" \
  --private-key-outfile "${CERTIFICATE_DIR}/private.pem.key" \
  --query certificateArn \
  --output text \
)

aws iot attach-policy \
  --policy-name ${IOT_POLICY_NAME} \
  --target ${CERTIFICATE_ARN}

aws iot create-thing \
  --thing-name ${IOT_THING_NAME} \
  > /dev/null 2>&1

aws iot attach-thing-principal \
  --thing-name ${IOT_THING_NAME} \
  --principal ${CERTIFICATE_ARN}

IOT_CORE_ENDPOINT=$(aws iot describe-endpoint --endpoint-type iot:Data-ATS \
  --query endpointAddress \
  --output text
)

JSON=$(cat << EOS
{
  "deviceId": "${IOT_THING_NAME}",
  "endpoint": "${IOT_CORE_ENDPOINT}"
}
EOS
)

echo "${JSON}"

クライアント証明書に紐付けるポリシーは、contact-iot-device-iot-policyという名前で事前に作成されていることを前提とします。

contact-iot-device-iot-policy.json

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "iot:*",
      "Resource": "*"
    }
  ]
}

実行

セットアップ用のスクリプトを実行します。その後、吐き出される JSON から必要な情報を抜き出して環境変数として設定します。

$ sudo apt-get update
$ sudo apt-get install jq

$ ./device-setup.sh > device-info.json
$ cat device-info.json 
{
  "deviceId": "contact-iot-device-iot-thing-xxxxx",
  "endpoint": "yyyyy-ats.iot.zzzzz.amazonaws.com"
}

$ echo "export DEVICE_ID=$(cat device-info.json | jq .deviceId)" >> ~/.profile
$ echo "export ENDPOINT=$(cat device-info.json | jq .endpoint)" >> ~/.profile
$ source ~/.profile

MQTT で AWS IoT Core に繋げる

実装

AWS IoT SDK for Python v2 を使用して、MQTT 接続用モジュールを実装します。

mqtt_connector.py

import json

from awscrt import io, mqtt
from awsiot import mqtt_connection_builder


class MqttConnector:

    def __init__(self, client_id, endpoint, path_to_root, path_to_key, path_to_cert):
        event_loop_group = io.EventLoopGroup(1)
        host_resolver = io.DefaultHostResolver(event_loop_group)
        client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

        self.client = mqtt_connection_builder.mtls_from_path(
            client_bootstrap=client_bootstrap,
            client_id=client_id,
            endpoint=endpoint,
            cert_filepath=path_to_cert,
            pri_key_filepath=path_to_key,
            ca_filepath=path_to_root,
            clean_session=False,
            keep_alive_secs=6)

    def __del__(self):
        disconnect_future = self.client.disconnect()
        disconnect_future.result()

    def connect(self):
        connect_future = self.client.connect()
        connect_future.result()

    def publish(self, topic, message):
        self.client.publish(
            topic=topic,
            payload=json.dumps(message),
            qos=mqtt.QoS.AT_LEAST_ONCE)

コンタクトレンズの着脱を検知したタイミング(ボタンが押下がされたら)で Publish する処理をmain.pyに追加します。追加したコードの部分が分かりやすいようにハイライト表示しています。

main.py

import RPi.GPIO as GPIO
import os
from time import sleep

import wiringpi

from mqtt_connector import MqttConnector

DEVICE_ID = os.environ['DEVICE_ID']
ENDPOINT = os.environ['ENDPOINT']
TOPIC = 'contact/case/' + DEVICE_ID

LED_PIN = 25
BTN_PIN = 24
SERVO_PIN = 18
OPEN = 36
CLOSE = 80

mqtt_connector = MqttConnector(
                  DEVICE_ID,
                  ENDPOINT,
                  'certificates/root.pem',
                  'certificates/private.pem.key',
                  'certificates/certificate.pem.crt')
mqtt_connector.connect()

on_off = GPIO.LOW


def callback(ch):
    global on_off, mqtt_connector
    if ch == BTN_PIN:
        on_off = not on_off
        GPIO.output(LED_PIN, on_off)
        if on_off == GPIO.HIGH:
            wiringpi.pwmWrite(SERVO_PIN, CLOSE)
            mqtt_connector.publish(TOPIC, {})
        else:
            wiringpi.pwmWrite(SERVO_PIN, OPEN)
            mqtt_connector.publish(TOPIC, {})


GPIO.setmode(GPIO.BCM)
GPIO.setup(LED_PIN, GPIO.OUT, initial=GPIO.LOW)
GPIO.setup(BTN_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
GPIO.add_event_detect(BTN_PIN, GPIO.RISING, callback=callback, bouncetime=200)

wiringpi.wiringPiSetupGpio()
wiringpi.pinMode(SERVO_PIN, wiringpi.GPIO.PWM_OUTPUT)
wiringpi.pwmSetMode(wiringpi.GPIO.PWM_MODE_MS)
wiringpi.pwmSetClock(375)
wiringpi.pwmWrite(SERVO_PIN, OPEN)

try:
    print('start...')
    while True:
        sleep(0.01)

except KeyboardInterrupt:
    pass

print('end...')

del mqtt_connector

GPIO.cleanup()

実行

AWS IoT SDK for Python v2をインストールしてからmain.py を実行します。

$ sudo pip3 install awsiotsdk
$ sudo -E python3 main.py

コンタクトレンズの着脱が検知されたタイミングで、その時刻が DynamoDB に保存されます。

おわりに

デバイスの作成 & 着脱時間の保存まで実装できました。続きはアプリ側の実装です。アプリ側では、コンタクトレンズの着用時間等に応じて通知が送られてきたり、着用データを可視化したりしようと考えています。

ひと通り完成したら、どこかで LT したいなあと考えています。

今回は以上になります。最後まで読んで頂きありがとうございました!