Kinesis Data Analytics Studioでストリーミングデータを可視化してみた
データアナリティクス事業本部の鈴木です。
Amazon Kinesis Data Analytics Studioが利用可能になったので、さっそく東京リージョンで試してみました。
Kinesis Data Analytics Studioとは
Apache Zeppelinノートブックをインターフェースに、Kinesisデータストリームに対してアドホックにクエリを実行したり、結果を可視化したりすることができます。
ちなみに、「Apache Zeppelinってなんだ?」という方は、Apache Zeppelinをローカル環境で練習する記事も書きましたので、よければご確認ください。
検証内容
まず、検証内容を整理します。
検証概要
ローカルでPythonを使ってランダムウォークを生成し、タイムスタンプと座標をデータストリームに送信し、Kinesis Data Analytics Studioで可視化します。
Kinesis Data Analytics Studioの使用例は、Amazon Web Services ブログで公開されていたので、こちらを参考にしました。
上記資料では、いくつかの機能が紹介されていましたが、本記事では以下の2点をスコープとしました。
- ノートブックの起動
- データストリームで受信したストリーミングデータの可視化
構成は以下です。
検証データについて
以下のコードを使って、データストリームにデータを送信しました。 100ステップのランダムウォークを生成し、ステップ毎にJSON形式のレコードをboto3でデータストリームに送信します。
後述の手順でスタジオノートブック側の準備が完了してから、このコードを使ってデータストリームにデータを送信します。また、ハイライトしたデータストリーム名は自分の作成したデータストリームのものに置き換えます。
import datetime import json import random import time import boto3 import pandas as pd def get_random_data(trial_id, x): """ 前の位置からランダムに移動し、送信用のデータと位置を返却する。 """ data = { "trial_id": trial_id, "x": x, "event_time": datetime.datetime.now().isoformat() } x += random.choice([-1,1]) return data, x def send_data(stream_name, kinesis_client, trial_id, steps): """ ランダムウォークを計算しつつ、Kinesisデータストリームにデータを送信する。 """ # 初期位置と軌跡格納用のリストを設定しておく。 x = 0 trail = [] # 念のため、ステップ数を制限しておく。 if steps > 2000: print(f"{steps} is too much.") return # シミュレーションを実行し、逐次でKinesisデータストリームにデータを送信する。 for i in range(steps): # 新しい送信用データを取得する。 data, x = get_random_data(trial_id, x) partition_key = str(data["trial_id"]) # データを送信する。 kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey=partition_key) # 軌跡を記録し、少し間をあける。 trail.append(data) time.sleep(0.05) # 結果は答え合わせができるようにCSVに保存しておく。 df_trail = pd.DataFrame(trail) df_trail.to_csv("./trail.csv", index=False) return def main(): # シミュレーションの設定をする。 trial_id = 1 steps = 100 # Kinesisデータストリームへの送信設定と準備をする。 stream_name = ${作成したデータストリーム名} kinesis_client = boto3.client('kinesis') print("Start sending data.") # シミュレーションと送信を開始する。 send_data(stream_name, kinesis_client, trial_id, steps) print("Successfully finished.") if __name__ == '__main__': main()
データストリームに送信されるレコードは以下のようなJSONになります。何回めの実験かをtrial_idで判別するようなイメージです。
{ "trial_id": 1, "x": 0, "event_time": '2021-06-17T18:37:12.712154' }
生成されるランダムウォークは、例えば以下のようになります。
やってみた
では、検証した内容を紹介していきます。以下の内容を順番に実施していきました。
データストリームを作成する
まずAWSマネジメントコンソールから、データ受信用のデータストリームを作成しました。
Glue データカタログのデータベースを作成する
適当な空のデータベースを作成しておきました。Glueのコンソールで、[データベース]から[データベースの追加]で作成ができます。スタジオノートブックの作成画面からでも作成できます。
スタジオノートブックを作成する
作成したデータストリームを選択し、[プロセス]から[リアルタイムでデータを処理]を選択します。
[Apache Flink - スタジオノートブック]を選択し、作成します。
[Studioノートブックを作成]画面に移るので、ノートブック名などを入力します。
同じ画面で、ソースのメタデータを定義するためのAWS Glueデータベースにどれを使うか聞かれるので、先ほど作成したデータベースを選択します(作成していなければ、[作成]から作成します)。
設定を確認し、ノートブックを作成します。
ちなみに、[Studioノートブックの設定]で「並列度」の記載がありましたが、ノートブックの作成後に[スケーリング]画面で編集が可能です。
[スケーリング]画面の説明によると、
Kinesis Data Analyticsは、アプリケーションを実行するために必要なKPUを次の式を使用してプロビジョニングします。 アプリケーションのKPU = (並列処理)/(KPUあたりの並列処理数)
とのことなので、並列度が多いとその分プロビジョニングされるKPUが増えて費用がかかりそうです。
今回はあくまで検証用でKinesisデータストリームのシャードも1にしていて、トラフィックもほとんどありません。そのため、並列度を1に変更して、KPUが1になるようにしました。
スタジオノートブックを起動する
準備ができたので、[起動]を押して、スタジオノートブックを起動します。
開始されたことが確認できれば、[Apache Zeppelinで開く]を押すと、Zeppelin Notebookのトップ画面が開きます。
「Create new note」ボタンから"sample_notebook"というノートブックを新規作成しました。
テーブルを作成する
%flink.ssqlインタプリタでSQLを実行し、walk_dataテーブルを作成しました。
このとき、WITH句で'stream'を指定するので、作成されたストリームの名前に変えるようにします。
%flink.ssql CREATE TABLE walk_data ( trial_id INTEGER, x FLOAT, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (trial_id) WITH ( 'connector' = 'kinesis', 'stream' = ${作成したデータストリーム名}, 'aws.region' = 'ap-northeast-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )
Shift + Enterなどでパラグラフを実行した後、データベースを確認すると、テーブルが作成されたことが分かりました。
可視化してみる
ようやく準備が整ったので、以下のクエリをノートブックで実行し、データが来るのを待ちます。このクエリでwalk_data テーブルのすべてのコンテンツを取得します。新しいデータが到来すると、SELECTの出力が継続的に更新されます。
%flink.ssql(type=update) SELECT * FROM walk_data;
グラフは散布図を選択し、[setting]から横軸をevent_timeに変えておきます。ちなみに、[setting]の内容はアウトプットが更新されるときに、もとに戻ってしまったので、一度パラグラフの処理を停止し、設定してから再度実行すると思った通りに行きました。
続いて、ローカルでデータの送信を行います。
python random_walk.py
あとはZeppelinノートの出力を眺めていると、データの到来と共に可視化が始まります。以下のような感じになりました。
ローカルのデータと比較すると、確かに同じデータが流れてきていることが確認できます。
料金
Kinesis Data Analytics Studioの利用料金については、Amazon Kinesis Data Analyticsの料金ページから確認することが出来ます。詳細は該当ページにてご確認ください。
感想
非常に簡単に、データストリームで受信したストリーミングデータをZeppelinで可視化することができました。また、今回は実施しませんでしたが、処理したデータを送信先の Kinesis データストリームに送信したり、Kinesis Analyticsアプリケーションとしてデプロイすることもできるので、そちらの機能も試していきたいです。