この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは。CX事業本部Delivery部のakkyです。
IoTセンサーのデータベースとして、AWS IoT SiteWiseやAzure Cosmos DBを使う方法をご紹介してきました。Developers.IOにはAmazon Timestreamなどを使う方法も紹介されていますが、ほとんどが3大クラウドサービスが提供するDBでした。
今回は、InfluxDB Cloudへデータを送信し、可視化してみましたのでご紹介します。
InfluxDB
InfluxDBはInfluxData社が開発しているオープンソースの時系列データベースです。サーバはセルフホスティングするか、フルマネージドのInfluxDB Cloudというサービスがあります。
データの書き込みには、HTTPエンドポイントのほか、MQTTや各種サービスのメトリクスの取得に対応しています。クエリにはFluxまたはInfluxQLというクエリ言語を使います。Fluxは多機能で、ある程度の計算はこれだけでできてしまいます。 クエリは独自言語となりますが、各種プログラミング言語で使えるライブラリが用意されています。
可視化ツールは組み込みのものが用意されていますが、Grafanaを使うこともできます。もちろんAmazon Managed Grafanaも対応しています。
今回はInfluxDB Cloudの無料プランを使ってみました。
データ構造
InfluxDBでは、NoSQLのようなスキーマレスのほか、スキーマを強制することができます。センサやサーバのメトリクスを入れるユースケースではスキーマありで使うことが多いかもしれません。
バケットの中に各測定値の行が入る構造はRDBと似ていますが、各行のデータとして、必須の測定名、インデックスの対象になるタグ、インデックス対象外のフィールド、そして必須の時刻データがあるという形式がInflexDBの特徴です。
測定名にはデータの種類を説明する名前、タグにはデータの取得元などを識別するための値、フィールドには測定値を入れるのが想定した使い方のようです。 詳しくは公式ドキュメントを参照してください。
Pythonからの書き込み
Pine A64上のPythonで動作確認したコードから、InfluxDBの部分を抜き出しました。 Pythonのinfluxdbライブラリが必要です。
from datetime import datetime
import time
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.exceptions import InfluxDBError
influxdb_url = "https://XXXXXXX.aws.cloud2.influxdata.com"
token = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
org = "test@example.com"
bucket = "BucketName"
points = []
BATCH_CNT = 6
class BatchingCallback(object):
def success(self, conf, data: str):
#print(f"Written batch: {conf}, data: {data}")
pass
def error(self, conf, data: str, exception: InfluxDBError):
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
def retry(self, conf, data: str, exception: InfluxDBError):
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
def func():
frequency = 50.0
voltage = 100.0
point = Point("ac") \
.tag("place", "home") \
.field("frequency", frequency) \
.field("voltage", voltage) \
.time(datetime.utcnow(), WritePrecision.NS)
points.append(point)
if len(points) >= BATCH_CNT:
with InfluxDBClient(url=influxdb_url, token=token, org=org) as influxdb_client:
callback = BatchingCallback()
with influxdb_client.write_api(success_callback=callback.success,
error_callback=callback.error,
retry_callback=callback.retry) as write_api:
write_api.write(bucket, org, points)
points.clear()
Pythonでは、測定値はPointというクラスで生成します。今回は周波数と電圧を測定するセンサーを例としたので、前述のとおり、measurement(コンストラクタの引数)にはac、データを識別するための値をtagとしてplaceにセットし、測定値をfieldとしてfrequencyとvoltageをセットしました。 HTTPエンドポイントに送信するので、いくつかのデータをまとめてwrite_api.write()するコードになっています。
データの可視化
InfluxDB Cloudへログインし、Data Explorerを開きます。FROMでBucketを選び、MEASUREMENTから測定値を選択し、そのほかのタグで絞り込んでからSUBMITをクリックするとグラフが表示されます。Grafanaのように表示範囲も選択できます。