Timescale CloudへIoT Coreからのデータを保存してみた

2023.01.23

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは。CX事業本部Delivery部のakkyです。

IoT用途に使用可能なデータベースは用途によっていろいろありますが、特に発生したデータをそのまま保存しておくために、時系列データベースを使うことが多いのではないでしょうか。

今回は、時系列データベースであるTimescaleの公式なSaaSであるTimescale Cloudを試用してみましたので、ご紹介します。

Timescale

TimescaleはPostgreSQLの拡張として実装された時系列データベースです。

PostgreSQLのリレーショナルデータベースのメリットはそのままに、高速なクエリや水平スケーリング、データのダウンサンプリングによる圧縮などの時系列データベースの機能を付け加えているとされています。

今回は、Timescaleの開発元が提供するDBaaSであるTimescale Cloudを使いました。これはAWS上で提供されており、VPCなどへの接続もできます(検証はしていません)。

AzureやGCPでの利用、またはAWSでTimescale Cloudが提供されていないリージョンにデプロイしたい場合は、Managed Service for TimescaleDBという別メニューがあります。

データベースの作成

Timescale Cloudは検証用に30日間無料で使えます。 アカウントを作成すると、データベースインスタンスを作成できるようになります。

このままCreate Serviceしてもいいのですが、せっかくなのでAdvanced Configurationでオプションを見てみましょう。

ノード構成やリージョンやレプリカの有無を選べるようですね。リージョンはあまり選べないのですが、日本からなるべく近いオレゴン(us-west-2)にしてみましょう。

CPU/RAMはとりあえず最小のものに。ストレージも10GBで作成してみました。

デプロイが開始され、すぐに接続情報などが出てきますので、メモしておきます。

テーブルの作成

上記画面に丁寧に接続方法などが出ていますので、その通りにやれば接続できます。

接続にはPostgreSQLのコマンドがそのまま使えます。私はWSL上のUbuntuへインストールしました。

sudo apt install postgresql-client-12

接続したら説明の通りにcreate tableし、次にHyper tableを作りました。

create tableの時点ではPrimary KeyやIndexなどは要らないようです。

create table acinfotokyo (
    time timestamp primary key,
    voltage double precision,
    frequency double precision
);
SELECT create_hypertable('acinfotokyo ', 'time');

このあとはinsertでデータを突っ込んでいくだけです。

IoT Core→Lambdaでデータ挿入

IoT CoreでMQTT経由で送られてくるデータをLambdaを使ってinsertすることにします。

Pythonで以下のようなスクリプトを作成しました。今回はスクリプトへパスワードを直接書いてしまっていますが、実運用の際にはSecrets Managerなどへ保存することをおすすめします。

また、メッセージの受信頻度が高くないのでRDS Proxyは使っていません。

PostgreSQLのクライアントにはpg8000を使います。(psycopg2は標準パッケージをLambdaで動かすことができません。) pipでpg8000をインストールし、zipファイルに同梱してアップロードします。

import pg8000.dbapi
from datetime import datetime, timezone
import ssl

counter = 0

ssl_context = ssl.create_default_context()
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.load_verify_locations('./*******.*******.tsdb.cloud.timescale.com.chained+root.crt')

connection = pg8000.dbapi.connect(host='*******.*******.tsdb.cloud.timescale.com',
                              port = ******,
                              user='*******',
                              password='**********',
                              database='****',
                              ssl_context=ssl_context)

def lambda_handler(event, context):
    global counter
    timestamp = datetime.fromtimestamp(event["timeInSeconds"] + event["offsetInNanos"] / 1000000000, tz=timezone.utc)
    try:
        cursor = connection.cursor()
        sql = "INSERT INTO acinfotokyo (time, voltage, frequency) VALUES (%s, %s, %s)"
        cursor.execute(sql, (timestamp, event["voltage"], event["frequency"],))
        connection.commit()
    except Exception as e:
        print(f"insert error {counter}: {e}")
    else:
        print(f"success {counter}")
    finally:
        cursor.close()
    counter += 1
    return

LambdaをデプロイしてIoT Coreのルールを設定し、しばらく経ってからselectすると、データが保存できることを確認できると思います。

まとめ

Timescale CloudへIoTデータを保存する方法をご紹介しました。

Timescaleは他の時系列DBと異なり、PostgreSQLのツールやコマンドがそのままつけるのがいい点ですね。RDBとして使いたいテーブルも同時に入れられるのは便利です。 AWSのRDSには拡張機能がインストールできないので使えないのですが、DBaaSとして使えてお手軽です。

性能やデータの圧縮については検証できなかったのですが、通常のRDBへ時系列データを入れる際に考慮が必要なパーティショニングなどを行うことなく使えそうなので、適用できるアプリケーションは多そうです。