GrafanaのデータソースにDynamoDBを利用できるようにしてみた

2022.08.16

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは。CX事業本部Delivery部のakkyです。

GrafanaはIoTでの利用にも便利で、データソースとしてRDBはもちろん、SiteWiseなどさまざまなデータソースが使えます。しかし、DynamoDBもよく使うデータベースにもかかわらず、データソースプラグインが用意されていません。GrafanaのためにRDSに格納するのもちょっと面倒ですし、Amazon Managed Grafanaではプラグインを自分でインストールすることもできません。

しかし、GrafanaにはJSONというJSON形式でデータを取得することができるプラグインが標準で付属しています。もちろんAmazon Managed Grafanaでも使えます。

今回は、DynamoDBとZappaの勉強を兼ねて、Python(+API Gateway+Lambda)をバックエンドに使った、JSON経由でDynamoDBをGrafanaで可視化するプロキシを作ってみましたので、ご紹介します。

Zappa

ZappaはPythonで作られたWSGI対応WebアプリケーションをAPI Gateway+Lambda構成のサーバレスアプリとして簡単にデプロイできるライブラリです。

Zappaを使ってFlask appをAWS Lambda と API Gatewayにデプロイしてみた

今回は、Flaskを使ってDynamoDBへアクセスするAPIを作ります。

JSON

GrafanaのJSONプラグインを使うと、JSONを返すAPIをデータソースにすることができます。

JSONは/、/query、/searchの3つのAPIを実装すればよいので、これらの(実質2つ)のAPIを実装しました。

/

APIへの疎通を確認するアドレスです。200で応答すればよいだけです。

/query

実際の検索クエリが来るAPIです。JSONでPOSTされます。ここから表示対象と表示範囲を取得し、DynamoDBへリクエストします。

パーティションキー列名、ソートキー列名はあまり変更がないと想定しURLへ含めることにし、実際のパーティションキーとテーブル名はGrafanaのクエリ作成画面から入力できるpayloadを利用することにしました。

応答としては、targetにリクエスト名、datapointに測定値とUNIX時刻(ミリ秒)の配列を入れ、それらをさらに配列にして返せばOKです。

/search

グラフに表示するメトリックの選択肢に表示される値の配列を返します。DynamoDBでは、RDBと異なりスキーマを取得できないため、URLに含めることにします。

 

以上3つのURLを実装します。JSONではエンドポイントURLはまとめて一つしか指定できないため、メトリックのリスト、パーティションキー列名、ソートキー列名はURLへ含めてすべてのURLでのリクエストに含めてしまいます。

DynamoDBキー構造の想定

今回はDynamoDBに時系列のデータを記録していることを想定し、テーブル名とパーティションキー:測定地点名やデータの名前などの固有値、ソートキー:時刻データ(UNIX時間など)、その他の列:実際の測定データとして利用している構造で利用するようにしました。

Zappa+Flaskでの実装

pipで必要なライブラリをインストールします。

pip install zappa flask boto3 python-dateutil

それぞれ以下のバージョンを使用しました。

zappa 0.55.0
Flask 2.2.2
boto3 1.24.48
python-dateutil 2.8.2

zappa initを実行して、プロジェクトを初期設定してください。app_functionはmyapp.appとして、以下のソースコードを保存しました。意味のある作業をしているのはほとんどqueryメソッドだけです。

myapp.py

from flask import Flask, request, jsonify
from dateutil.parser import isoparse
from decimal import Decimal
import boto3
from boto3.dynamodb.conditions import Key

app = Flask(__name__)
dynamodb = boto3.resource('dynamodb')

def sortkey_convert(val):
    return int(val*1000)

def value_convert(val):
    return float(val)

@app.route("/<attributes>/<partiton_key>/<sort_key>/")
def index2(attributes, partiton_key, sort_key):
    return f"OK:{attributes}:{partiton_key}:{sort_key}\n"

@app.post("/<attributes>/<partiton_key>/<sort_key>/search")
def search(attributes, partiton_key, sort_key):
    return jsonify(attributes.split(','))

@app.post("/<attributes>/<partiton_key>/<sort_key>/query")
def query(attributes, partiton_key, sort_key):
    targets = request.json["targets"]
    range_from = Decimal(isoparse(request.json["range"]["from"]).timestamp())
    range_to = Decimal(isoparse(request.json["range"]["to"]).timestamp())

    ret_dict = []
    for target in targets:
        target_name = target["target"]
        payload = target["payload"]
        table_name = payload["table"]
        partiton_key_name = payload["partiton_key"]
        target_strlist = target_name + ',#sk'
        option = {
            'ExpressionAttributeNames': {"#sk": sort_key},
            'KeyConditionExpression': \
                Key(partiton_key).eq(partiton_key_name) &
                Key(sort_key).between(range_from, range_to),
            'ProjectionExpression': target_strlist
        }
        table = dynamodb.Table(table_name)
        res = table.query(**option)

        item_list = []
        for item in res["Items"]:
            item_list.append([value_convert(item[target_name]), sortkey_convert(item[sort_key])])
        ret_dict.append({"target": target_name, "datapoints":item_list})
    return ret_dict

以下のコマンドでデプロイします。成功するとURLが表示されます。なお、デフォルト設定ではだれでもアクセスできてしまうので、実運用する際には必ずアクセス制限や認証を行ってください!

 zappa deploy dev

Grafanaの設定

GrafanaのConfigurationメニュー→Data sourcesを開き、検索ボックスにJSONと入力します。

設定画面が表示されるので、URLに「先ほどデプロイしたAPIのURL/<メトリック一覧>/<パーティションキー列名>/<ソートキー列名>/」を設定します。具体的には「https://XXXXXXXXXX.execute-api.ap-northeast-1.amazonaws.com/dev/temperature,humidity/devicename/timestamp/」のようになります。

ユーザー認証を設定した場合は、Authで設定します。入力したらSave&testをクリックし、成功することを確認します。

次に、新しくダッシュボードを作成し、パネルを追加します。Data sourceはJSONとし、表示したいMetric(ここに/searchの結果が利用されます)を選択、Payloadには「{"table":"DynamoDBのテーブル名","partiton_key":"DynamoDBのパーティションキー名"}」を入力します。右上のRefreshボタンをクリックすると、グラフが表示されるはずです。

おわりに

JSONプラグインは送信されてくる項目名が多いため実装には少し悩みましたが、わかってしまえばメトリック名と範囲を見て、ごく少数の項目に実際のデータを詰めて返すだけなので、なんてことはないのでした。

これを応用すれば、いろいろなデータソース(といってもほとんどの物は網羅されていますが…)を利用して、Grafanaの強力な可視化機能を利用することができるようになると思います。