AWS IoT Coreで仮想的に複数デバイスを検証するスクリプトのご紹介

AWS IoT Coreで仮想的に複数デバイスを検証するスクリプトのご紹介

2026.02.24

歴史シミュレーションゲーム好きのくろすけです!

AWS IoT Core(以降 IoT Core)で複数デバイスを使用した場合の挙動を見てみたいと思い、仮想的に複数台のダミーデバイスを動かせるスクリプト dummy-devices.py を作ってみました。
この記事ではその概要と使い方を紹介します。

概要

dummy-devices.py は、IoT Core に接続してテレメトリを送信し、1 台または複数台をまとめて動かすための Python スクリプトです。

下記の初級ハンズオンで使用されているスクリプトをベースに作成しています。

https://catalog.us-east-1.prod.workshops.aws/workshops/b3e0b830-79b8-4c1d-8a4c-e10406600035/ja-JP/phase2/step1

https://static.us-east-1.prod.workshops.aws/public/4bd9eca7-3826-46d9-94fa-a2a69774089f/static/code/dummy_client/device_main.py

構成

このスクリプトの想定環境は、下記になります。
IoT Core にモノを登録する程度なので、簡単に試すことができると思います。

dummy-devices.png

やってみた

1. 前提条件とインストール

  • Python 3 と pip が利用できること
  • IoT Core で Thing と証明書(ルート CA・デバイス証明書・秘密鍵)を用意済みであること
    • こちらは上述のハンズオンに詳しいので割愛します
requirements.txt
awscrt==0.31.2
awsiotsdk==1.28.1

仮想環境を作成、ログインし、必要ライブラリをインストール

python -m venv .venv
. .venv/bin/activate
pip install -U pip
pip install -r requirements.txt

2. 設定ファイル(devices.json)の準備

このスクリプトではデバイスの設定値を外部のjsonファイルで設定する仕様となっています。
下記がその設定ファイルの例です。

事前に作成、準備したデバイスごとの証明書を各デバイス用のフォルダに保存してください。

devices.json
{
  "endpoint": "XXXXXXXXXX-ats.iot.ap-northeast-1.amazonaws.com",
  "root_ca": "./certs/AmazonRootCA1.pem",
  "verbosity": "NoLogs",
  "devices": [
    {
      "device_name": "dummy-device-001",
      "cert": "./certs/dummy-device-001/certificate-001.pem.crt",
      "private": "./certs/dummy-device-001/private-001.pem.key",
      "topic": "data/smpl/fukuoka/lab/dummy-device-001"
    },
    {
      "device_name": "dummy-device-002",
      "cert": "./certs/dummy-device-002/certificate-002.pem.crt",
      "private": "./certs/dummy-device-002/private-002.pem.key",
      "topic": "data/smpl/fukuoka/cafe/dummy-device-002"
    },
    {
      "device_name": "dummy-device-003",
      "cert": "./certs/dummy-device-003/certificate-003.pem.crt",
      "private": "./certs/dummy-device-003/private-003.pem.key",
      "topic": "data/smpl/hibiya/lab/dummy-device-003"
    },
    {
      "device_name": "dummy-device-004",
      "cert": "./certs/dummy-device-004/certificate-004.pem.crt",
      "private": "./certs/dummy-device-004/private-004.pem.key",
      "topic": "data/smpl/hibiya/cafe/dummy-device-004"
    }
  ]
}

3. 実行方法

実際の dummy-devices.py が下記です。

dummy-devices.py
from __future__ import absolute_import, print_function

import argparse
import json
import logging
import multiprocessing
import os
import random
import signal
import sys
import time
import traceback
from datetime import datetime

from awscrt import io, mqtt
from awsiot import iotshadow, mqtt_connection_builder

DEFAULT_TOPIC_PREFIX = "data/"
DEFAULT_WAIT_TIME = 5
SHADOW_WAIT_TIME_KEY = "wait_time"
KEEP_ALIVE = 300

mqtt_connection = None
shadow_client = None
wait_time = DEFAULT_WAIT_TIME
device_name = None

logger = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logging.basicConfig()


def parse_args():
    """
    Parse command-line arguments (config file path and optional verbosity).
    """
    parser = argparse.ArgumentParser(
        description="Run one or more dummy IoT devices from a JSON config file."
    )
    parser.add_argument(
        "--config",
        default="./devices.json",
        help="Path to JSON config file defining devices (see devices.json.example)",
    )
    parser.add_argument(
        "--verbosity",
        choices=[x.name for x in io.LogLevel],
        default=io.LogLevel.NoLogs.name,
        help="Logging level",
    )
    return parser.parse_args()


def load_config(config_path, verbosity_override=None):
    """
    Load device list from JSON config and return a list of per-device config dicts.

    JSON format:
      - Top-level optional defaults (applied to all devices): endpoint, root_ca, verbosity
      - "devices": array of objects. Each must have: device_name, cert, private.
        Optional per-device: endpoint, root_ca, topic (MQTT topic for telemetry; default "data/<device_name>")

    Returns
    -------
    list of dict
        Each dict has: device_name, endpoint, certs [root_ca, private, cert], verbosity
    """
    with open(config_path, "r", encoding="utf-8") as f:
        raw = json.load(f)

    defaults = {
        "endpoint": raw.get("endpoint"),
        "root_ca": raw.get("root_ca"),
        "verbosity": verbosity_override or raw.get("verbosity", io.LogLevel.NoLogs.name),
    }
    if not raw.get("devices"):
        raise ValueError("Config must contain a non-empty 'devices' array")

    base_dir = os.path.dirname(os.path.abspath(config_path))

    def resolve_path(p):
        if p is None:
            return None
        if os.path.isabs(p):
            return p
        ref_dir = base_dir if base_dir else os.getcwd()
        return os.path.normpath(os.path.join(ref_dir, p))

    configs = []
    for i, dev in enumerate(raw["devices"]):
        device_name = dev.get("device_name")
        if not device_name:
            raise ValueError("devices[{}] must have 'device_name'".format(i))
        endpoint = dev.get("endpoint") or defaults["endpoint"]
        root_ca = resolve_path(dev.get("root_ca") or defaults["root_ca"])
        cert = resolve_path(dev.get("cert"))
        private = resolve_path(dev.get("private"))
        if not endpoint:
            raise ValueError(
                "devices[{}]: 'endpoint' required (set in device or top-level)".format(i)
            )
        if not cert or not private:
            raise ValueError("devices[{}]: 'cert' and 'private' are required".format(i))
        if not root_ca:
            root_ca = find_certs_file()[0]
            if not os.path.isabs(root_ca):
                root_ca = os.path.normpath(os.path.join(os.getcwd(), root_ca))
        cert_list = [root_ca, private, cert]
        file_exist_check(cert_list)
        topic = dev.get("topic") or (DEFAULT_TOPIC_PREFIX + device_name)
        configs.append(
            {
                "device_name": device_name,
                "endpoint": endpoint,
                "certs": cert_list,
                "verbosity": defaults["verbosity"],
                "topic": topic,
            }
        )
    return configs


def arg_check():
    """
    Legacy: kept for any caller that expects a single init_dict.
    Use load_config() + device_main(init_dict) instead.
    """
    args = parse_args()
    configs = load_config(args.config, verbosity_override=args.verbosity)
    if len(configs) != 1:
        raise RuntimeError("arg_check expects exactly one device; use --config with one device")
    return configs[0]


def file_exist_check(cert_list):
    """
    Check the files exists
    all certs must placed in ./certs directory

    Parameters
    ----------
    cert_list: Array
    """

    for file in cert_list:
        if not os.path.exists(file):
            # if file not found, raise
            logger.error("cert file not found:%s", file)
            raise RuntimeError("file_not_exists")


def find_certs_file():
    """
    Find the certificates file from ./certs directory

    Returns
    ----------
    file_list: Array
        0: Root CA Cert, 1: private key, 2: certificate
    """

    certs_dir = "./certs"
    file_list = ["AmazonRootCA1.pem", "private.pem", "certificate.crt"]
    for _, _, names in os.walk(certs_dir):
        for file in names:
            if "AmazonRootCA1.pem" in file:
                file_list[0] = certs_dir + "/" + file
            elif "private" in file:
                file_list[1] = certs_dir + "/" + file
            elif "certificate" in file:
                file_list[2] = certs_dir + "/" + file

    return file_list

def device_main(init_dict):
    """
    Run a single dummy device: connect, subscribe to shadow, publish telemetry loop.

    Parameters
    ----------
    init_dict : dict
        Must have: device_name, endpoint, certs [root_ca, private_key, cert], verbosity, topic
    """
    global device_name, mqtt_connection
    # global shadow_client  # Device Shadow コメントアウトのため不要

    device_name = init_dict["device_name"]
    iot_endpoint = init_dict["endpoint"]
    rootca_file = init_dict["certs"][0]
    private_key_file = init_dict["certs"][1]
    certificate_file = init_dict["certs"][2]
    verbosity_name = init_dict.get("verbosity", io.LogLevel.NoLogs.name)

    log_level = getattr(io.LogLevel, verbosity_name, io.LogLevel.NoLogs)
    io.init_logging(log_level, "stderr")
    logger.setLevel(logging.DEBUG if verbosity_name in ("Debug", "Trace") else logging.INFO)
    logging.basicConfig()

    logger.info("device_name: %s", device_name)
    logger.info("endpoint: %s", iot_endpoint)
    logger.info("rootca cert: %s", rootca_file)
    logger.info("private key: %s", private_key_file)
    logger.info("certificate: %s", certificate_file)

    # Spin up resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=iot_endpoint,
        cert_filepath=certificate_file,
        pri_key_filepath=private_key_file,
        client_bootstrap=client_bootstrap,
        ca_filepath=rootca_file,
        client_id=device_name,
        clean_session=False,
        keep_alive_secs=KEEP_ALIVE,
    )

    connected_future = mqtt_connection.connect()
    connected_future.result()

    # Start sending dummy data
    topic = init_dict["topic"]
    logging.info("topic: %s", topic)
    while True:
        now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
        tmp = 20 + random.randint(-5, 5)
        hum = 50 + random.randint(-10, 10)
        payload = {
            "DEVICE_NAME": device_name,
            "TIMESTAMP": now,
            "TEMPERATURE": tmp,
            "HUMIDITY": hum,
        }
        logger.info("  payload: %s", payload)

        mqtt_connection.publish(
            topic=topic, payload=json.dumps(payload), qos=mqtt.QoS.AT_LEAST_ONCE
        )

        time.sleep(wait_time)


def exit_sample(msg_or_exception):
    """
    Exit sample with cleaning

    Parameters
    ----------
    msg_or_exception: str or Exception
    """
    if isinstance(msg_or_exception, Exception):
        logger.error("Exiting sample due to exception.")
        traceback.print_exception(msg_or_exception.__class__, msg_or_exception, sys.exc_info()[2])
    else:
        logger.info("Exiting: %s", msg_or_exception)

    if not mqtt_connection:
        logger.info("Disconnecting...")
        mqtt_connection.disconnect()
    sys.exit(0)


def exit_handler(_signal, frame):
    """
    Exit sample
    """
    exit_sample(" Key abort")


if __name__ == "__main__":
    signal.signal(signal.SIGINT, exit_handler)

    args = parse_args()
    try:
        configs = load_config(args.config, verbosity_override=args.verbosity)
    except Exception as e:
        logger.error("Failed to load config: %s", e)
        sys.exit(1)

    if len(configs) == 1:
        device_main(configs[0])
    else:
        processes = []
        for init_dict in configs:
            p = multiprocessing.Process(target=device_main, args=(init_dict,))
            p.start()
            processes.append(p)
        try:
            for p in processes:
                p.join()
        except KeyboardInterrupt:
            for p in processes:
                p.terminate()
            for p in processes:
                p.join()

設定ファイルのパスを --config で指定して実行します(省略時はカレントディレクトリの devices.json を参照)。

実行コマンド

python dummy-devices.py --config devices.json
  • 設定に1台だけ書いてある場合は、その1台がメインプロセスで動作します。
  • 複数台書いてある場合は、デバイスごとに子プロセスが起動し、それぞれが IoT Core に接続してテレメトリを送信します。

実行時のイメージ

CleanShot20260224at20.43.45.png

4. 実行結果

マネジメントコンソールの MQTTテストクライアント で確認したパブリッシュの結果が下記です。

せっかくトピックも設定できるようにしたので、ワイルドカードでフィルタしてみました。
最初、ワイルドカードの指定方法を間違っており、パブリッシュが確認出来ずというアクシデントがありましたが、無事成功してました。

CleanShot20260224at20.45.38.png
CleanShot20260224at20.45.59.png

あとがき

簡単にではありますがこのスクリプトで複数デバイス分の挙動を再現できるので、自分のように IoT Core 入門レベルの方がサービスや設計のイメージを掴む分には役に立つかなと思っています。
今回デバイスを手動で登録しましたが、面倒だったので今度は一括登録なども試してみたいと思っています。

以上、くろすけでした!

この記事をシェアする

FacebookHatena blogX

関連記事