ESP32 + Micropython + MQTT で CloudWatch Alarmを机上で表示するデバイスを作ってみました
1 はじめに
CX 事業本部 delivery部の平内(SIN)です。
CloudWatch Alarmは、システムの監視に幅広く利用されていると思いますが、監視対象がアラーム状態となった時に、それを認識する方法は、要求仕様に基づいて色々構築されていると思います。 今回は、その1つの形として、机の上で光ってアラートを知らせるデバイスを作成してみました。
最初に動作している様子です。
このデバイスは、IoT CoreのMQTTトピックをSubscribeしており、Statusが「ALARM」となった時に、点灯し、「OK」となった時に、消灯するようになっています。 動画では、ALARM状態になった時にSNSでトリガーされるLambdaを擬似的に手動で実行しています。
動画では、分かりにくいですが、デバイスを撮影した写真は、次のようになっています。
MQTTで制御できるパトランプなど、既に商品として販売されているものもありますが、今回は、できるだけ費用を抑えることも目的としています。
警報で光らせているLEDは、ダイソーで販売されている330円のものであり、デバイスを構成するCPUも、WiFi付きで1台1,000円程度で手に入るESP32の開発基盤を使用しています。
2 構成
全体の構成です。
❶ 監視対象のLambdaのERRORメトリクスは、Alarmとして設定されています。
❷ Alarmには、SNSトピックへのメッセージ送信が、アクションとして追加されています。
❸ SNSにメッセージが到着すると、トリガー設定しているLambdaが起動されます。
❹ Lambdaは、メッセージ内容(Status=ALARM/OK)をIoT CoreにPublishします。
❺ Publishされたメッセージは、予め、IoT CoreをSubscribeしているデバイスで受信され、その内容(Status=ALARM/OK)によってLEDの表示を変更します。
3 LEDテープライト
LEDテープライトは、最初に紹介した通り、ダイソーで330円で販売されているもので。USBコネクタを接続するだけで、簡単に点灯させることができます。
USBのメスコネクタを準備することができなかったので、ブレッドボードで利用できるように端子に付け替えています。
4 デバイス
(1) ESP32
ESP32の開発ボードは、Amazonで購入しました。
ESP32Sは、ESP32-DevKitCの互換モデルで、30ピンで、ややサイズも小さくなっているものです。
参考:【ESP32の選び方】ESP32-DevKitC・ESP32S NodeMCUは何が違うのか?
2023/06/18 現在、概ね1,000円程度で、色々見つけることができるようです。
(2) LEDドライブ
ESP32のGPIO(13)でLEDテープライトをON/OFFしていますが、駆動に5V/395mAを必要としており、GPIOから吸い出すことができないので、トランジスタでドライブしています。
動作確認している様子です。
(3) MicroPython
ESP32は、MicroPythonで使用しています。
esp32-20230426-v1.20.0.bin https://micropython.org/download/esp32/
- ファームの書き込み
% esptool.py --port /dev/cu.SLAB_USBtoUART erase_flash % esptool.py --chip esp32 --port /dev/tty.SLAB_USBtoUART --baud 460800 write_flash -z 0x1000 esp32-20230426-v1.20.0.bin
- シリアル接続
% screen /dev/tty.SLAB_USBtoUART 115200 >>> import os >>> os.uname() (sysname='esp32', nodename='esp32', release='1.20.0', version='v1.20.0 on 2023-04-26', machine='ESP32 module with ESP32')
- rshell
ampyは、なぜか、ときどき途中で止まってしまったので、rshellでファイルを書き込みました
% rshell -p /dev/cu.SLAB_USBtoUART > ls /pyboard > cp cert.pem.crt /pyboard > cp private.pem.key /pyboard > cp simple.py /pyboard > cp main.py /pyboard
MicroPythonで動作しているコードです。
MQTTのメインコードは、umqtt.simpleを、そのまま使用させていただいています。 umqtt.simple — MQTT client function
また、下記のブログを参考にさせていただきました。
Using MicroPython to get started with AWS IoT Core
main.py
import os import time import json import machine import network from simple import MQTTClient state = "OK" wifi_ssid = "XXX" wifi_password = "XXXXXXXX" aws_endpoint = b"xxxxxxxxxxxx.iot.ap-northeast-1.amazonaws.com" private_key = "private.pem.key" private_cert = "cert.pem.crt" with open(private_key, "r") as f: key = f.read() with open(private_cert, "r") as f: cert = f.read() device_id = "0001" topic_sub = "alarm-sample/" + device_id ssl_params = {"key": key, "cert": cert, "server_side": False} led_tape = machine.Pin(13, machine.Pin.OUT) led_onboard = machine.Pin(2, machine.Pin.OUT) wlan = network.WLAN(network.STA_IF) wlan.active(True) if not wlan.isconnected(): print("Connecting to network...") wlan.connect(wifi_ssid, wifi_password) while not wlan.isconnected(): pass print("Connection successful") print("Network config:", wlan.ifconfig()) def mqtt_connect(client=device_id, endpoint=aws_endpoint, sslp=ssl_params): mqtt = MQTTClient( client_id=client, server=endpoint, port=8883, keepalive=1200, ssl=True, ssl_params=sslp, ) print("Connecting to AWS IoT...") mqtt.connect() print("Connected.") return mqtt def mqtt_subscribe(topic, msg): print("on received...") global state state = json.loads(msg.decode())["state"] print("state {}".format(state)) try: mqtt = mqtt_connect() mqtt.set_callback(mqtt_subscribe) mqtt.subscribe(topic_sub) except: print("Unable to connect to MQTT.") led_onboard.value(1) toggle = False while True: try: mqtt.check_msg() except: print("Unable to check for messages.") if state == "ALARM": # 点滅 if toggle: led_tape.value(1) toggle = False else: led_tape.value(0) toggle = True else: # 消灯 led_tape.value(0) print("sleep for 1 seconds state:{}".format(state)) time.sleep(1)
5 CDK
クラウド側のCloudwatch Alarm、SNSトピック、Lambdaを構築しているCDKです。
監視対象のLambda(test_function)のErrorメトリクスにアラームを設定し、SNSトピックをトリガーLambdaが起動するようになっています。
import * as cdk from "aws-cdk-lib"; import { Construct } from "constructs"; import { aws_cloudwatch, aws_cloudwatch_actions, aws_sns, aws_iam, aws_lambda_event_sources, aws_lambda_nodejs, aws_lambda, } from "aws-cdk-lib"; export class LedAlarmCdkStack extends cdk.Stack { constructor(scope: Construct, id: string, props?: cdk.StackProps) { super(scope, id, props); const tag = "led_alarm"; const target_functionName = "test_function"; const topic = new aws_sns.Topic(this, `Topic`, { displayName: `${tag}_Topic`, topicName: `${tag}_Topic`, }); const alarm = new aws_cloudwatch.Alarm(this, "Alarm", { alarmName: `${tag}_Alarm`, comparisonOperator: aws_cloudwatch.ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD, threshold: 1, evaluationPeriods: 1, datapointsToAlarm: 1, treatMissingData: aws_cloudwatch.TreatMissingData.NOT_BREACHING, metric: new aws_cloudwatch.Metric({ namespace: "AWS/Lambda", metricName: "Errors", dimensionsMap: { FunctionName: target_functionName }, statistic: "SUM", period: cdk.Duration.minutes(1), }), }); alarm.addAlarmAction(new aws_cloudwatch_actions.SnsAction(topic)); alarm.addOkAction(new aws_cloudwatch_actions.SnsAction(topic)); const lambda = new aws_lambda_nodejs.NodejsFunction(this, "lambda", { functionName: `${tag}_function`, entry: "lambda/index.ts", handler: "handler", runtime: aws_lambda.Runtime.NODEJS_18_X, logRetention: cdk.aws_logs.RetentionDays.TWO_WEEKS, }); lambda.addToRolePolicy( new aws_iam.PolicyStatement({ resources: ["*"], actions: ["iot:Publish"], }) ); lambda.addEventSource( new aws_lambda_event_sources.SnsEventSource(topic, {}) ); } }
SNSトピックをトリガーして動作するLambdadでは、MQTTメッセージをPublishしてます。
Lambda/indexts
import { SNSEvent } from "aws-lambda"; import * as AWS from "aws-sdk"; const endpoint = "xxxxxxxxxxxx.iot.ap-northeast-1.amazonaws.com"; const topic = "alarm-sample/0001"; const region = "ap-northeast-1"; async function publish(state: string) { console.log(`publish(${state})`); const iotdata = new AWS.IotData({ endpoint: endpoint, region: region, }); const params = { topic: topic, payload: `{"state":"${state}"}`, qos: 0, }; let result = await iotdata.publish(params).promise(); console.log(`publish result: ${result}`); } export const handler = async (event: SNSEvent, _: any) => { for (const record of event.Records) { const message = JSON.parse(record.Sns.Message); if (message.hasOwnProperty("NewStateValue")) { const newStateValue = message["NewStateValue"]; // "ALARM" of "OK" await publish(newStateValue); } } };
監視対象のLmabdaが、例外で落ちたりすると、Errorメトリクスでアラート状態となります。
SNS経由でLambdaが起動され、メッセージをPublishしている様子です。
6 最後に
今回は、CloudWatchのアラームを机上で確認できるようにするデバイスを作成してみました。
価格を抑えようとすると、どうしても、簡単な工作などが必要になることが避けられませんでした。このまま利用すると言うのは、あまり無いかなと思うのですが、この記事が、何かのヒントになれば嬉しく思います。
コードは、下記に置きました。説明不足については、こちらご参照いいただければ幸いです。
https://github.com/furuya02/led-alarm