AWS IoT Coreで仮想的に複数デバイスを検証するスクリプトのご紹介
歴史シミュレーションゲーム好きのくろすけです!
AWS IoT Core(以降 IoT Core)で複数デバイスを使用した場合の挙動を見てみたいと思い、仮想的に複数台のダミーデバイスを動かせるスクリプト dummy-devices.py を作ってみました。
この記事ではその概要と使い方を紹介します。
概要
dummy-devices.py は、IoT Core に接続してテレメトリを送信し、1 台または複数台をまとめて動かすための Python スクリプトです。
下記の初級ハンズオンで使用されているスクリプトをベースに作成しています。
構成
このスクリプトの想定環境は、下記になります。
IoT Core にモノを登録する程度なので、簡単に試すことができると思います。

やってみた
1. 前提条件とインストール
- Python 3 と pip が利用できること
- IoT Core で Thing と証明書(ルート CA・デバイス証明書・秘密鍵)を用意済みであること
- こちらは上述のハンズオンに詳しいので割愛します
requirements.txt
awscrt==0.31.2
awsiotsdk==1.28.1
仮想環境を作成、ログインし、必要ライブラリをインストール
python -m venv .venv
. .venv/bin/activate
pip install -U pip
pip install -r requirements.txt
2. 設定ファイル(devices.json)の準備
このスクリプトではデバイスの設定値を外部のjsonファイルで設定する仕様となっています。
下記がその設定ファイルの例です。
事前に作成、準備したデバイスごとの証明書を各デバイス用のフォルダに保存してください。
devices.json
{
"endpoint": "XXXXXXXXXX-ats.iot.ap-northeast-1.amazonaws.com",
"root_ca": "./certs/AmazonRootCA1.pem",
"verbosity": "NoLogs",
"devices": [
{
"device_name": "dummy-device-001",
"cert": "./certs/dummy-device-001/certificate-001.pem.crt",
"private": "./certs/dummy-device-001/private-001.pem.key",
"topic": "data/smpl/fukuoka/lab/dummy-device-001"
},
{
"device_name": "dummy-device-002",
"cert": "./certs/dummy-device-002/certificate-002.pem.crt",
"private": "./certs/dummy-device-002/private-002.pem.key",
"topic": "data/smpl/fukuoka/cafe/dummy-device-002"
},
{
"device_name": "dummy-device-003",
"cert": "./certs/dummy-device-003/certificate-003.pem.crt",
"private": "./certs/dummy-device-003/private-003.pem.key",
"topic": "data/smpl/hibiya/lab/dummy-device-003"
},
{
"device_name": "dummy-device-004",
"cert": "./certs/dummy-device-004/certificate-004.pem.crt",
"private": "./certs/dummy-device-004/private-004.pem.key",
"topic": "data/smpl/hibiya/cafe/dummy-device-004"
}
]
}
3. 実行方法
実際の dummy-devices.py が下記です。
dummy-devices.py
from __future__ import absolute_import, print_function
import argparse
import json
import logging
import multiprocessing
import os
import random
import signal
import sys
import time
import traceback
from datetime import datetime
from awscrt import io, mqtt
from awsiot import iotshadow, mqtt_connection_builder
DEFAULT_TOPIC_PREFIX = "data/"
DEFAULT_WAIT_TIME = 5
SHADOW_WAIT_TIME_KEY = "wait_time"
KEEP_ALIVE = 300
mqtt_connection = None
shadow_client = None
wait_time = DEFAULT_WAIT_TIME
device_name = None
logger = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logging.basicConfig()
def parse_args():
"""
Parse command-line arguments (config file path and optional verbosity).
"""
parser = argparse.ArgumentParser(
description="Run one or more dummy IoT devices from a JSON config file."
)
parser.add_argument(
"--config",
default="./devices.json",
help="Path to JSON config file defining devices (see devices.json.example)",
)
parser.add_argument(
"--verbosity",
choices=[x.name for x in io.LogLevel],
default=io.LogLevel.NoLogs.name,
help="Logging level",
)
return parser.parse_args()
def load_config(config_path, verbosity_override=None):
"""
Load device list from JSON config and return a list of per-device config dicts.
JSON format:
- Top-level optional defaults (applied to all devices): endpoint, root_ca, verbosity
- "devices": array of objects. Each must have: device_name, cert, private.
Optional per-device: endpoint, root_ca, topic (MQTT topic for telemetry; default "data/<device_name>")
Returns
-------
list of dict
Each dict has: device_name, endpoint, certs [root_ca, private, cert], verbosity
"""
with open(config_path, "r", encoding="utf-8") as f:
raw = json.load(f)
defaults = {
"endpoint": raw.get("endpoint"),
"root_ca": raw.get("root_ca"),
"verbosity": verbosity_override or raw.get("verbosity", io.LogLevel.NoLogs.name),
}
if not raw.get("devices"):
raise ValueError("Config must contain a non-empty 'devices' array")
base_dir = os.path.dirname(os.path.abspath(config_path))
def resolve_path(p):
if p is None:
return None
if os.path.isabs(p):
return p
ref_dir = base_dir if base_dir else os.getcwd()
return os.path.normpath(os.path.join(ref_dir, p))
configs = []
for i, dev in enumerate(raw["devices"]):
device_name = dev.get("device_name")
if not device_name:
raise ValueError("devices[{}] must have 'device_name'".format(i))
endpoint = dev.get("endpoint") or defaults["endpoint"]
root_ca = resolve_path(dev.get("root_ca") or defaults["root_ca"])
cert = resolve_path(dev.get("cert"))
private = resolve_path(dev.get("private"))
if not endpoint:
raise ValueError(
"devices[{}]: 'endpoint' required (set in device or top-level)".format(i)
)
if not cert or not private:
raise ValueError("devices[{}]: 'cert' and 'private' are required".format(i))
if not root_ca:
root_ca = find_certs_file()[0]
if not os.path.isabs(root_ca):
root_ca = os.path.normpath(os.path.join(os.getcwd(), root_ca))
cert_list = [root_ca, private, cert]
file_exist_check(cert_list)
topic = dev.get("topic") or (DEFAULT_TOPIC_PREFIX + device_name)
configs.append(
{
"device_name": device_name,
"endpoint": endpoint,
"certs": cert_list,
"verbosity": defaults["verbosity"],
"topic": topic,
}
)
return configs
def arg_check():
"""
Legacy: kept for any caller that expects a single init_dict.
Use load_config() + device_main(init_dict) instead.
"""
args = parse_args()
configs = load_config(args.config, verbosity_override=args.verbosity)
if len(configs) != 1:
raise RuntimeError("arg_check expects exactly one device; use --config with one device")
return configs[0]
def file_exist_check(cert_list):
"""
Check the files exists
all certs must placed in ./certs directory
Parameters
----------
cert_list: Array
"""
for file in cert_list:
if not os.path.exists(file):
# if file not found, raise
logger.error("cert file not found:%s", file)
raise RuntimeError("file_not_exists")
def find_certs_file():
"""
Find the certificates file from ./certs directory
Returns
----------
file_list: Array
0: Root CA Cert, 1: private key, 2: certificate
"""
certs_dir = "./certs"
file_list = ["AmazonRootCA1.pem", "private.pem", "certificate.crt"]
for _, _, names in os.walk(certs_dir):
for file in names:
if "AmazonRootCA1.pem" in file:
file_list[0] = certs_dir + "/" + file
elif "private" in file:
file_list[1] = certs_dir + "/" + file
elif "certificate" in file:
file_list[2] = certs_dir + "/" + file
return file_list
def device_main(init_dict):
"""
Run a single dummy device: connect, subscribe to shadow, publish telemetry loop.
Parameters
----------
init_dict : dict
Must have: device_name, endpoint, certs [root_ca, private_key, cert], verbosity, topic
"""
global device_name, mqtt_connection
# global shadow_client # Device Shadow コメントアウトのため不要
device_name = init_dict["device_name"]
iot_endpoint = init_dict["endpoint"]
rootca_file = init_dict["certs"][0]
private_key_file = init_dict["certs"][1]
certificate_file = init_dict["certs"][2]
verbosity_name = init_dict.get("verbosity", io.LogLevel.NoLogs.name)
log_level = getattr(io.LogLevel, verbosity_name, io.LogLevel.NoLogs)
io.init_logging(log_level, "stderr")
logger.setLevel(logging.DEBUG if verbosity_name in ("Debug", "Trace") else logging.INFO)
logging.basicConfig()
logger.info("device_name: %s", device_name)
logger.info("endpoint: %s", iot_endpoint)
logger.info("rootca cert: %s", rootca_file)
logger.info("private key: %s", private_key_file)
logger.info("certificate: %s", certificate_file)
# Spin up resources
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=iot_endpoint,
cert_filepath=certificate_file,
pri_key_filepath=private_key_file,
client_bootstrap=client_bootstrap,
ca_filepath=rootca_file,
client_id=device_name,
clean_session=False,
keep_alive_secs=KEEP_ALIVE,
)
connected_future = mqtt_connection.connect()
connected_future.result()
# Start sending dummy data
topic = init_dict["topic"]
logging.info("topic: %s", topic)
while True:
now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
tmp = 20 + random.randint(-5, 5)
hum = 50 + random.randint(-10, 10)
payload = {
"DEVICE_NAME": device_name,
"TIMESTAMP": now,
"TEMPERATURE": tmp,
"HUMIDITY": hum,
}
logger.info(" payload: %s", payload)
mqtt_connection.publish(
topic=topic, payload=json.dumps(payload), qos=mqtt.QoS.AT_LEAST_ONCE
)
time.sleep(wait_time)
def exit_sample(msg_or_exception):
"""
Exit sample with cleaning
Parameters
----------
msg_or_exception: str or Exception
"""
if isinstance(msg_or_exception, Exception):
logger.error("Exiting sample due to exception.")
traceback.print_exception(msg_or_exception.__class__, msg_or_exception, sys.exc_info()[2])
else:
logger.info("Exiting: %s", msg_or_exception)
if not mqtt_connection:
logger.info("Disconnecting...")
mqtt_connection.disconnect()
sys.exit(0)
def exit_handler(_signal, frame):
"""
Exit sample
"""
exit_sample(" Key abort")
if __name__ == "__main__":
signal.signal(signal.SIGINT, exit_handler)
args = parse_args()
try:
configs = load_config(args.config, verbosity_override=args.verbosity)
except Exception as e:
logger.error("Failed to load config: %s", e)
sys.exit(1)
if len(configs) == 1:
device_main(configs[0])
else:
processes = []
for init_dict in configs:
p = multiprocessing.Process(target=device_main, args=(init_dict,))
p.start()
processes.append(p)
try:
for p in processes:
p.join()
except KeyboardInterrupt:
for p in processes:
p.terminate()
for p in processes:
p.join()
設定ファイルのパスを --config で指定して実行します(省略時はカレントディレクトリの devices.json を参照)。
実行コマンド
python dummy-devices.py --config devices.json
- 設定に1台だけ書いてある場合は、その1台がメインプロセスで動作します。
- 複数台書いてある場合は、デバイスごとに子プロセスが起動し、それぞれが IoT Core に接続してテレメトリを送信します。
実行時のイメージ

4. 実行結果
マネジメントコンソールの MQTTテストクライアント で確認したパブリッシュの結果が下記です。
せっかくトピックも設定できるようにしたので、ワイルドカードでフィルタしてみました。
最初、ワイルドカードの指定方法を間違っており、パブリッシュが確認出来ずというアクシデントがありましたが、無事成功してました。


あとがき
簡単にではありますがこのスクリプトで複数デバイス分の挙動を再現できるので、自分のように IoT Core 入門レベルの方がサービスや設計のイメージを掴む分には役に立つかなと思っています。
今回デバイスを手動で登録しましたが、面倒だったので今度は一括登録なども試してみたいと思っています。
以上、くろすけでした!






