Kinesis Data Streamsのデータビューワーを使って格納されたデータを確認してみた
データアナリティクス事業本部の鈴木です。
『Amazon Kinesis Data Streams adds capability to easily inspect data records in AWS Management Console』という発表があり、記載されていたKinesis Data Streamsのデータビューワーを試してみました!
開発者ガイドは以下になります。
データストリームの作成
まず、Kinesisのコンソールから、データストリームを作成します。データストリームの作成
を押します。
今回は何シャードかあった方が見やすそうだったので、オンデマンドで作成してみました。データストリーム名を入れて、ほかはデフォルトのまま進み、データストリームの作成
を押しました。
ステータスがアクティブになるまでしばらく待ちます。
データビューワーをみてみる
作成したデータストリームを開いて、データビューワータブを押すと、このような画面になりました。
シャードと開始位置を選んでレコードを取得する形のようです。
今はデータが入っていないのでなにも取得できないため、次にデータを入れていきます。
データストリームにデータを入れてみる
今回は以前の記事で作っておいたスクリプトでデータを投入しました。データを生成して、データストリーム名で指定したデータストリームに、データを送信するスクリプトです。
今回は内容にはこだわりませんが、以下のような実装になります。詳しくは上記ブログをご確認ください。
import datetime import json import random import time import boto3 import pandas as pd def get_random_data(trial_id, x): """ 前の位置からランダムに移動し、送信用のデータと位置を返却する。 """ data = { "trial_id": trial_id, "x": x, "event_time": datetime.datetime.now().isoformat() } x += random.choice([-1,1]) return data, x def send_data(stream_name, kinesis_client, trial_id, steps): """ ランダムウォークを計算しつつ、Kinesisデータストリームにデータを送信する。 """ # 初期位置と軌跡格納用のリストを設定しておく。 x = 0 trail = [] # 念のため、ステップ数を制限しておく。 if steps > 2000: print(f"{steps} is too much.") return # シミュレーションを実行し、逐次でKinesisデータストリームにデータを送信する。 for i in range(steps): # 新しい送信用データを取得する。 data, x = get_random_data(trial_id, x) partition_key = str(data["trial_id"]) # データを送信する。 kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey=partition_key) # 軌跡を記録し、少し間をあける。 trail.append(data) time.sleep(0.05) # 結果は答え合わせができるようにCSVに保存しておく。 df_trail = pd.DataFrame(trail) df_trail.to_csv("./trail.csv", index=False) return def main(): # シミュレーションの設定をする。 trial_id = 1 steps = 100 # Kinesisデータストリームへの送信設定と準備をする。 stream_name = ${作成したデータストリーム名} kinesis_client = boto3.client('kinesis') print("Start sending data.") # シミュレーションと送信を開始する。 send_data(stream_name, kinesis_client, trial_id, steps) print("Successfully finished.") if __name__ == '__main__': main()
ハイライトした64行目を作成したデータストリームの名前に変えて、ローカルPCからスクリプトを実行しました。
python3 sample.py # Start sending data. # Successfully finished.
これでデータストリームにデータが入ったので、再度データビューワーに戻り確認してみます。
データビューワーから送信したデータを確認してみる
最後に送信したデータを確認してみました。
シャードを選び、タイムスタンプでデータストリームの作成直後の開始時刻を選び、レコードを取得すると、以下のようにデータが確認できました。
特に開始時刻はJSTで入力してしまって大丈夫でした。
個々のデータをクリックすると、中身を確認できました。rawデータとJSONの2パターンで確認できました。
データストリームの削除
検証したデータストリームが不要な場合は、忘れないうちに削除しておきました。
最後に
今回はKinesis Data Streamsのデータビューワーから、データストリームに送信したデータをコンソール上で確認してみました。UI上から簡単にデータが確認できるのでとても便利でした。この機能があればより誰でも気軽にKinesis Data Streamsを使っていけてとてもいいですね!