InfluxDB Cloudへデータを送信&可視化してみた

2022.09.01

こんにちは。CX事業本部Delivery部のakkyです。

IoTセンサーのデータベースとして、AWS IoT SiteWiseやAzure Cosmos DBを使う方法をご紹介してきました。Developers.IOにはAmazon Timestreamなどを使う方法も紹介されていますが、ほとんどが3大クラウドサービスが提供するDBでした。

今回は、InfluxDB Cloudへデータを送信し、可視化してみましたのでご紹介します。

InfluxDB

InfluxDBはInfluxData社が開発しているオープンソースの時系列データベースです。サーバはセルフホスティングするか、フルマネージドのInfluxDB Cloudというサービスがあります。

データの書き込みには、HTTPエンドポイントのほか、MQTTや各種サービスのメトリクスの取得に対応しています。クエリにはFluxまたはInfluxQLというクエリ言語を使います。Fluxは多機能で、ある程度の計算はこれだけでできてしまいます。 クエリは独自言語となりますが、各種プログラミング言語で使えるライブラリが用意されています。

可視化ツールは組み込みのものが用意されていますが、Grafanaを使うこともできます。もちろんAmazon Managed Grafanaも対応しています。

今回はInfluxDB Cloudの無料プランを使ってみました。

データ構造

InfluxDBでは、NoSQLのようなスキーマレスのほか、スキーマを強制することができます。センサやサーバのメトリクスを入れるユースケースではスキーマありで使うことが多いかもしれません。

バケットの中に各測定値の行が入る構造はRDBと似ていますが、各行のデータとして、必須の測定名、インデックスの対象になるタグ、インデックス対象外のフィールド、そして必須の時刻データがあるという形式がInflexDBの特徴です。

測定名にはデータの種類を説明する名前、タグにはデータの取得元などを識別するための値、フィールドには測定値を入れるのが想定した使い方のようです。 詳しくは公式ドキュメントを参照してください。

Pythonからの書き込み

Pine A64上のPythonで動作確認したコードから、InfluxDBの部分を抜き出しました。 Pythonのinfluxdbライブラリが必要です。

from datetime import datetime
import time

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.exceptions import InfluxDBError

influxdb_url = "https://XXXXXXX.aws.cloud2.influxdata.com"
token = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
org = "test@example.com"
bucket = "BucketName"
points = []
BATCH_CNT = 6

class BatchingCallback(object):
    def success(self, conf, data: str):
        #print(f"Written batch: {conf}, data: {data}")
        pass

    def error(self, conf, data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf, data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")

def func():
    frequency = 50.0
    voltage = 100.0

    point = Point("ac") \
            .tag("place", "home") \
            .field("frequency", frequency) \
            .field("voltage", voltage) \
            .time(datetime.utcnow(), WritePrecision.NS)
    points.append(point)
    if len(points) >= BATCH_CNT:
        with InfluxDBClient(url=influxdb_url, token=token, org=org) as influxdb_client:
            callback = BatchingCallback()
            with influxdb_client.write_api(success_callback=callback.success,
                                error_callback=callback.error,
                                retry_callback=callback.retry) as write_api:
                write_api.write(bucket, org, points)
                points.clear()

Pythonでは、測定値はPointというクラスで生成します。今回は周波数と電圧を測定するセンサーを例としたので、前述のとおり、measurement(コンストラクタの引数)にはac、データを識別するための値をtagとしてplaceにセットし、測定値をfieldとしてfrequencyとvoltageをセットしました。 HTTPエンドポイントに送信するので、いくつかのデータをまとめてwrite_api.write()するコードになっています。

データの可視化

InfluxDB Cloudへログインし、Data Explorerを開きます。FROMでBucketを選び、MEASUREMENTから測定値を選択し、そのほかのタグで絞り込んでからSUBMITをクリックするとグラフが表示されます。Grafanaのように表示範囲も選択できます。