Raspberry Pi で USB 接続の加速度・角速度センサーを使ってモーターの振動を取得 & AWS IoT Core に送ってみた
はじめに
最近は、Amazon Monitron を使って製造設備の振動データを取得して予知保全などに活用する、といったことに取り組んでいます。
Amazon Monitron のデータ取得間隔は基本的に「1 時間に 1 度」になりますが、用途や要件によってはリアルタイムに取得したいというリクエストをいただくことがあります。
そこで今回は、製造設備の振動データをリアルタイムに取得してクラウドに転送するということをやってみました。加速度センサーはこれまで使ったことがなかったので、製造業の現場でも利用できそうなセンサーを探していた所、下記の記事にたどり着きました。
上記記事に関連して、センサー単体の利用に関する参考記事はこちらになります。
(メカトラックス株式会社様の記事になりますが、メカトラックス様より引用させていただく旨を確認済みです。大変ありがとうございます!)
こちらの記事の内容を参考にしつつ、今回の要件に合う形に修正しながらデータ取得・クラウド送信までやってみたので、その内容をご紹介したいと思います。
やりたいこと
今回やりたいことは以下のとおりです。
- センサーから振動データをリアルタイムに取得したい
- 取得したデータを AWS IoT Core に MQTT で送りたい
- スクリプト(Python) の実行環境はユーザー権限で構築したい
利用デバイス
今回利用したデバイス環境は次のとおりです。
- デバイス: Raspberry Pi 4B(4GB)
- センサー: JoyWarrior56FR1-WP(6軸加速度・角速度センサ)
- 詳細:https://www.codemercs.com/en/sensors/joywarrior56fr1-wp
- 同じサイトから購入できます。
- Python 3.9.2(OS デフォルト)
- OS環境: 以下の通り(Raspberry Pi OS を利用)
$ lsb_release -a
No LSB modules are available.
Distributor ID: Debian
Description: Debian GNU/Linux 11 (bullseye)
Release: 11
Codename: bullseye
今回利用したセンサーは、個人で購入するには少々高額だったのですが、次のような特徴があり高機能です。
- PC の USB を利用できる
- 3軸方向の加速度計および、3軸方向の角速度計(ジャイロセンサー)として使用できる
- 傾斜センサー、振動センサー、衝撃センサーとして利用できる
- 加速度センサーの測定レンジは、
±2g
,±4g
,±8g
,±3g
,±16g
から選択できる - 加速度は 16 ビットの分解能を持つので、細かい粒度で測定可能
- ジャイロセンサー(角速度センサー)の測定レンジは、
125dps
,250dps
,500dps
,1000dps
,2000dps
から選択できる - USB から給電可能(別途で電源不要)
- 磁石内臓: 鉄製の製造機器などに簡単に取り付けられる(ネジ固定も可能)
- -10 °C から +85°C の環境で動作可能
ハードウェア環境の構築は、センサーを Raspberry Pi の USB ポートに挿すだけです。Raspberry Pi の OS インストールの手順は割愛します。
実際には写真のようにセンサーをモーターに結束バンドで固定しています。本来なら接着剤やネジなどで固定するのがいいのですが、今回は動作検証なので後で取り外せるようにしています。
モーターの筐体もプラ製なので磁石が使えませんでした。
モーターは回転速度をコントローラーで制御できるものを用意しました。コントローラーのつまみを動かして振動の大きさを変えることで色々なデータが取れるようにしています。
Python 実行環境の整備
必要なライブラリのインストール
USB 接続したデバイスにアクセスするため Python の pyusb
ライブラリを使いますが、使い方を調べたところ多くのケースで root の Python 環境にインストールする手順となっていました。
今回は root の環境にはインストールせずログインユーザーの Python 環境にインストールするようにしました。
しかし、単純にログインユーザーの環境で pip install pyusb
でインストールしてもログインユーザーで USB 操作ができず、次のようなエラーが発生します。
usb.core.USBError: [Errno 13] Access denied (insufficient permissions)
そこで今回は udev
の編集を行い、ログインユーザーでも実行できるようにします。
最初にデバイス情報を確認します。(結果の一部を抜粋)
$ lsusb
Bus 001 Device 003: ID 07c0:111a Code Mercenaries Hard- und Software GmbH JoyWarrior56FR1
この結果より次のことが分かります。
- ベンダー ID:
07c0
- プロダクト ID:
111a
次に下記の内容で /etc/udev/rules.d/10-any_name_is_ok.rules
というファイルを作成します。
ベンダー ID とプロダクト ID は先程調べたものを指定してください。
SUBSYSTEM=="usb", ATTR{idVendor}=="07c0", ATTR{idProduct}=="111a", MODE="0666", GROUP="plugdev"
ファイルが作成できたら設定を反映します。
sudo udevadm control --reload-rules && sudo udevadm trigger
振動データの取得
Python コードの用意と実行
Python の環境整備ができたのでコードを用意します。
今回のセンサーはサンプルコードが下記の記事で紹介されているので、まずはこちらのコードをそのまま実行してみます。
#!/usr/bin/python3
import csv
import datetime
import struct
import sys
import usb.core
import usb.util
VID = 0x07c0
PID = 0x111a
ARNG = 0x00
AFILT = 0xC3
MODE = 0x00
RANGE_ACC = 2
def cal_acceleration(n):
return round((n - 32768) / 32768 * RANGE_ACC * 9.80665, 3)
if __name__ == "__main__":
dev = usb.core.find(idVendor=VID, idProduct=PID)
if dev is None:
raise ValueError("Device not found")
try:
dev.detach_kernel_driver(0)
except Exception:
pass
dev.write(2, [0x04, MODE, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
dev.write(2, [0x00, ARNG, AFILT, 0x00, 0x00, 0x00, 0x00, 0x00])
dev.write(2, [0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
cfg = dev.get_active_configuration()
intf = cfg[(0, 0)]
if dev.is_kernel_driver_active(intf.bInterfaceNumber):
dev.detach_kernel_driver(intf.bInterfaceNumber)
endpoint = usb.util.find_descriptor(
intf,
custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
== usb.util.ENDPOINT_IN
)
if endpoint is None:
raise ValueError('Endpoint not found')
with open("./JoyWarrior56FR1_test.csv", "w", encoding="utf_8_sig") as f:
writer = csv.writer(f)
writer.writerow(["日時", "加速度x", "加速度y", "加速度z"])
while True:
output_bytes = endpoint.read(endpoint.wMaxPacketSize).tobytes()
acc_bytes = struct.unpack("<HHH", output_bytes[:6])
accx, accy, accz = list(map(cal_acceleration, acc_bytes))
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S.%f")
print(f"加速度x[m/s^2]: {accx} 加速度y[m/s^2]: {accy} 加速度z[m/s^2]: {accz}")
writer.writerow([timestamp, accx, accy, accz])
f.flush()
振動センサーは、冒頭で紹介したようにモーターに付けていますので、モーターを事前に動かしてから上記のスクリプトを実行します。
python JoyWarrior56FR1.py
実行すると JoyWarrior56FR1_test.csv
というファイルが同じディレクトリに作成されて、3 軸の振動データが書き込まれています。
データのグラフ化
1分ほど実行して出来上がった CSV ファイルを Google スプレッドシートで見ると次のようなグラフになっていました。センサーの取り付け位置の通り X 軸方向に揺れているのが分かります。
なお、CSV ファイルは次のような内容になっています。
日時,加速度x,加速度y,加速度z
1725369745.2017078,-0.028,-0.013,0.028
1725369745.238774,-0.079,-0.876,1.669
1725369745.2416916,-2.395,-0.598,0.006
・・・
スプレッドシートに読み込むと、デフォルトで各カラムのデータが文字列として読まれてしまうので、グラフにするときはデータの種類を数字に変えるとグラフ化できます。
加速度のレンジを変えてみる
このセンサーでは加速度を ±2g
, ±4g
, ±8g
, ±16g
のいずれかのレンジで測定が可能です。最初に実行したのは ±2g
でした。
±2g
では、センサーの感度が最も高く小さな加速度の変化を詳細に測定できます。
一方で、±16g
では、センサーは最大±16gの加速度を測定できます。この範囲では、最も広い加速度の範囲を測定できますが精度は最も低くなります。非常に大きな加速度が発生する状況(例えば、衝撃や激しい動き)に適していると考えられます。
試しに ±16g
で試してみます。コードは下記のように RANGE_ACC
と ARNG
の指定を変更します。(各パラメーターの意味は後述)
RANGE_ACC = 16
ARNG = 0x01
先ほどと同じようにモーターを動かしながら測定すると下記のようなデータになりました。
加速度x[m/s^2]: -0.479 加速度y[m/s^2]: -0.599 加速度z[m/s^2]: 0.402
加速度x[m/s^2]: 9.831 加速度y[m/s^2]: 0.512 加速度z[m/s^2]: 0.761
加速度x[m/s^2]: -2.332 加速度y[m/s^2]: 0.029 加速度z[m/s^2]: -0.096
加速度x[m/s^2]: -2.241 加速度y[m/s^2]: -0.589 加速度z[m/s^2]: 0.752
加速度x[m/s^2]: 9.668 加速度y[m/s^2]: 1.796 加速度z[m/s^2]: -4.501
・・・
下記は ±2g
で取得した時のデータです。同じモーター(同じ振動)に対して測定レンジが変わることで測定結果が変わりました。今回のモーターに対しては、±2g
か ±4g
あたりで測定するのが妥当なように感じます。
加速度x[m/s^2]: 4.079 加速度y[m/s^2]: 0.009 加速度z[m/s^2]: 1.044
加速度x[m/s^2]: -1.548 加速度y[m/s^2]: 0.198 加速度z[m/s^2]: -1.034
加速度x[m/s^2]: -5.286 加速度y[m/s^2]: -0.851 加速度z[m/s^2]: 1.92
加速度x[m/s^2]: 3.97 加速度y[m/s^2]: 0.097 加速度z[m/s^2]: 0.371
加速度x[m/s^2]: -1.171 加速度y[m/s^2]: 0.394 加速度z[m/s^2]: -1.489
RANGE_ACC
と ARNG
のそれぞれの意味は次のとおりです。
RANGE_ACC
は、ソフトウェア内で使用される計算のためのパラメータです。これは、加速度の生データを物理単位(m/s²)に変換する際に使われます。
センサーから取得した生データを物理的な加速度値に変換する際に、測定レンジを考慮して正しく換算するために使用されます。RANGE_ACC
の値は、ARNGの設定に応じて設定する必要があります。
ARNG
は、センサーのハードウェア設定で、加速度測定レンジ(±2g, ±4g, ±8g, ±16g)を決定するための設定です。
センサー自体が測定できる加速度の範囲を設定します。この設定はセンサーの内部ハードウェアに対して行われ、測定できる最大加速度が決まります。
設定する時は次のような組み合わせで設定します。
RANGE_ACC |
ARNG |
---|---|
2 (±2g) |
0x00 |
4 (±4g) |
0x02 |
8 (±8g) |
0x03 |
16 (±16g) |
0x01 |
高速モードについて
なお、測定モード( Normal mode / High speed mode)は、MODE
の指定で変更できるかと思いましたが、高速モード MODE = 0x01
にするとエラーになり計測できませんでした。
「動作モード」についてマニュアルを調べると次のことが分かりました。
- センサーは 6,664 kHz でデータを生成(標準モードは 833 Hz)
- データは1つのパケットに10セット分の 3 軸データが含まれている
- 1パケットは 62 バイトで構成されている
- 高速モードに入ではセンサーは10セットのデータをまとめたパケット(62バイト)を生成する
このことを踏まえると、高速モードは通常モードとは異なるデータフォーマットになるため既存のコードでは動かないように思われました。(私の環境だけの問題かもしれません…)
しかし、高速モードでは非常に大量のデータが生成されるので、これらをすべてクラウドに送るのはあまり現実的ではない気がしました。活用する場合は次のようなパターンがあるように思います。
- デバイス上でリアルタイム推論
- 一定期間の高速モードのデータをファイルとして蓄積して都度クラウドに転送し、一定のタイミングでクラウドのマシンリソースでバッチ的に分析
今回は時間の関係上、通常モードのみを試すことにしました。
クラウド連携
振動データの取得と各種パラメータの動作確認ができたので、次は取得したデータを AWS IoT Coreに送ってみたいと思います。
事前に AWS IoT Core 上でデバイス証明書や秘密鍵を生成してデバイス上に設置しておきます。(本記事では手順の紹介は省略いたします)
Python コードの修正
AWS IoT Core にデータを送るように Raspberry Pi 上のコードを修正します。次の項目は利用環境に応じて変更してください。
ENDPOINT
: 利用している AWS IoT Core のエンドポイントCLIENT_ID
: 利用しているデバイスのクライアント IDPATH_TO_CERTIFICATE
= デバイス証明書のパスPATH_TO_PRIVATE_KEY
= 上記証明書の秘密鍵のパスPATH_TO_AMAZON_ROOT_CA_1
= Amazon CA 証明書のパス
取得したデータを Publish するトピックは blog/pub/test
としていますが、ここも必要に応じて変更いただければと思います。
#!/usr/bin/python3
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
import csv
import struct
import sys
import usb.core
import usb.util
import time
import json
import logging
import RPi.GPIO as GPIO
# Define ENDPOINT, CLIENT_ID, PATH_TO_CERTIFICATE, PATH_TO_PRIVATE_KEY, PATH_TO_AMAZON_ROOT_CA_1, MESSAGE, TOPIC, and RANGE
ENDPOINT = "[YOUR_AWS_IoT_SUB_DOMAIN].iot.ap-northeast-1.amazonaws.com"
CLIENT_ID = "[YOUR_CLIENT_ID]"
PATH_TO_CERTIFICATE = "[YOUR_CERTIFICATE_PATH]"
PATH_TO_PRIVATE_KEY = "[YOUR_PRIVATE_KEY_PATH]"
PATH_TO_AMAZON_ROOT_CA_1 = "[AMAZON_ROOT_CA_1_PATH]"
#MESSAGE = "Hello World"
#TOPIC_PUB = "osaka/swtest02/motion/pub"
#TOPIC_SUB = "osaka/swtest02/sub/flinkresult"
TOPIC_PUB = "blog/pub/test"
TOPIC_SUB = "blog/sub/test"
RANGE = 20
# Logger Settings
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)
# Spin up resources
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=ENDPOINT,
cert_filepath=PATH_TO_CERTIFICATE,
pri_key_filepath=PATH_TO_PRIVATE_KEY,
client_bootstrap=client_bootstrap,
ca_filepath=PATH_TO_AMAZON_ROOT_CA_1,
client_id=CLIENT_ID,
clean_session=False,
keep_alive_secs=6
)
print("Connecting to {} with client ID '{}'...".format(
ENDPOINT, CLIENT_ID))
# Make the connect() call
connect_future = mqtt_connection.connect()
# Future.result() waits until a result is available
connect_future.result()
print("Connected!")
# JoyWarrior Settings
VID = 0x07c0
PID = 0x111a
ARNG = 0x00
AFILT = 0xC3
MODE = 0x00
#MODE = 0x01
RANGE_ACC = 2
# GPIO Settings
GPIO.setmode(GPIO.BCM)
GPIO.setup(3, GPIO.OUT)
def cal_acceleration(n):
return round((n - 32768) / 32768 * RANGE_ACC * 9.80665, 3)
# Main Logic
if __name__ == "__main__":
dev = usb.core.find(idVendor=VID, idProduct=PID)
if dev is None:
raise ValueError("Device not found")
try:
dev.detach_kernel_driver(0)
except Exception:
pass
dev.write(2, [0x04, MODE, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
dev.write(2, [0x00, ARNG, AFILT, 0x00, 0x00, 0x00, 0x00, 0x00])
dev.write(2, [0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
cfg = dev.get_active_configuration()
intf = cfg[(0, 0)]
if dev.is_kernel_driver_active(intf.bInterfaceNumber):
dev.detach_kernel_driver(intf.bInterfaceNumber)
endpoint = usb.util.find_descriptor(
intf,
custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
== usb.util.ENDPOINT_IN
)
if endpoint is None:
raise ValueError('Endpoint not found')
with open("./aws_iot_test.csv", "w", encoding="utf_8_sig") as f:
writer = csv.writer(f)
writer.writerow(["日時", "加速度x", "加速度y", "加速度z"])
while True:
output_bytes = endpoint.read(endpoint.wMaxPacketSize).tobytes()
acc_bytes = struct.unpack("<HHH", output_bytes[:6])
accx, accy, accz = list(map(cal_acceleration, acc_bytes))
timestamp = int(time.time())
payload = {
"timestamp": timestamp,
"accx": accx,
"accy": accy,
"accz": accz
}
mqtt_connection.publish(topic=TOPIC_PUB, payload=json.dumps(payload), qos=mqtt.QoS.AT_LEAST_ONCE)
print("Published: '" + json.dumps(payload) + "' to the topic: " + TOPIC_PUB)
writer.writerow([timestamp, accx, accy, accz])
f.flush()
time.sleep(0.2)
このコードでは 0.2 秒単位でデータを収集・送信しているので、センサーで取得できる全データの一部のみ送信しています。
これは高速モードを利用していない理由と同じです。全データが必要であれば、必要に応じてエッジデバイス上で分析したりバッファリングして送るなど要件に応じてカスタマイズして利用いただければと思います。
もちろんすべてを AWS 側に送ることも可能です。その際は通信コストや AWS IoT Core から先につなぐサービスのクォータなどに注意して設計するようにしましょう。
AWS IoT Core 上でメッセージを確認
上記コードを実行して AWS IoT Core のテストクライアントでモニタリングすると、timestamp と 3 軸のデータが AWS 上で確認できました。
最後に
加速度センサーは初めて使いましたが、思っていたよりもきれいな波形のデータを取得できました。このセンサーを使ってさらに活用できるユースケースを試していきたいと思います。
参考 URL