この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
データアナリティクス事業本部の鈴木です。
『Amazon Kinesis Data Streams adds capability to easily inspect data records in AWS Management Console』という発表があり、記載されていたKinesis Data Streamsのデータビューワーを試してみました!
開発者ガイドは以下になります。
データストリームの作成
まず、Kinesisのコンソールから、データストリームを作成します。データストリームの作成
を押します。
今回は何シャードかあった方が見やすそうだったので、オンデマンドで作成してみました。データストリーム名を入れて、ほかはデフォルトのまま進み、データストリームの作成
を押しました。
ステータスがアクティブになるまでしばらく待ちます。
データビューワーをみてみる
作成したデータストリームを開いて、データビューワータブを押すと、このような画面になりました。
シャードと開始位置を選んでレコードを取得する形のようです。
今はデータが入っていないのでなにも取得できないため、次にデータを入れていきます。
データストリームにデータを入れてみる
今回は以前の記事で作っておいたスクリプトでデータを投入しました。データを生成して、データストリーム名で指定したデータストリームに、データを送信するスクリプトです。
今回は内容にはこだわりませんが、以下のような実装になります。詳しくは上記ブログをご確認ください。
sample.py
import datetime
import json
import random
import time
import boto3
import pandas as pd
def get_random_data(trial_id, x):
""" 前の位置からランダムに移動し、送信用のデータと位置を返却する。
"""
data = {
"trial_id": trial_id,
"x": x,
"event_time": datetime.datetime.now().isoformat()
}
x += random.choice([-1,1])
return data, x
def send_data(stream_name, kinesis_client, trial_id, steps):
""" ランダムウォークを計算しつつ、Kinesisデータストリームにデータを送信する。
"""
# 初期位置と軌跡格納用のリストを設定しておく。
x = 0
trail = []
# 念のため、ステップ数を制限しておく。
if steps > 2000:
print(f"{steps} is too much.")
return
# シミュレーションを実行し、逐次でKinesisデータストリームにデータを送信する。
for i in range(steps):
# 新しい送信用データを取得する。
data, x = get_random_data(trial_id, x)
partition_key = str(data["trial_id"])
# データを送信する。
kinesis_client.put_record(
StreamName=stream_name,
Data=json.dumps(data),
PartitionKey=partition_key)
# 軌跡を記録し、少し間をあける。
trail.append(data)
time.sleep(0.05)
# 結果は答え合わせができるようにCSVに保存しておく。
df_trail = pd.DataFrame(trail)
df_trail.to_csv("./trail.csv", index=False)
return
def main():
# シミュレーションの設定をする。
trial_id = 1
steps = 100
# Kinesisデータストリームへの送信設定と準備をする。
stream_name = ${作成したデータストリーム名}
kinesis_client = boto3.client('kinesis')
print("Start sending data.")
# シミュレーションと送信を開始する。
send_data(stream_name, kinesis_client, trial_id, steps)
print("Successfully finished.")
if __name__ == '__main__':
main()
ハイライトした64行目を作成したデータストリームの名前に変えて、ローカルPCからスクリプトを実行しました。
python3 sample.py
# Start sending data.
# Successfully finished.
これでデータストリームにデータが入ったので、再度データビューワーに戻り確認してみます。
データビューワーから送信したデータを確認してみる
最後に送信したデータを確認してみました。
シャードを選び、タイムスタンプでデータストリームの作成直後の開始時刻を選び、レコードを取得すると、以下のようにデータが確認できました。
特に開始時刻はJSTで入力してしまって大丈夫でした。
個々のデータをクリックすると、中身を確認できました。rawデータとJSONの2パターンで確認できました。
データストリームの削除
検証したデータストリームが不要な場合は、忘れないうちに削除しておきました。
最後に
今回はKinesis Data Streamsのデータビューワーから、データストリームに送信したデータをコンソール上で確認してみました。UI上から簡単にデータが確認できるのでとても便利でした。この機能があればより誰でも気軽にKinesis Data Streamsを使っていけてとてもいいですね!