Programmable LED 〜Slackから点灯パターンを設定できるクリスマスツリー用のLEDデバイスを作ってみました〜

2023.09.16

1 はじめに

CX 事業本部 delivery部の平内(SIN)です。

本記事は、100円ショップで販売されているテープライトを「クリスマスツリーで使って見たい!」ということで、簡単な工作をした記録です。

完全に個人的な趣味で申し訳ありません。少しでも興味が持てそうでしたら、続けて読んで頂ければ嬉しいです。

最初に完成したものを見てやってください。

Slackでパターンを投稿するとツリーの点灯が変化します。「職場のみんなで、色々なパターンを作って楽しめたらいいなぁ〜」という妄想が元となっています。

2 ハードウエア

(1) USBコネクタ

とりあえずUSBコネクタ(メス)が4つ欲しかったのですが、もう使っていない古いUSBハブがあったので、バラしました。

コネクタは表面実装でしたが、ハンドコテ2本で、全体的に溶かすことで、うまく外す事ができました。

(2) ユニバーサル基板への取り付け

表面実装の部品なため、ユニバーサル基板に差し込む足が無いので、ワイヤーで結んで半田で固めちゃいました。

+と-の端子にそのまま接続して、点灯できるかどうか確認している様子です。

(3) 組み立て

雑ですが、設計図です。基盤の裏から見た絵を書いてみて、そのまま作ってます。

トランジスタは、手元に残っていた、2SC1815を使用しました。

電流制限とプルダウンは、不要になった機器から外したチップ抵抗です。作業中に過って紛失したりしても、いっぱいあるので、全然余裕です。

チップ部品のハンダ付けは、マイクロスコープが有れば、老眼でも大丈です。

とりあえず、動作確認しながら、1つ作って行きました。

(4) ケース

ケース用に塩ビ板を買ってきました。

ユニバーサル基板の上下に配置するために同じサイズで2枚カットしてます。

ユニバーサル基盤に合わせて四隅に穴を開けました。

後は、なんとかサイズの合いそうなネジを探して組み立ててます。ネジは、過去に、趣味で家電などを分解して集めたものです。

ケースといいながら、基盤への接触を避けるため、上下に塩ビ板を付けただけです。

3 ESP32

作成したデバイスのロジックのON/OFFは、ESP32で行っています。併せて、MQTTで点灯パターンの指示を受けるようになっています。

(1) MicroPython

ESP32は、MicroPythonで使用していますが、初期化や利用方法については、下記で紹介しているものと同じです。

(2) 接続

ESP32と作成したハードウエアを接続した様子です。

下記のGPIOが、USB端子のロジック(L1〜L4)に接続されています。

GPIO12 L1 WHITE
GPIO14 L2 RED
GPIO27 L3 YELLOW
GPIO26 L4 BLUE

(3) コード

ESP32で動作しているコードです。

pattern_list[]が、4つのUSBコネクタの電源を0.1秒単位でON/OFFする 0,1 のパターンです。

MQTTで、AWS IoT Coreに接続し、「christmas-tree/デバイス番号」をSubscribeしています。 メッセージは、下記のような形式となっており、受け取った時点で、pattern_list[]を書き換えます。

各色のパターンの長さは、特に制限がありません。一番長いパターンに併せて、足りない分は0で埋められます。

payload = {
   "white" : "000000000000000000000000001010100000",
   "red" : "0000110011000011000000001100000000",
   "blue" : "1010001000101000101010100000000000",
   "yellow" : "000000000000000000000000001010100000"
}
main.py
import os
import time
import json
import machine
import network
from simple import MQTTClient

state = "OK"

wifi_ssid = "XXXX"
wifi_password = "XXXXXXXX"

aws_endpoint = b"xxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
private_key = "private.pem.key"
private_cert = "cert.pem.crt"


pattern_list = ["1111100000", "1111100000", "1111100000", "1111100000"]

with open(private_key, "r") as f:
    key = f.read()
with open(private_cert, "r") as f:
    cert = f.read()

device_id = "0001"
topic_sub = "christmas-tree/" + device_id
ssl_params = {"key": key, "cert": cert, "server_side": False}

led_list = []
led_list.append(machine.Pin(12, machine.Pin.OUT))  # GPIO12 L1 WHITE
led_list.append(machine.Pin(14, machine.Pin.OUT))  # GPIO14 L2 RED
led_list.append(machine.Pin(27, machine.Pin.OUT))  # GPIO27 L3 YELLOW
led_list.append(machine.Pin(26, machine.Pin.OUT))  # GPIO26 L4 BLUE

wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
    print("Connecting to network...")
    wlan.connect(wifi_ssid, wifi_password)
    while not wlan.isconnected():
        pass
    print("Connection successful")
    print("Network config:", wlan.ifconfig())


def mqtt_connect(client=device_id, endpoint=aws_endpoint, sslp=ssl_params):
    mqtt = MQTTClient(
        client_id=client,
        server=endpoint,
        port=8883,
        keepalive=1200,
        ssl=True,
        ssl_params=sslp,
    )
    print("Connecting to AWS IoT...")
    mqtt.connect()
    print("Connected.")
    return mqtt


def mqtt_subscribe(topic, msg):
    print("on received...")
    print("msg.decode(): {}".format(msg.decode()))
    global pattern_list

    for i, color in enumerate(["white", "red", "yellow", "blue"]):
        pattern_list[i] = json.loads(msg.decode())[color]

    # 一番長いパターンに合わせるため、不足分に"0"を追加する
    length_list = []
    for i in range(len(pattern_list)):
        length_list.append(len(pattern_list[i]))
    max_length = max(length_list)
    print("length_list:{} max_length: {}".format(length_list, max_length))
    for i in range(len(pattern_list)):
        for _ in range(max_length - length_list[i]):
            pattern_list[i] = pattern_list[i] + "0"

    for i in range(len(pattern_list)):
        print("pattern_list[{}]: {}".format(i, pattern_list[i]))


try:
    mqtt = mqtt_connect()
    mqtt.set_callback(mqtt_subscribe)
    mqtt.subscribe(topic_sub)
except:
    print("Unable to connect to MQTT.")


index = 0

while True:
    try:
        mqtt.check_msg()
    except:
        print("Unable to check for messages.")

    time.sleep(0.1)

    index += 1
    if index >= len(pattern_list[0]):
        index = 0

    for n in range(4):
        pattern = pattern_list[n][0 + index : 1 + index]
        led_list[n].value(int(pattern))

    if index % 20 == 0:
        print("pattern_list: {}".format(pattern_list))

MQTT処理は、umqtt.simpleを、そのまま使用させていただいています。
umqtt.simple — MQTT client function

詳細は、https://dev.classmethod.jp/articles/cloudwatch-alarm-device-with-esp32-micropython/をご参照ください

4 Slack

Slackに投稿する内容は、以下のようなイメージになります。

  • Whiteだけが、0.5秒単位で点滅(その他は消灯)
white:1111100000
  • WhiteとBlueが、0.5秒単位で交互に点滅
white:1111100000
blue:0000011111
  • ちょっと複雑なパターン
white : 000000000000000000000000001010100000
red : 0000110011000011000000001100000000
blue : 1010001000101000101010100000000000
yellow : 000000000000000000000000001010100000

投稿内容を処理するために作成したSlackアプリのマニフェストは以下のとおりです。

display_information:
  name: christmas-tree
features:
  bot_user:
    display_name: christmas-tree
    always_online: false
oauth_config:
  scopes:
    user:
      - groups:history
    bot:
      - incoming-webhook
settings:
  event_subscriptions:
    request_url: https://xxxxxxxxxxxxxxxxxxxzzaf.lambda-url.ap-northeast-1.on.aws/
    user_events:
      - message.groups
  org_deploy_enabled: false
  socket_mode_enabled: false
  token_rotation_enabled: false

投稿内容をLambdaに送信するために、Event Subscriptions は、API Gateway無しで、Lambdaの関数URLに直接接続されています。LambdaからのSlackへの書き込みは、Incoming Webhooks を使用しています。

投稿を受け取ったLambdaで、MQTTへPublishしているコードは、以下のとおりです。

lambda_function.py
import json
import requests # requests を使用するためにLayerが必要
import boto3

url = "https://hooks.slack.com/services/xxxxxxxx/xxxxxxxx/xxxxxxxxxxxxDERSV1MxZ"
respone_message = "設定しました"


def text_to_pattern(text):
    lines = text.split("\n")
    pattern = {"white": "", "red": "", "yellow": "", "blue": ""}
    for line in lines:
        tmp = line.split(":")
        pattern[tmp[0].strip()] = tmp[1].strip()
    return pattern


def send_pattern_to_mqtt(pattern):
    topic = "christmas-tree/0001"
    iot = boto3.client("iot-data")
    payload = pattern
    iot.publish(topic=topic, qos=1, payload=json.dumps(payload, ensure_ascii=False))


def send_response_to_slack(pattern):
    data = {"text": "{}\n{}".format(respone_message, json.dumps(pattern))}
    response = requests.post(url, data=json.dumps(data))
    print("slack response: {} {}".format(response.status_code, response.text))


def job(text):
    pattern = text_to_pattern(text)
    send_response_to_slack(pattern)
    send_pattern_to_mqtt(pattern)


def lambda_handler(event, context):
    # Enable Eventsの認証時に使用
    # return {
    #     'statusCode': 200,
    #     'body': json.dumps(event["body"])
    # }

    # Lambdaが投稿したrespone_messageが含まれていないものを処理対象としている
    if not respone_message in event:
        if "body" in event:
            body = json.loads(event["body"])
            if "event" in body:
                event = body["event"]
                if "text" in event:
                    job(event["text"])

5 最後に

完全に個人的な趣味の記録で、ほんと申し訳ありません。 ここまで、読んで下さった方に感謝です。