1 はじめに
CX 事業本部 delivery部の平内(SIN)です。
昨年の、AWS re:Invent 2023 で公開された、AWS IoT Core Device Location (以下、Device Location)を使用すると、サードパーティーのソルバーを使用して IoT デバイスの位置を取得する事ができます。
ソースは、「Wifi アクセスポイント」「セルラー無線タワー」「IP アドレス」及び、「GNSS スキャンデータ」の4種類となっており、単体もしくは、組み合わせでリクエストできます。
https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/device-location.html
今回は、「Wifi アクセスポイント」の情報を使用して、デバイスの位置を取得してみました。
下記は、動作している様子です。
左の黒色のコンソールは、設置されたデバイス(RaspberryPI)で動作しているプログラムです。そして、右の青色のコンソールが、デバイスの位置を確認しようとしているプログラムです。 デバイスへの指示は、MQTTメッセージで行われ、デバイス側は、リクエストを受けると、現在の「Wifiアクセスポイント」の情報を収集して、MQTTメッセージで返事します。メッセージブローカーを介して、得られたこの情報をもとに、Device Location で、デバイスが設置された地点の「経緯度」を取得し、その「経緯度」から、Reverse Geocodingで「住所」を検索しています。
2 構成
位置を取得する対象は、MQTTで何らかの通信を行っているデバイスをイメージしており、RaspberryPIで実装されています。
デバイスから位置情報を取得する手順は、以下の通りです。
① デバイスに対して情報取得のコマンドを送信します
② コマンドを受け取ったデバイスは、周辺のWiFiアクセスポイントの情報を取集します
③ 収集した情報をMQTT経由でレスポンスします
④ Device Locationへ、WiFiアクセスポイントの情報を送信して、経緯度を取得します
⑤ Reverse Geocodingへ、経緯度を送信して、住所情報を取得します
①のコマンドは、どのような形式でも関係ないのですが、ここでは、下記のようなメッセージを使用しています。
Topic: command/device_name
Payload: location
3 デバイスの配置
デバイスの配置場所は、近所のイオンのフードコートです。
iPhoneで確認した位置情報は、以下の通りでしたが、Device Location で取得できたものと、ほとんど相違ないイメージです。
4 iwlist
Wifiアクセスポイントの情報取得には、iwlist コマンドを使用しています。
iwlist は、ワイヤレスネットワークインタフェースの詳細情報を取得するコマンドです。
構文
iwlist [インタフェース名] [パラメータ]
パラメータ | 説明 |
---|---|
scan | スキャン |
channel | チャンネルの表示 |
rate | 通信速度の表示 |
下記のように使用する事で、詳細な情報を取得できますが、ここから Address及び、Signal level だけを取り出しています。
$ sudo iwlist wlan0 scan
wlan0 Scan completed :
Cell 01 - Address: xx:xx:xx:xx:xx:xx
Channel:1
Frequency:2.412 GHz (Channel 1)
Quality=63/70 Signal level=-47 dBm
Encryption key:on
ESSID:"xxxx"
・・・省略・・・
プログラムは、以下のようになっています。
def getWifi():
cmd = "sudo iwlist wlan0 scan"
res = subprocess.run(
cmd.split(" "),
capture_output=True,
text=True,
)
lines = res.stdout.split("\n")
addressList = []
signalList = []
for i, line in enumerate(lines):
if "Address:" in line:
# Cell 01 - Address: xx:xx:xx:xx:xx:xx
tmp = line.split("Address:")
addressList.append(tmp[1].strip(" "))
if "Signal" in line:
# Quality=26/70 Signal level=-84 dBm
tmp = line.split("level=")
# -84 dBm
tmp = tmp[1].split("dBm")
signalList.append(tmp[0].strip(" "))
timestamp = time.time()
response = {"Timestamp": timestamp, "WiFiAccessPoints": []}
for i, address in enumerate(addressList):
response["WiFiAccessPoints"].append(
{"MacAddress": address, "Rss": int(signalList[i])}
)
return response
編集されたアクアセスポイントの情報は、MQTTで送信されます。
5 Device Location API
受け取ったアクセスポイントの情報は、GetPositionEstimateに送る事で、位置情報が GeoJSONペイロードで受け取れます。
今回は、ここ、boto3のget_position_estimateで実装されています。
iotwireless = boto3.client("iotwireless")
response = iotwireless.get_position_estimate(
WiFiAccessPoints=wifi_access_points, Timestamp=timestamp
)
geoJsonPayload = json.loads(response["GeoJsonPayload"].read())
coordinates = geoJsonPayload["coordinates"]
longitude = coordinates[0]
latitude = coordinates[1]
6 Reverse Geocoding
GetPositionEstimateで取得できた経緯度から、住所を取得するには、Reverse Geocodingを使用しました。
url = (
"https://nominatim.openstreetmap.org/reverse?lat={}&lon={}&format=json".format(
coordinates[1], coordinates[0]
)
)
res = json.loads(requests.get(url).text)
print(
"country: {} ({})".format(
res["address"]["country"], res["address"]["country_code"]
)
)
print("province: {}".format(res["address"]["province"]))
print("region: {}".format(res["address"]["region"]))
print("city: {}".format(res["address"]["city"]))
print("suburb: {}".format(res["address"]["suburb"]))
print("neighbourhood: {}".format(res["address"]["neighbourhood"]))
print("postcode: {}".format(res["address"]["postcode"]))
7 最後に
今回は、Device Location で、デバイスが設置された地点を表示してみました。
このような仕組みを実装しておくと、分散配置されたIoTデバイスが、今、どこで動作しているのかいつでも確認できるようになります。
定期的に実行して、想定外の移動が行われていないかの確認も可能かも知れません。
動画で動作していたソースコードは、以下です。説明が不足している部分については、こちらを参照頂ければ幸いです。
Raspiで動作しているコード
index.py
import time
import json
import subprocess
from mqtt import Mqtt
response = None
def getWifi():
cmd = "sudo iwlist wlan0 scan"
res = subprocess.run(
cmd.split(" "),
capture_output=True,
text=True,
)
lines = res.stdout.split("\n")
addressList = []
signalList = []
for i, line in enumerate(lines):
if "Address:" in line:
# Cell 01 - Address: xx:xx:xx:xx:xx:xx
tmp = line.split("Address:")
addressList.append(tmp[1].strip(" "))
if "Signal" in line:
# Quality=26/70 Signal level=-84 dBm
tmp = line.split("level=")
# -84 dBm
tmp = tmp[1].split("dBm")
signalList.append(tmp[0].strip(" "))
timestamp = time.time()
response = {"Timestamp": timestamp, "WiFiAccessPoints": []}
for i, address in enumerate(addressList):
response["WiFiAccessPoints"].append(
{"MacAddress": address, "Rss": int(signalList[i])}
)
return response
def on_receive(publish_packet_data):
publish_packet = publish_packet_data.publish_packet
command = publish_packet.payload.decode()
print("Received the command [{}]".format(command))
if command == "location":
global response
response = getWifi()
if __name__ == "__main__":
endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
port = 8883
client_id = "0001"
subscribe_topic = "command/{}".format(client_id)
publish_topic = "sensor/{}".format(client_id)
mqtt = Mqtt(endpoint, port, client_id, subscribe_topic, on_receive)
while True:
if response != None:
print("Send a response")
mqtt.publish(publish_topic, json.dumps(response))
response = None
time.sleep(1)
# Disconnect
mqtt.stop()
mqtt.py
import os
from concurrent.futures import Future
from awsiot import mqtt5_client_builder
from awscrt import mqtt5
class Mqtt:
future_connection_success = Future()
def __init__(self, endpoint, port, client_id, subscribe_topic, on_recv):
dir = os.path.dirname(os.path.abspath(__file__))
certs = {
"cafile": "{}/certs/AmazonRootCA1.pem".format(dir),
"certfile": "{}/certs/client-cert.pem".format(dir),
"keyfile": "{}/certs/private-key.pem".format(dir),
}
self.TIMEOUT = 100
self.client = mqtt5_client_builder.mtls_from_path(
endpoint=endpoint,
port=port,
cert_filepath=certs["certfile"],
pri_key_filepath=certs["keyfile"],
ca_filepath=certs["cafile"],
on_publish_received=on_recv,
on_lifecycle_connection_success=self.on_lifecycle_connection_success,
client_id=client_id,
)
self.client.start()
lifecycle_connect_success_data = self.future_connection_success.result(
self.TIMEOUT
)
connack_packet = lifecycle_connect_success_data.connack_packet
negotiated_settings = lifecycle_connect_success_data.negotiated_settings
# Subscribe
subscribe_future = self.client.subscribe(
subscribe_packet=mqtt5.SubscribePacket(
subscriptions=[
mqtt5.Subscription(
topic_filter=subscribe_topic, qos=mqtt5.QoS.AT_LEAST_ONCE
)
]
)
)
subscribe_future.result(self.TIMEOUT)
def on_lifecycle_connection_success(
self,
lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData,
):
self.future_connection_success.set_result(lifecycle_connect_success_data)
def publish(self, publish_topic, payload):
publish_future = self.client.publish(
mqtt5.PublishPacket(
topic=publish_topic,
payload=payload,
qos=mqtt5.QoS.AT_LEAST_ONCE,
)
)
publish_completion_data = publish_future.result(self.TIMEOUT)
def stop(self):
self.client.stop()
MacOSで動作しているコード
command.py
import time
import json
import requests
import boto3
from mqtt import Mqtt
life = True
def get_position(timestamp, wifi_access_points):
iotwireless = boto3.client("iotwireless")
response = iotwireless.get_position_estimate(
WiFiAccessPoints=wifi_access_points, Timestamp=timestamp
)
geoJsonPayload = json.loads(response["GeoJsonPayload"].read())
coordinates = geoJsonPayload["coordinates"]
longitude = coordinates[0]
latitude = coordinates[1]
print("latitude:{}".format(latitude))
print("longitude:{}".format(longitude))
url = (
"https://nominatim.openstreetmap.org/reverse?lat={}&lon={}&format=json".format(
coordinates[1], coordinates[0]
)
)
res = json.loads(requests.get(url).text)
print(
"country: {} ({})".format(
res["address"]["country"], res["address"]["country_code"]
)
)
print("province: {}".format(res["address"]["province"]))
print("region: {}".format(res["address"]["region"]))
print("city: {}".format(res["address"]["city"]))
print("suburb: {}".format(res["address"]["suburb"]))
print("neighbourhood: {}".format(res["address"]["neighbourhood"]))
print("postcode: {}".format(res["address"]["postcode"]))
def on_receive(publish_packet_data):
publish_packet = publish_packet_data.publish_packet
payload = json.loads(publish_packet.payload.decode())
print("payload: {}".format(payload))
get_position(payload["Timestamp"], payload["WiFiAccessPoints"])
global life
life = False
if __name__ == "__main__":
endpoint = "xxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
port = 8883
server_id = "server"
client_id = "0001"
subscribe_topic = "sensor/{}".format(client_id)
publish_topic = "command/{}".format(client_id)
mqtt = Mqtt(endpoint, port, server_id, subscribe_topic, on_receive)
mqtt.publish(publish_topic, "location")
while life:
time.sleep(1)
8 参考リンク
AWS IoT Core Device Location
新しい AWS IoT Core Device Location 機能の使い方