[AWS IoT Core] トピックの特定桁の文字を対象にルールを設定してみました
1 はじめに
CX 事業本部のデリバリー部の平内(SIN)です。
AWS IoT Core の ルールでは、SQL 構文で、特定のトピックを指定して、アクションを設定することができます。
例としてよく見られるのは、ワイルドカード「#」、「+」を使用したものでしょう。
SELECT * FROM "sensor/#" SELECT * FROM "some/+/topic"
参考:https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/iot-sql-from.html
FROM 句で指定できるワイルドカードは、トピックのセグメント単位もしくは、後方一致のみですが、WHERE 句でさらに条件を指定することで、トピック名の特定の文字を条件に指定することも可能です。
今回は、WHERE 句を使用して、トピックの特定の桁番目の文字を条件としてルール設定を試してみました。
2 トピック仕様と条件
サンプルとして、以下のようなトピック仕様を考えてみます。
最初のセグメントは、"sensor" 固定です。そして、2番目のセグメントは、アルファベット 2 桁と数値 4 桁をハイフンで繋いだデバイス ID です。
sensor/AA-0000 sensor/AA-0001 sensor/AA-0003 ... sensor/ZZ-9999
そして、ルールで指定したい条件は、「数値部分が、1,000 番台のみ」というものです。
sensor/AA-0010 sensor/AC-1000 <= hit sensor/AZ-1010 <= hit sensor/SS-2000 sensor/QZ-1333 <= hit sensor/ZA-3300 sensor/ZC-0111 sensor/ZZ-1110 <= hit sensor/ZZ-9000
3 SQL
「数値部分が、1,000 番台のみ」という条件にマッチさせる SQL は、以下の通りとなりました。
SELECT * FROM "sensor/#" WHERE substring(topic(2), 3,4) = "1"
topic(2) で、トピックのうち 2 番目のセグメントを取得しています。
sensor/AC-1234 -> AC-1234
次に、substring(String,3,4) で、3 番目以降、かつ、4 番目以前の文字を抽出しています。
AC-1234 -> 1
参考: AWS IoT Core - デベロッパーガイド - 関数
ルールには、このSQLを指定して、アクションで CloudWatchLogs へ出力するようにしました。
4 動作確認
(1) デバイス
動作確認のためのデバイスを模擬するプログラムを作成しました。 仕様に基づくデバイス ID をランダムに生成して、100 件のデータを送信しています。
import os import time import json import random from concurrent.futures import Future from awsiot import mqtt5_client_builder from awscrt import mqtt5 endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" port = 8883 dir = os.path.dirname(os.path.abspath(__file__)) certs = { "cafile": "{}/certs/AmazonRootCA1.pem".format(dir), "certfile": "{}/certs/cert.pem".format(dir), "keyfile": "{}/certs/private-key.pem".format(dir), } client_id = "sample_device" TIMEOUT = 100 future_connection_success = Future() def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData): print("on_connect_success") global future_connection_success future_connection_success.set_result(lifecycle_connect_success_data) def create_device_id(): ascii = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' return "{}{}-{}".format( ascii[random.randint(0,25)], ascii[random.randint(0,25)], str(random.randint(1, 9999)).zfill(4)) if __name__ == '__main__': client = mqtt5_client_builder.mtls_from_path( endpoint=endpoint, port=port, cert_filepath=certs["certfile"], pri_key_filepath=certs["keyfile"], ca_filepath=certs["cafile"], on_lifecycle_connection_success=on_lifecycle_connection_success, client_id=client_id) client.start() lifecycle_connect_success_data = future_connection_success.result(TIMEOUT) connack_packet = lifecycle_connect_success_data.connack_packet negotiated_settings = lifecycle_connect_success_data.negotiated_settings for i in range(100): # Publish device_id = create_device_id() publish_future = client.publish(mqtt5.PublishPacket( topic = "sensor/{}".format(device_id), payload = json.dumps({ "device_id": device_id }), )) publish_completion_data = publish_future.result(TIMEOUT) time.sleep(0.1) # Disconnect client.stop()
(2) MQTT テストクライアント
上記のプログラムを実行すると、ランダムなデバイス ID で、100 件のデータが到着します。
(3) CludWatch Logs
アクションで指定したロググループを確認すると、条件にヒットしたもの(数値部分が 1,000 番台のもの)だけが記録されていました。
5 正規表現等による条件指定
ここまでの作業で、特定の桁の文字を条件に設定してみましたが、ルールで使用できる関数には、正規表現を扱えるものや、文字列を数値として扱うための関数などが多数用意されています。
例えば、先のように数値、1,000 番台が対象でも、ハイフンの前の英文字の桁数が一定でない場合、正規表現で数値部分を検出する必要が出てきます。
下記の WHERE 句では、ハイフンの前の英文字が 1 桁以上に対応できます。
SELECT * FROM "sensor/#" WHERE substring(regexp_replace(topic(2), "[A-Z]{1,}-([0-9]{4})", "$1"), 0,1) = "1"
また、1,000 番台ではなく、「500 以下」のようなものにも対応可能です。
SELECT * FROM "sensor/#" WHERE cast(regexp_replace(topic(2), "[A-Z]{1,}-([0-9]{4})", "$1") as Int) < 500
関数を使用すると、非常に多様な条件を設定できますが、設定したステートメントが、意図したものになっているかどうかを確認するには、SELECT 句に入れて、その内容を確認すると分かりやすいかもしれません。
下記では、1 文字以上の英文字+ハイフンの後ろの4桁の数値が、うまく検出できているかどかを確認しています。
SELECT regexp_replace(topic(2), "[A-Z]{1,}-([0-9]{4})", "$1") AS TARGET,* FROM "sensor/#"
6 最後に
今回は、ワイルドカードで表現しきれないトピックの指定方法を試してみました。
開発中に、特定のトピックだけログ取得したり、特定のデバイスだけを対象にロジックを入れたりする必要があった時、この辺の指定に慣れておくと、結構有効かもしれません。
なお、WHEREで使用した条件によって、アクションが実行されなくても、ルールにヒットした事で料金は発生すると思いますので、この点には、ちょっと注意が必要だと思います。
https://aws.amazon.com/jp/iot-core/pricing/?nc1=h_ls