Kinesis Data Analytics Studioでストリーミングデータを可視化してみた

2021.06.21

データアナリティクス事業本部の鈴木です。

Amazon Kinesis Data Analytics Studioが利用可能になったので、さっそく東京リージョンで試してみました。

Kinesis Data Analytics Studioとは

Apache Zeppelinノートブックをインターフェースに、Kinesisデータストリームに対してアドホックにクエリを実行したり、結果を可視化したりすることができます。

Amazon Kinesis Data Analytics(ストリーミングデータをリアルタイムで処理)| AWS

ちなみに、「Apache Zeppelinってなんだ?」という方は、Apache Zeppelinをローカル環境で練習する記事も書きましたので、よければご確認ください。

Apache ZeppelinコンテナとPythonでデータを可視化してみた | DevelopersIO

検証内容

まず、検証内容を整理します。

検証概要

ローカルでPythonを使ってランダムウォークを生成し、タイムスタンプと座標をデータストリームに送信し、Kinesis Data Analytics Studioで可視化します。

Kinesis Data Analytics Studioの使用例は、Amazon Web Services ブログで公開されていたので、こちらを参考にしました。

Amazon Kinesis Data Analytics Studio の概要 – SQL、Python、または Scala を使用してストリーミングデータをすばやく操作する | Amazon Web Services ブログ

上記資料では、いくつかの機能が紹介されていましたが、本記事では以下の2点をスコープとしました。

  • ノートブックの起動
  • データストリームで受信したストリーミングデータの可視化

構成は以下です。

検証した構成

検証データについて

以下のコードを使って、データストリームにデータを送信しました。 100ステップのランダムウォークを生成し、ステップ毎にJSON形式のレコードをboto3でデータストリームに送信します。

後述の手順でスタジオノートブック側の準備が完了してから、このコードを使ってデータストリームにデータを送信します。また、ハイライトしたデータストリーム名は自分の作成したデータストリームのものに置き換えます。

random_walk.py

import datetime
import json
import random
import time

import boto3
import pandas as pd

def get_random_data(trial_id, x):
    """ 前の位置からランダムに移動し、送信用のデータと位置を返却する。
    """
    data = {
            "trial_id": trial_id,
            "x": x,
            "event_time": datetime.datetime.now().isoformat()
        }

    x +=  random.choice([-1,1])

    return data, x

def send_data(stream_name, kinesis_client, trial_id, steps):
    """ ランダムウォークを計算しつつ、Kinesisデータストリームにデータを送信する。
    """

    # 初期位置と軌跡格納用のリストを設定しておく。
    x = 0
    trail = []

    # 念のため、ステップ数を制限しておく。
    if steps > 2000:
        print(f"{steps} is too much.")
        return 
    
    # シミュレーションを実行し、逐次でKinesisデータストリームにデータを送信する。
    for i in range(steps):

        # 新しい送信用データを取得する。
        data, x = get_random_data(trial_id, x)
        partition_key = str(data["trial_id"])

        # データを送信する。
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey=partition_key)

        # 軌跡を記録し、少し間をあける。
        trail.append(data)
        time.sleep(0.05)
        
    # 結果は答え合わせができるようにCSVに保存しておく。
    df_trail = pd.DataFrame(trail)
    df_trail.to_csv("./trail.csv", index=False)

    return

def main():
    # シミュレーションの設定をする。
    trial_id = 1
    steps = 100

    # Kinesisデータストリームへの送信設定と準備をする。
    stream_name = ${作成したデータストリーム名}
    kinesis_client = boto3.client('kinesis')

    print("Start sending data.")

    # シミュレーションと送信を開始する。
    send_data(stream_name, kinesis_client, trial_id, steps)

    print("Successfully finished.")

if __name__ == '__main__':
    main()

データストリームに送信されるレコードは以下のようなJSONになります。何回めの実験かをtrial_idで判別するようなイメージです。

{
    "trial_id": 1,
    "x": 0,
    "event_time": '2021-06-17T18:37:12.712154'
}

生成されるランダムウォークは、例えば以下のようになります。 サンプルの軌跡

やってみた

では、検証した内容を紹介していきます。以下の内容を順番に実施していきました。

データストリームを作成する

まずAWSマネジメントコンソールから、データ受信用のデータストリームを作成しました。 作成したkinesis data stream

Glue データカタログのデータベースを作成する

適当な空のデータベースを作成しておきました。Glueのコンソールで、[データベース]から[データベースの追加]で作成ができます。スタジオノートブックの作成画面からでも作成できます。 作成したデータベース

スタジオノートブックを作成する

作成したデータストリームを選択し、[プロセス]から[リアルタイムでデータを処理]を選択します。

プロセスからの操作

[Apache Flink - スタジオノートブック]を選択し、作成します。 スタジオノートブックを選択する

[Studioノートブックを作成]画面に移るので、ノートブック名などを入力します。 ノートブック設定1

同じ画面で、ソースのメタデータを定義するためのAWS Glueデータベースにどれを使うか聞かれるので、先ほど作成したデータベースを選択します(作成していなければ、[作成]から作成します)。

設定を確認し、ノートブックを作成します。 ノートブック設定2

ちなみに、[Studioノートブックの設定]で「並列度」の記載がありましたが、ノートブックの作成後に[スケーリング]画面で編集が可能です。

[スケーリング]画面の説明によると、

Kinesis Data Analyticsは、アプリケーションを実行するために必要なKPUを次の式を使用してプロビジョニングします。 アプリケーションのKPU = (並列処理)/(KPUあたりの並列処理数)

とのことなので、並列度が多いとその分プロビジョニングされるKPUが増えて費用がかかりそうです。

今回はあくまで検証用でKinesisデータストリームのシャードも1にしていて、トラフィックもほとんどありません。そのため、並列度を1に変更して、KPUが1になるようにしました。 スケーリングの設定

スタジオノートブックを起動する

準備ができたので、[起動]を押して、スタジオノートブックを起動します。 ノートブックの起動

開始されたことが確認できれば、[Apache Zeppelinで開く]を押すと、Zeppelin Notebookのトップ画面が開きます。

Apache Zeppelinで開く

「Create new note」ボタンから"sample_notebook"というノートブックを新規作成しました。

トップ画面

ノートブック新規作成

テーブルを作成する

%flink.ssqlインタプリタでSQLを実行し、walk_dataテーブルを作成しました。 テーブルの作成

このとき、WITH句で'stream'を指定するので、作成されたストリームの名前に変えるようにします。

%flink.ssql

CREATE TABLE walk_data (
    trial_id INTEGER,
    x FLOAT,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (trial_id)
WITH (
    'connector' = 'kinesis',
    'stream' = ${作成したデータストリーム名},
    'aws.region' = 'ap-northeast-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
)

Shift + Enterなどでパラグラフを実行した後、データベースを確認すると、テーブルが作成されたことが分かりました。

テーブルの作成結果

可視化してみる

ようやく準備が整ったので、以下のクエリをノートブックで実行し、データが来るのを待ちます。このクエリでwalk_data テーブルのすべてのコンテンツを取得します。新しいデータが到来すると、SELECTの出力が継続的に更新されます。

%flink.ssql(type=update)

SELECT * FROM walk_data;

グラフは散布図を選択し、[setting]から横軸をevent_timeに変えておきます。ちなみに、[setting]の内容はアウトプットが更新されるときに、もとに戻ってしまったので、一度パラグラフの処理を停止し、設定してから再度実行すると思った通りに行きました。

可視化の設定

続いて、ローカルでデータの送信を行います。

python random_walk.py

あとはZeppelinノートの出力を眺めていると、データの到来と共に可視化が始まります。以下のような感じになりました。 データ可視化の様子

ローカルのデータと比較すると、確かに同じデータが流れてきていることが確認できます。 結果の比較

料金

Kinesis Data Analytics Studioの利用料金については、Amazon Kinesis Data Analyticsの料金ページから確認することが出来ます。詳細は該当ページにてご確認ください。

料金 - Amazon Kinesis Data Analytics | AWS

感想

非常に簡単に、データストリームで受信したストリーミングデータをZeppelinで可視化することができました。また、今回は実施しませんでしたが、処理したデータを送信先の Kinesis データストリームに送信したり、Kinesis Analyticsアプリケーションとしてデプロイすることもできるので、そちらの機能も試していきたいです。