[AWS IoT Core Device Location] 今、何処で動作しているのかをデバイスに聞いてみた
1 はじめに
CX 事業本部 delivery部の平内(SIN)です。
昨年の、AWS re:Invent 2023 で公開された、AWS IoT Core Device Location (以下、Device Location)を使用すると、サードパーティーのソルバーを使用して IoT デバイスの位置を取得する事ができます。
ソースは、「Wifi アクセスポイント」「セルラー無線タワー」「IP アドレス」及び、「GNSS スキャンデータ」の4種類となっており、単体もしくは、組み合わせでリクエストできます。
https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/device-location.html
今回は、「Wifi アクセスポイント」の情報を使用して、デバイスの位置を取得してみました。
下記は、動作している様子です。
左の黒色のコンソールは、設置されたデバイス(RaspberryPI)で動作しているプログラムです。そして、右の青色のコンソールが、デバイスの位置を確認しようとしているプログラムです。 デバイスへの指示は、MQTTメッセージで行われ、デバイス側は、リクエストを受けると、現在の「Wifiアクセスポイント」の情報を収集して、MQTTメッセージで返事します。メッセージブローカーを介して、得られたこの情報をもとに、Device Location で、デバイスが設置された地点の「経緯度」を取得し、その「経緯度」から、Reverse Geocodingで「住所」を検索しています。
2 構成
位置を取得する対象は、MQTTで何らかの通信を行っているデバイスをイメージしており、RaspberryPIで実装されています。
デバイスから位置情報を取得する手順は、以下の通りです。
① デバイスに対して情報取得のコマンドを送信します
② コマンドを受け取ったデバイスは、周辺のWiFiアクセスポイントの情報を取集します
③ 収集した情報をMQTT経由でレスポンスします
④ Device Locationへ、WiFiアクセスポイントの情報を送信して、経緯度を取得します
⑤ Reverse Geocodingへ、経緯度を送信して、住所情報を取得します
①のコマンドは、どのような形式でも関係ないのですが、ここでは、下記のようなメッセージを使用しています。
Topic: command/device_name Payload: location
3 デバイスの配置
デバイスの配置場所は、近所のイオンのフードコートです。
iPhoneで確認した位置情報は、以下の通りでしたが、Device Location で取得できたものと、ほとんど相違ないイメージです。
4 iwlist
Wifiアクセスポイントの情報取得には、iwlist コマンドを使用しています。
iwlist は、ワイヤレスネットワークインタフェースの詳細情報を取得するコマンドです。
構文
iwlist [インタフェース名] [パラメータ]
パラメータ | 説明 |
---|---|
scan | スキャン |
channel | チャンネルの表示 |
rate | 通信速度の表示 |
下記のように使用する事で、詳細な情報を取得できますが、ここから Address及び、Signal level だけを取り出しています。
$ sudo iwlist wlan0 scan wlan0 Scan completed : Cell 01 - Address: xx:xx:xx:xx:xx:xx Channel:1 Frequency:2.412 GHz (Channel 1) Quality=63/70 Signal level=-47 dBm Encryption key:on ESSID:"xxxx" ・・・省略・・・
プログラムは、以下のようになっています。
def getWifi(): cmd = "sudo iwlist wlan0 scan" res = subprocess.run( cmd.split(" "), capture_output=True, text=True, ) lines = res.stdout.split("\n") addressList = [] signalList = [] for i, line in enumerate(lines): if "Address:" in line: # Cell 01 - Address: xx:xx:xx:xx:xx:xx tmp = line.split("Address:") addressList.append(tmp[1].strip(" ")) if "Signal" in line: # Quality=26/70 Signal level=-84 dBm tmp = line.split("level=") # -84 dBm tmp = tmp[1].split("dBm") signalList.append(tmp[0].strip(" ")) timestamp = time.time() response = {"Timestamp": timestamp, "WiFiAccessPoints": []} for i, address in enumerate(addressList): response["WiFiAccessPoints"].append( {"MacAddress": address, "Rss": int(signalList[i])} ) return response
編集されたアクアセスポイントの情報は、MQTTで送信されます。
5 Device Location API
受け取ったアクセスポイントの情報は、GetPositionEstimateに送る事で、位置情報が GeoJSONペイロードで受け取れます。
今回は、ここ、boto3のget_position_estimateで実装されています。
iotwireless = boto3.client("iotwireless") response = iotwireless.get_position_estimate( WiFiAccessPoints=wifi_access_points, Timestamp=timestamp ) geoJsonPayload = json.loads(response["GeoJsonPayload"].read()) coordinates = geoJsonPayload["coordinates"] longitude = coordinates[0] latitude = coordinates[1]
6 Reverse Geocoding
GetPositionEstimateで取得できた経緯度から、住所を取得するには、Reverse Geocodingを使用しました。
url = ( "https://nominatim.openstreetmap.org/reverse?lat={}&lon={}&format=json".format( coordinates[1], coordinates[0] ) ) res = json.loads(requests.get(url).text) print( "country: {} ({})".format( res["address"]["country"], res["address"]["country_code"] ) ) print("province: {}".format(res["address"]["province"])) print("region: {}".format(res["address"]["region"])) print("city: {}".format(res["address"]["city"])) print("suburb: {}".format(res["address"]["suburb"])) print("neighbourhood: {}".format(res["address"]["neighbourhood"])) print("postcode: {}".format(res["address"]["postcode"]))
7 最後に
今回は、Device Location で、デバイスが設置された地点を表示してみました。
このような仕組みを実装しておくと、分散配置されたIoTデバイスが、今、どこで動作しているのかいつでも確認できるようになります。
定期的に実行して、想定外の移動が行われていないかの確認も可能かも知れません。
動画で動作していたソースコードは、以下です。説明が不足している部分については、こちらを参照頂ければ幸いです。
Raspiで動作しているコード
index.py
import time import json import subprocess from mqtt import Mqtt response = None def getWifi(): cmd = "sudo iwlist wlan0 scan" res = subprocess.run( cmd.split(" "), capture_output=True, text=True, ) lines = res.stdout.split("\n") addressList = [] signalList = [] for i, line in enumerate(lines): if "Address:" in line: # Cell 01 - Address: xx:xx:xx:xx:xx:xx tmp = line.split("Address:") addressList.append(tmp[1].strip(" ")) if "Signal" in line: # Quality=26/70 Signal level=-84 dBm tmp = line.split("level=") # -84 dBm tmp = tmp[1].split("dBm") signalList.append(tmp[0].strip(" ")) timestamp = time.time() response = {"Timestamp": timestamp, "WiFiAccessPoints": []} for i, address in enumerate(addressList): response["WiFiAccessPoints"].append( {"MacAddress": address, "Rss": int(signalList[i])} ) return response def on_receive(publish_packet_data): publish_packet = publish_packet_data.publish_packet command = publish_packet.payload.decode() print("Received the command [{}]".format(command)) if command == "location": global response response = getWifi() if __name__ == "__main__": endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" port = 8883 client_id = "0001" subscribe_topic = "command/{}".format(client_id) publish_topic = "sensor/{}".format(client_id) mqtt = Mqtt(endpoint, port, client_id, subscribe_topic, on_receive) while True: if response != None: print("Send a response") mqtt.publish(publish_topic, json.dumps(response)) response = None time.sleep(1) # Disconnect mqtt.stop()
mqtt.py
import os from concurrent.futures import Future from awsiot import mqtt5_client_builder from awscrt import mqtt5 class Mqtt: future_connection_success = Future() def __init__(self, endpoint, port, client_id, subscribe_topic, on_recv): dir = os.path.dirname(os.path.abspath(__file__)) certs = { "cafile": "{}/certs/AmazonRootCA1.pem".format(dir), "certfile": "{}/certs/client-cert.pem".format(dir), "keyfile": "{}/certs/private-key.pem".format(dir), } self.TIMEOUT = 100 self.client = mqtt5_client_builder.mtls_from_path( endpoint=endpoint, port=port, cert_filepath=certs["certfile"], pri_key_filepath=certs["keyfile"], ca_filepath=certs["cafile"], on_publish_received=on_recv, on_lifecycle_connection_success=self.on_lifecycle_connection_success, client_id=client_id, ) self.client.start() lifecycle_connect_success_data = self.future_connection_success.result( self.TIMEOUT ) connack_packet = lifecycle_connect_success_data.connack_packet negotiated_settings = lifecycle_connect_success_data.negotiated_settings # Subscribe subscribe_future = self.client.subscribe( subscribe_packet=mqtt5.SubscribePacket( subscriptions=[ mqtt5.Subscription( topic_filter=subscribe_topic, qos=mqtt5.QoS.AT_LEAST_ONCE ) ] ) ) subscribe_future.result(self.TIMEOUT) def on_lifecycle_connection_success( self, lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData, ): self.future_connection_success.set_result(lifecycle_connect_success_data) def publish(self, publish_topic, payload): publish_future = self.client.publish( mqtt5.PublishPacket( topic=publish_topic, payload=payload, qos=mqtt5.QoS.AT_LEAST_ONCE, ) ) publish_completion_data = publish_future.result(self.TIMEOUT) def stop(self): self.client.stop()
MacOSで動作しているコード
command.py
import time import json import requests import boto3 from mqtt import Mqtt life = True def get_position(timestamp, wifi_access_points): iotwireless = boto3.client("iotwireless") response = iotwireless.get_position_estimate( WiFiAccessPoints=wifi_access_points, Timestamp=timestamp ) geoJsonPayload = json.loads(response["GeoJsonPayload"].read()) coordinates = geoJsonPayload["coordinates"] longitude = coordinates[0] latitude = coordinates[1] print("latitude:{}".format(latitude)) print("longitude:{}".format(longitude)) url = ( "https://nominatim.openstreetmap.org/reverse?lat={}&lon={}&format=json".format( coordinates[1], coordinates[0] ) ) res = json.loads(requests.get(url).text) print( "country: {} ({})".format( res["address"]["country"], res["address"]["country_code"] ) ) print("province: {}".format(res["address"]["province"])) print("region: {}".format(res["address"]["region"])) print("city: {}".format(res["address"]["city"])) print("suburb: {}".format(res["address"]["suburb"])) print("neighbourhood: {}".format(res["address"]["neighbourhood"])) print("postcode: {}".format(res["address"]["postcode"])) def on_receive(publish_packet_data): publish_packet = publish_packet_data.publish_packet payload = json.loads(publish_packet.payload.decode()) print("payload: {}".format(payload)) get_position(payload["Timestamp"], payload["WiFiAccessPoints"]) global life life = False if __name__ == "__main__": endpoint = "xxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" port = 8883 server_id = "server" client_id = "0001" subscribe_topic = "sensor/{}".format(client_id) publish_topic = "command/{}".format(client_id) mqtt = Mqtt(endpoint, port, server_id, subscribe_topic, on_receive) mqtt.publish(publish_topic, "location") while life: time.sleep(1)
8 参考リンク
AWS IoT Core Device Location
新しい AWS IoT Core Device Location 機能の使い方