Kinesis Data Streamsのデータビューワーを使って格納されたデータを確認してみた

2022.11.01

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部の鈴木です。

Amazon Kinesis Data Streams adds capability to easily inspect data records in AWS Management Console』という発表があり、記載されていたKinesis Data Streamsのデータビューワーを試してみました!

開発者ガイドは以下になります。

データストリームの作成

まず、Kinesisのコンソールから、データストリームを作成します。データストリームの作成を押します。

データストリームの作成

今回は何シャードかあった方が見やすそうだったので、オンデマンドで作成してみました。データストリーム名を入れて、ほかはデフォルトのまま進み、データストリームの作成を押しました。

データストリームの作成1

データストリームの作成2

ステータスがアクティブになるまでしばらく待ちます。

データストリームの作成3

データビューワーをみてみる

作成したデータストリームを開いて、データビューワータブを押すと、このような画面になりました。

データビューワータブ

シャードと開始位置を選んでレコードを取得する形のようです。

シャードの選択

開始位置の選択

今はデータが入っていないのでなにも取得できないため、次にデータを入れていきます。

データストリームにデータを入れてみる

今回は以前の記事で作っておいたスクリプトでデータを投入しました。データを生成して、データストリーム名で指定したデータストリームに、データを送信するスクリプトです。

今回は内容にはこだわりませんが、以下のような実装になります。詳しくは上記ブログをご確認ください。

sample.py

import datetime
import json
import random
import time

import boto3
import pandas as pd


def get_random_data(trial_id, x):
    """ 前の位置からランダムに移動し、送信用のデータと位置を返却する。
    """
    data = {
            "trial_id": trial_id,
            "x": x,
            "event_time": datetime.datetime.now().isoformat()
        }
    x +=  random.choice([-1,1])
    return data, x


def send_data(stream_name, kinesis_client, trial_id, steps):
    """ ランダムウォークを計算しつつ、Kinesisデータストリームにデータを送信する。
    """

    # 初期位置と軌跡格納用のリストを設定しておく。
    x = 0
    trail = []

    # 念のため、ステップ数を制限しておく。
    if steps > 2000:
        print(f"{steps} is too much.")
        return 
    
    # シミュレーションを実行し、逐次でKinesisデータストリームにデータを送信する。
    for i in range(steps):

        # 新しい送信用データを取得する。
        data, x = get_random_data(trial_id, x)
        partition_key = str(data["trial_id"])

        # データを送信する。
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey=partition_key)

        # 軌跡を記録し、少し間をあける。
        trail.append(data)
        time.sleep(0.05)
        
    # 結果は答え合わせができるようにCSVに保存しておく。
    df_trail = pd.DataFrame(trail)
    df_trail.to_csv("./trail.csv", index=False)
    return


def main():
    # シミュレーションの設定をする。
    trial_id = 1
    steps = 100

    # Kinesisデータストリームへの送信設定と準備をする。
    stream_name = ${作成したデータストリーム名}
    kinesis_client = boto3.client('kinesis')

    print("Start sending data.")

    # シミュレーションと送信を開始する。
    send_data(stream_name, kinesis_client, trial_id, steps)

    print("Successfully finished.")


if __name__ == '__main__':
    main()

ハイライトした64行目を作成したデータストリームの名前に変えて、ローカルPCからスクリプトを実行しました。

python3 sample.py
# Start sending data.
# Successfully finished.

これでデータストリームにデータが入ったので、再度データビューワーに戻り確認してみます。

データビューワーから送信したデータを確認してみる

最後に送信したデータを確認してみました。

シャードを選び、タイムスタンプでデータストリームの作成直後の開始時刻を選び、レコードを取得すると、以下のようにデータが確認できました。

データの確認結果

特に開始時刻はJSTで入力してしまって大丈夫でした。

個々のデータをクリックすると、中身を確認できました。rawデータとJSONの2パターンで確認できました。

rawデータ

JSON

データストリームの削除

検証したデータストリームが不要な場合は、忘れないうちに削除しておきました。

データストリームの削除

最後に

今回はKinesis Data Streamsのデータビューワーから、データストリームに送信したデータをコンソール上で確認してみました。UI上から簡単にデータが確認できるのでとても便利でした。この機能があればより誰でも気軽にKinesis Data Streamsを使っていけてとてもいいですね!