[AWS IoT Core Device Location] 今、何処で動作しているのかをデバイスに聞いてみた

2023.07.02

1 はじめに

CX 事業本部 delivery部の平内(SIN)です。

昨年の、AWS re:Invent 2023 で公開された、AWS IoT Core Device Location (以下、Device Location)を使用すると、サードパーティーのソルバーを使用して IoT デバイスの位置を取得する事ができます。

ソースは、「Wifi アクセスポイント」「セルラー無線タワー」「IP アドレス」及び、「GNSS スキャンデータ」の4種類となっており、単体もしくは、組み合わせでリクエストできます。


https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/device-location.html

今回は、「Wifi アクセスポイント」の情報を使用して、デバイスの位置を取得してみました。

下記は、動作している様子です。

左の黒色のコンソールは、設置されたデバイス(RaspberryPI)で動作しているプログラムです。そして、右の青色のコンソールが、デバイスの位置を確認しようとしているプログラムです。 デバイスへの指示は、MQTTメッセージで行われ、デバイス側は、リクエストを受けると、現在の「Wifiアクセスポイント」の情報を収集して、MQTTメッセージで返事します。メッセージブローカーを介して、得られたこの情報をもとに、Device Location で、デバイスが設置された地点の「経緯度」を取得し、その「経緯度」から、Reverse Geocodingで「住所」を検索しています。

2 構成

位置を取得する対象は、MQTTで何らかの通信を行っているデバイスをイメージしており、RaspberryPIで実装されています。

デバイスから位置情報を取得する手順は、以下の通りです。
① デバイスに対して情報取得のコマンドを送信します
② コマンドを受け取ったデバイスは、周辺のWiFiアクセスポイントの情報を取集します
③ 収集した情報をMQTT経由でレスポンスします
④ Device Locationへ、WiFiアクセスポイントの情報を送信して、経緯度を取得します
⑤ Reverse Geocodingへ、経緯度を送信して、住所情報を取得します

①のコマンドは、どのような形式でも関係ないのですが、ここでは、下記のようなメッセージを使用しています。

Topic: command/device_name
Payload: location

3 デバイスの配置

デバイスの配置場所は、近所のイオンのフードコートです。

iPhoneで確認した位置情報は、以下の通りでしたが、Device Location で取得できたものと、ほとんど相違ないイメージです。

4 iwlist

Wifiアクセスポイントの情報取得には、iwlist コマンドを使用しています。

iwlist は、ワイヤレスネットワークインタフェースの詳細情報を取得するコマンドです。

構文

iwlist [インタフェース名] [パラメータ]
パラメータ 説明
scan スキャン
channel チャンネルの表示
rate 通信速度の表示

下記のように使用する事で、詳細な情報を取得できますが、ここから Address及び、Signal level だけを取り出しています。

$ sudo iwlist wlan0 scan
wlan0     Scan completed :
          Cell 01 - Address: xx:xx:xx:xx:xx:xx
                    Channel:1
                    Frequency:2.412 GHz (Channel 1)
                    Quality=63/70  Signal level=-47 dBm
                    Encryption key:on
                    ESSID:"xxxx"
・・・省略・・・

プログラムは、以下のようになっています。

def getWifi():
    cmd = "sudo iwlist wlan0 scan"
    res = subprocess.run(
        cmd.split(" "),
        capture_output=True,
        text=True,
    )
    lines = res.stdout.split("\n")
    addressList = []
    signalList = []
    for i, line in enumerate(lines):
        if "Address:" in line:
            # Cell 01 - Address: xx:xx:xx:xx:xx:xx
            tmp = line.split("Address:")
            addressList.append(tmp[1].strip(" "))
        if "Signal" in line:
            # Quality=26/70  Signal level=-84 dBm
            tmp = line.split("level=")
            #  -84 dBm
            tmp = tmp[1].split("dBm")
            signalList.append(tmp[0].strip(" "))

    timestamp = time.time()
    response = {"Timestamp": timestamp, "WiFiAccessPoints": []}
    for i, address in enumerate(addressList):
        response["WiFiAccessPoints"].append(
            {"MacAddress": address, "Rss": int(signalList[i])}
        )
    return response

編集されたアクアセスポイントの情報は、MQTTで送信されます。

5 Device Location API

受け取ったアクセスポイントの情報は、GetPositionEstimateに送る事で、位置情報が GeoJSONペイロードで受け取れます。

今回は、ここ、boto3のget_position_estimateで実装されています。

iotwireless = boto3.client("iotwireless")
response = iotwireless.get_position_estimate(
    WiFiAccessPoints=wifi_access_points, Timestamp=timestamp
)
geoJsonPayload = json.loads(response["GeoJsonPayload"].read())
coordinates = geoJsonPayload["coordinates"]

longitude = coordinates[0]
latitude = coordinates[1]

6 Reverse Geocoding

GetPositionEstimateで取得できた経緯度から、住所を取得するには、Reverse Geocodingを使用しました。

url = (
    "https://nominatim.openstreetmap.org/reverse?lat={}&lon={}&format=json".format(
        coordinates[1], coordinates[0]
    )
)
res = json.loads(requests.get(url).text)
print(
    "country: {} ({})".format(
        res["address"]["country"], res["address"]["country_code"]
    )
)
print("province: {}".format(res["address"]["province"]))
print("region: {}".format(res["address"]["region"]))
print("city: {}".format(res["address"]["city"]))
print("suburb: {}".format(res["address"]["suburb"]))
print("neighbourhood: {}".format(res["address"]["neighbourhood"]))
print("postcode: {}".format(res["address"]["postcode"]))

7 最後に

今回は、Device Location で、デバイスが設置された地点を表示してみました。

このような仕組みを実装しておくと、分散配置されたIoTデバイスが、今、どこで動作しているのかいつでも確認できるようになります。

定期的に実行して、想定外の移動が行われていないかの確認も可能かも知れません。

動画で動作していたソースコードは、以下です。説明が不足している部分については、こちらを参照頂ければ幸いです。

Raspiで動作しているコード

index.py
import time
import json
import subprocess
from mqtt import Mqtt

response = None


def getWifi():
    cmd = "sudo iwlist wlan0 scan"
    res = subprocess.run(
        cmd.split(" "),
        capture_output=True,
        text=True,
    )
    lines = res.stdout.split("\n")
    addressList = []
    signalList = []
    for i, line in enumerate(lines):
        if "Address:" in line:
            # Cell 01 - Address: xx:xx:xx:xx:xx:xx
            tmp = line.split("Address:")
            addressList.append(tmp[1].strip(" "))
        if "Signal" in line:
            # Quality=26/70  Signal level=-84 dBm
            tmp = line.split("level=")
            #  -84 dBm
            tmp = tmp[1].split("dBm")
            signalList.append(tmp[0].strip(" "))

    timestamp = time.time()
    response = {"Timestamp": timestamp, "WiFiAccessPoints": []}
    for i, address in enumerate(addressList):
        response["WiFiAccessPoints"].append(
            {"MacAddress": address, "Rss": int(signalList[i])}
        )
    return response


def on_receive(publish_packet_data):
    publish_packet = publish_packet_data.publish_packet
    command = publish_packet.payload.decode()
    print("Received the command [{}]".format(command))
    if command == "location":
        global response
        response = getWifi()


if __name__ == "__main__":
    endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
    port = 8883

    client_id = "0001"
    subscribe_topic = "command/{}".format(client_id)
    publish_topic = "sensor/{}".format(client_id)

    mqtt = Mqtt(endpoint, port, client_id, subscribe_topic, on_receive)
    while True:
        if response != None:
            print("Send a response")
            mqtt.publish(publish_topic, json.dumps(response))
            response = None
        time.sleep(1)
    # Disconnect
    mqtt.stop()
mqtt.py
import os
from concurrent.futures import Future
from awsiot import mqtt5_client_builder
from awscrt import mqtt5


class Mqtt:
    future_connection_success = Future()

    def __init__(self, endpoint, port, client_id, subscribe_topic, on_recv):
        dir = os.path.dirname(os.path.abspath(__file__))
        certs = {
            "cafile": "{}/certs/AmazonRootCA1.pem".format(dir),
            "certfile": "{}/certs/client-cert.pem".format(dir),
            "keyfile": "{}/certs/private-key.pem".format(dir),
        }
        self.TIMEOUT = 100

        self.client = mqtt5_client_builder.mtls_from_path(
            endpoint=endpoint,
            port=port,
            cert_filepath=certs["certfile"],
            pri_key_filepath=certs["keyfile"],
            ca_filepath=certs["cafile"],
            on_publish_received=on_recv,
            on_lifecycle_connection_success=self.on_lifecycle_connection_success,
            client_id=client_id,
        )

        self.client.start()
        lifecycle_connect_success_data = self.future_connection_success.result(
            self.TIMEOUT
        )
        connack_packet = lifecycle_connect_success_data.connack_packet
        negotiated_settings = lifecycle_connect_success_data.negotiated_settings

        # Subscribe
        subscribe_future = self.client.subscribe(
            subscribe_packet=mqtt5.SubscribePacket(
                subscriptions=[
                    mqtt5.Subscription(
                        topic_filter=subscribe_topic, qos=mqtt5.QoS.AT_LEAST_ONCE
                    )
                ]
            )
        )
        subscribe_future.result(self.TIMEOUT)

    def on_lifecycle_connection_success(
        self,
        lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData,
    ):
        self.future_connection_success.set_result(lifecycle_connect_success_data)

    def publish(self, publish_topic, payload):
        publish_future = self.client.publish(
            mqtt5.PublishPacket(
                topic=publish_topic,
                payload=payload,
                qos=mqtt5.QoS.AT_LEAST_ONCE,
            )
        )
        publish_completion_data = publish_future.result(self.TIMEOUT)

    def stop(self):
        self.client.stop()

MacOSで動作しているコード

command.py
import time
import json
import requests
import boto3
from mqtt import Mqtt

life = True


def get_position(timestamp, wifi_access_points):
    iotwireless = boto3.client("iotwireless")
    response = iotwireless.get_position_estimate(
        WiFiAccessPoints=wifi_access_points, Timestamp=timestamp
    )
    geoJsonPayload = json.loads(response["GeoJsonPayload"].read())
    coordinates = geoJsonPayload["coordinates"]

    longitude = coordinates[0]
    latitude = coordinates[1]

    print("latitude:{}".format(latitude))
    print("longitude:{}".format(longitude))

    url = (
        "https://nominatim.openstreetmap.org/reverse?lat={}&lon={}&format=json".format(
            coordinates[1], coordinates[0]
        )
    )
    res = json.loads(requests.get(url).text)
    print(
        "country: {} ({})".format(
            res["address"]["country"], res["address"]["country_code"]
        )
    )
    print("province: {}".format(res["address"]["province"]))
    print("region: {}".format(res["address"]["region"]))
    print("city: {}".format(res["address"]["city"]))
    print("suburb: {}".format(res["address"]["suburb"]))
    print("neighbourhood: {}".format(res["address"]["neighbourhood"]))
    print("postcode: {}".format(res["address"]["postcode"]))


def on_receive(publish_packet_data):
    publish_packet = publish_packet_data.publish_packet
    payload = json.loads(publish_packet.payload.decode())
    print("payload: {}".format(payload))

    get_position(payload["Timestamp"], payload["WiFiAccessPoints"])

    global life
    life = False


if __name__ == "__main__":
    endpoint = "xxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
    port = 8883

    server_id = "server"
    client_id = "0001"
    subscribe_topic = "sensor/{}".format(client_id)
    publish_topic = "command/{}".format(client_id)

    mqtt = Mqtt(endpoint, port, server_id, subscribe_topic, on_receive)
    mqtt.publish(publish_topic, "location")

    while life:
        time.sleep(1)

8 参考リンク


AWS IoT Core Device Location
新しい AWS IoT Core Device Location 機能の使い方