この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
データアナリティクス事業本部の鈴木です。
Amazon Kinesis Data Analytics Studioが利用可能になったので、さっそく東京リージョンで試してみました。
Kinesis Data Analytics Studioとは
Apache Zeppelinノートブックをインターフェースに、Kinesisデータストリームに対してアドホックにクエリを実行したり、結果を可視化したりすることができます。
ちなみに、「Apache Zeppelinってなんだ?」という方は、Apache Zeppelinをローカル環境で練習する記事も書きましたので、よければご確認ください。
検証内容
まず、検証内容を整理します。
検証概要
ローカルでPythonを使ってランダムウォークを生成し、タイムスタンプと座標をデータストリームに送信し、Kinesis Data Analytics Studioで可視化します。
Kinesis Data Analytics Studioの使用例は、Amazon Web Services ブログで公開されていたので、こちらを参考にしました。
上記資料では、いくつかの機能が紹介されていましたが、本記事では以下の2点をスコープとしました。
- ノートブックの起動
- データストリームで受信したストリーミングデータの可視化
構成は以下です。
検証データについて
以下のコードを使って、データストリームにデータを送信しました。 100ステップのランダムウォークを生成し、ステップ毎にJSON形式のレコードをboto3でデータストリームに送信します。
後述の手順でスタジオノートブック側の準備が完了してから、このコードを使ってデータストリームにデータを送信します。また、ハイライトしたデータストリーム名は自分の作成したデータストリームのものに置き換えます。
random_walk.py
import datetime
import json
import random
import time
import boto3
import pandas as pd
def get_random_data(trial_id, x):
""" 前の位置からランダムに移動し、送信用のデータと位置を返却する。
"""
data = {
"trial_id": trial_id,
"x": x,
"event_time": datetime.datetime.now().isoformat()
}
x += random.choice([-1,1])
return data, x
def send_data(stream_name, kinesis_client, trial_id, steps):
""" ランダムウォークを計算しつつ、Kinesisデータストリームにデータを送信する。
"""
# 初期位置と軌跡格納用のリストを設定しておく。
x = 0
trail = []
# 念のため、ステップ数を制限しておく。
if steps > 2000:
print(f"{steps} is too much.")
return
# シミュレーションを実行し、逐次でKinesisデータストリームにデータを送信する。
for i in range(steps):
# 新しい送信用データを取得する。
data, x = get_random_data(trial_id, x)
partition_key = str(data["trial_id"])
# データを送信する。
kinesis_client.put_record(
StreamName=stream_name,
Data=json.dumps(data),
PartitionKey=partition_key)
# 軌跡を記録し、少し間をあける。
trail.append(data)
time.sleep(0.05)
# 結果は答え合わせができるようにCSVに保存しておく。
df_trail = pd.DataFrame(trail)
df_trail.to_csv("./trail.csv", index=False)
return
def main():
# シミュレーションの設定をする。
trial_id = 1
steps = 100
# Kinesisデータストリームへの送信設定と準備をする。
stream_name = ${作成したデータストリーム名}
kinesis_client = boto3.client('kinesis')
print("Start sending data.")
# シミュレーションと送信を開始する。
send_data(stream_name, kinesis_client, trial_id, steps)
print("Successfully finished.")
if __name__ == '__main__':
main()
データストリームに送信されるレコードは以下のようなJSONになります。何回めの実験かをtrial_idで判別するようなイメージです。
{
"trial_id": 1,
"x": 0,
"event_time": '2021-06-17T18:37:12.712154'
}
生成されるランダムウォークは、例えば以下のようになります。
やってみた
では、検証した内容を紹介していきます。以下の内容を順番に実施していきました。
データストリームを作成する
まずAWSマネジメントコンソールから、データ受信用のデータストリームを作成しました。
Glue データカタログのデータベースを作成する
適当な空のデータベースを作成しておきました。Glueのコンソールで、[データベース]から[データベースの追加]で作成ができます。スタジオノートブックの作成画面からでも作成できます。
スタジオノートブックを作成する
作成したデータストリームを選択し、[プロセス]から[リアルタイムでデータを処理]を選択します。
[Apache Flink - スタジオノートブック]を選択し、作成します。
[Studioノートブックを作成]画面に移るので、ノートブック名などを入力します。
同じ画面で、ソースのメタデータを定義するためのAWS Glueデータベースにどれを使うか聞かれるので、先ほど作成したデータベースを選択します(作成していなければ、[作成]から作成します)。
設定を確認し、ノートブックを作成します。
ちなみに、[Studioノートブックの設定]で「並列度」の記載がありましたが、ノートブックの作成後に[スケーリング]画面で編集が可能です。
[スケーリング]画面の説明によると、
Kinesis Data Analyticsは、アプリケーションを実行するために必要なKPUを次の式を使用してプロビジョニングします。 アプリケーションのKPU = (並列処理)/(KPUあたりの並列処理数)
とのことなので、並列度が多いとその分プロビジョニングされるKPUが増えて費用がかかりそうです。
今回はあくまで検証用でKinesisデータストリームのシャードも1にしていて、トラフィックもほとんどありません。そのため、並列度を1に変更して、KPUが1になるようにしました。
スタジオノートブックを起動する
準備ができたので、[起動]を押して、スタジオノートブックを起動します。
開始されたことが確認できれば、[Apache Zeppelinで開く]を押すと、Zeppelin Notebookのトップ画面が開きます。
「Create new note」ボタンから"sample_notebook"というノートブックを新規作成しました。
テーブルを作成する
%flink.ssqlインタプリタでSQLを実行し、walk_dataテーブルを作成しました。
このとき、WITH句で'stream'を指定するので、作成されたストリームの名前に変えるようにします。
%flink.ssql
CREATE TABLE walk_data (
trial_id INTEGER,
x FLOAT,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (trial_id)
WITH (
'connector' = 'kinesis',
'stream' = ${作成したデータストリーム名},
'aws.region' = 'ap-northeast-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)
Shift + Enterなどでパラグラフを実行した後、データベースを確認すると、テーブルが作成されたことが分かりました。
可視化してみる
ようやく準備が整ったので、以下のクエリをノートブックで実行し、データが来るのを待ちます。このクエリでwalk_data テーブルのすべてのコンテンツを取得します。新しいデータが到来すると、SELECTの出力が継続的に更新されます。
%flink.ssql(type=update)
SELECT * FROM walk_data;
グラフは散布図を選択し、[setting]から横軸をevent_timeに変えておきます。ちなみに、[setting]の内容はアウトプットが更新されるときに、もとに戻ってしまったので、一度パラグラフの処理を停止し、設定してから再度実行すると思った通りに行きました。
続いて、ローカルでデータの送信を行います。
python random_walk.py
あとはZeppelinノートの出力を眺めていると、データの到来と共に可視化が始まります。以下のような感じになりました。
ローカルのデータと比較すると、確かに同じデータが流れてきていることが確認できます。
料金
Kinesis Data Analytics Studioの利用料金については、Amazon Kinesis Data Analyticsの料金ページから確認することが出来ます。詳細は該当ページにてご確認ください。
感想
非常に簡単に、データストリームで受信したストリーミングデータをZeppelinで可視化することができました。また、今回は実施しませんでしたが、処理したデータを送信先の Kinesis データストリームに送信したり、Kinesis Analyticsアプリケーションとしてデプロイすることもできるので、そちらの機能も試していきたいです。