ESP32 + Micropython + MQTT で CloudWatch Alarmを机上で表示するデバイスを作ってみました

2023.07.01

1 はじめに

CX 事業本部 delivery部の平内(SIN)です。

CloudWatch Alarmは、システムの監視に幅広く利用されていると思いますが、監視対象がアラーム状態となった時に、それを認識する方法は、要求仕様に基づいて色々構築されていると思います。 今回は、その1つの形として、机の上で光ってアラートを知らせるデバイスを作成してみました。

最初に動作している様子です。

このデバイスは、IoT CoreのMQTTトピックをSubscribeしており、Statusが「ALARM」となった時に、点灯し、「OK」となった時に、消灯するようになっています。 動画では、ALARM状態になった時にSNSでトリガーされるLambdaを擬似的に手動で実行しています。

動画では、分かりにくいですが、デバイスを撮影した写真は、次のようになっています。

MQTTで制御できるパトランプなど、既に商品として販売されているものもありますが、今回は、できるだけ費用を抑えることも目的としています。

警報で光らせているLEDは、ダイソーで販売されている330円のものであり、デバイスを構成するCPUも、WiFi付きで1台1,000円程度で手に入るESP32の開発基盤を使用しています。

2 構成

全体の構成です。


❶ 監視対象のLambdaのERRORメトリクスは、Alarmとして設定されています。
❷ Alarmには、SNSトピックへのメッセージ送信が、アクションとして追加されています。
❸ SNSにメッセージが到着すると、トリガー設定しているLambdaが起動されます。
❹ Lambdaは、メッセージ内容(Status=ALARM/OK)をIoT CoreにPublishします。
❺ Publishされたメッセージは、予め、IoT CoreをSubscribeしているデバイスで受信され、その内容(Status=ALARM/OK)によってLEDの表示を変更します。

3 LEDテープライト

LEDテープライトは、最初に紹介した通り、ダイソーで330円で販売されているもので。USBコネクタを接続するだけで、簡単に点灯させることができます。

USBのメスコネクタを準備することができなかったので、ブレッドボードで利用できるように端子に付け替えています。

4 デバイス

(1) ESP32

ESP32の開発ボードは、Amazonで購入しました。


KeeYees ESP32 ESP-32S 開発ボード デュアルコア 2.4 GHz WiFi + Bluetoothデュアルモード対応 ESP-WROOM-32モジュール内臓 マイクロコントローラ (2個入れ)

ESP32Sは、ESP32-DevKitCの互換モデルで、30ピンで、ややサイズも小さくなっているものです。

参考:【ESP32の選び方】ESP32-DevKitC・ESP32S NodeMCUは何が違うのか?

2023/06/18 現在、概ね1,000円程度で、色々見つけることができるようです。

(2) LEDドライブ

ESP32のGPIO(13)でLEDテープライトをON/OFFしていますが、駆動に5V/395mAを必要としており、GPIOから吸い出すことができないので、トランジスタでドライブしています。

動作確認している様子です。

(3) MicroPython

ESP32は、MicroPythonで使用しています。

esp32-20230426-v1.20.0.bin https://micropython.org/download/esp32/

  • ファームの書き込み
% esptool.py --port /dev/cu.SLAB_USBtoUART erase_flash
% esptool.py --chip esp32 --port /dev/tty.SLAB_USBtoUART --baud 460800 write_flash -z 0x1000 esp32-20230426-v1.20.0.bin
  • シリアル接続
% screen /dev/tty.SLAB_USBtoUART 115200
>>> import os
>>> os.uname()
(sysname='esp32', nodename='esp32', release='1.20.0', version='v1.20.0 on 2023-04-26', machine='ESP32 module with ESP32')
  • rshell

ampyは、なぜか、ときどき途中で止まってしまったので、rshellでファイルを書き込みました

% rshell -p /dev/cu.SLAB_USBtoUART
> ls /pyboard
> cp cert.pem.crt /pyboard
> cp private.pem.key /pyboard
> cp simple.py /pyboard
> cp main.py /pyboard

MicroPythonで動作しているコードです。

MQTTのメインコードは、umqtt.simpleを、そのまま使用させていただいています。 umqtt.simple — MQTT client function

また、下記のブログを参考にさせていただきました。
Using MicroPython to get started with AWS IoT Core

main.py

import os
import time
import json
import machine
import network
from simple import MQTTClient

state = "OK"

wifi_ssid = "XXX"
wifi_password = "XXXXXXXX"

aws_endpoint = b"xxxxxxxxxxxx.iot.ap-northeast-1.amazonaws.com"
private_key = "private.pem.key"
private_cert = "cert.pem.crt"


with open(private_key, "r") as f:
    key = f.read()
with open(private_cert, "r") as f:
    cert = f.read()

device_id = "0001"
topic_sub = "alarm-sample/" + device_id
ssl_params = {"key": key, "cert": cert, "server_side": False}

led_tape = machine.Pin(13, machine.Pin.OUT)
led_onboard = machine.Pin(2, machine.Pin.OUT)

wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
    print("Connecting to network...")
    wlan.connect(wifi_ssid, wifi_password)
    while not wlan.isconnected():
        pass
    print("Connection successful")
    print("Network config:", wlan.ifconfig())


def mqtt_connect(client=device_id, endpoint=aws_endpoint, sslp=ssl_params):
    mqtt = MQTTClient(
        client_id=client,
        server=endpoint,
        port=8883,
        keepalive=1200,
        ssl=True,
        ssl_params=sslp,
    )
    print("Connecting to AWS IoT...")
    mqtt.connect()
    print("Connected.")
    return mqtt


def mqtt_subscribe(topic, msg):
    print("on received...")
    global state
    state = json.loads(msg.decode())["state"]
    print("state {}".format(state))


try:
    mqtt = mqtt_connect()
    mqtt.set_callback(mqtt_subscribe)
    mqtt.subscribe(topic_sub)
except:
    print("Unable to connect to MQTT.")

led_onboard.value(1)

toggle = False

while True:
    try:
        mqtt.check_msg()
    except:
        print("Unable to check for messages.")

    if state == "ALARM":  # 点滅
        if toggle:
            led_tape.value(1)
            toggle = False
        else:
            led_tape.value(0)
            toggle = True
    else:  # 消灯
        led_tape.value(0)

    print("sleep for 1 seconds state:{}".format(state))
    time.sleep(1)

5 CDK

クラウド側のCloudwatch Alarm、SNSトピック、Lambdaを構築しているCDKです。

監視対象のLambda(test_function)のErrorメトリクスにアラームを設定し、SNSトピックをトリガーLambdaが起動するようになっています。

import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import {
  aws_cloudwatch,
  aws_cloudwatch_actions,
  aws_sns,
  aws_iam,
  aws_lambda_event_sources,
  aws_lambda_nodejs,
  aws_lambda,
} from "aws-cdk-lib";

export class LedAlarmCdkStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const tag = "led_alarm";
    const target_functionName = "test_function";

    const topic = new aws_sns.Topic(this, `Topic`, {
      displayName: `${tag}_Topic`,
      topicName: `${tag}_Topic`,
    });

    const alarm = new aws_cloudwatch.Alarm(this, "Alarm", {
      alarmName: `${tag}_Alarm`,
      comparisonOperator:
        aws_cloudwatch.ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD,
      threshold: 1,
      evaluationPeriods: 1,
      datapointsToAlarm: 1,
      treatMissingData: aws_cloudwatch.TreatMissingData.NOT_BREACHING,
      metric: new aws_cloudwatch.Metric({
        namespace: "AWS/Lambda",
        metricName: "Errors",
        dimensionsMap: { FunctionName: target_functionName },
        statistic: "SUM",
        period: cdk.Duration.minutes(1),
      }),
    });
    alarm.addAlarmAction(new aws_cloudwatch_actions.SnsAction(topic));
    alarm.addOkAction(new aws_cloudwatch_actions.SnsAction(topic));

    const lambda = new aws_lambda_nodejs.NodejsFunction(this, "lambda", {
      functionName: `${tag}_function`,
      entry: "lambda/index.ts",
      handler: "handler",
      runtime: aws_lambda.Runtime.NODEJS_18_X,
      logRetention: cdk.aws_logs.RetentionDays.TWO_WEEKS,
    });
    lambda.addToRolePolicy(
      new aws_iam.PolicyStatement({
        resources: ["*"],
        actions: ["iot:Publish"],
      })
    );
    lambda.addEventSource(
      new aws_lambda_event_sources.SnsEventSource(topic, {})
    );
  }
}

SNSトピックをトリガーして動作するLambdadでは、MQTTメッセージをPublishしてます。

Lambda/indexts

import { SNSEvent } from "aws-lambda";
import * as AWS from "aws-sdk";

const endpoint = "xxxxxxxxxxxx.iot.ap-northeast-1.amazonaws.com";
const topic = "alarm-sample/0001";
const region = "ap-northeast-1";

async function publish(state: string) {
  console.log(`publish(${state})`);
  const iotdata = new AWS.IotData({
    endpoint: endpoint,
    region: region,
  });
  const params = {
    topic: topic,
    payload: `{"state":"${state}"}`,
    qos: 0,
  };
  let result = await iotdata.publish(params).promise();
  console.log(`publish result: ${result}`);
}

export const handler = async (event: SNSEvent, _: any) => {
  for (const record of event.Records) {
    const message = JSON.parse(record.Sns.Message);
    if (message.hasOwnProperty("NewStateValue")) {
      const newStateValue = message["NewStateValue"]; // "ALARM" of "OK"
      await publish(newStateValue);
    }
  }
};

監視対象のLmabdaが、例外で落ちたりすると、Errorメトリクスでアラート状態となります。

SNS経由でLambdaが起動され、メッセージをPublishしている様子です。

6 最後に

今回は、CloudWatchのアラームを机上で確認できるようにするデバイスを作成してみました。

価格を抑えようとすると、どうしても、簡単な工作などが必要になることが避けられませんでした。このまま利用すると言うのは、あまり無いかなと思うのですが、この記事が、何かのヒントになれば嬉しく思います。

コードは、下記に置きました。説明不足については、こちらご参照いいただければ幸いです。
https://github.com/furuya02/led-alarm