この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
ウィスキー、シガー、パイプをこよなく愛する大栗です。
8/10〜11に開催していたのAWS Summit New York 2016のKeynoteの中で、Kinesis Analyticsが利用可能になったという発表がありましたので試してみました。
Kinesis Analytics
Kinesis Analyticsはre:Invent 2015で発表されたストリームデータ処理サービスです。当時は来年に使用可能になるとの発表のみで詳細が不明でしたが、ようやく利用可能になりました。
Kinesis Analyticsはストリームデータに対してSQLで処理を行う機能です。Kinesis Stream + Lambdaでもストリーム処理は可能ですが、基本的には1レコード単位か1回に取り出したデータしかまとめて処理ができませんでした。Kinesis Analyticsでは、データの変換を行ったり、タイムウィンドウの操作も可能になっています。例えば1分毎の合計値を出したり1、過去1分間の最大値を取得する2こともできます。
なお、2016年8月12日現在で利用可能なリージョンは、以下の通りとなっています。
- アイルランド
- 北部バージニア
- オレゴン
Kinesis Analyticsを試す
ドキュメントのGetting Started Exerciseの内容を元に試してみます。
Getting Started Exerciseは以下の流れとなります。
- アプリケーションを作成する
- 入力を設定する
- リアルタイム分析を追加する
- アプリケーションコードを更新する
- 出力を設定する
アプリケーションを作成する
Kinesis Analyticsのアプリケーションを作成します。
Kinesis Analyticsの画面でCreate new application
をクリックします。
アプリケーション名と説明を入力します。ここではAnalytics-01
とします。
入力を設定する
Connect to a source
をクリックします。
Configure a new stream
を選択して、Create demo stream
をクリックします。ここではデモ用データを使用するためCreate demo stream
を選択しています。入力で新規のKinesis Firehoseや新規のKinesis Streamを作成する場合は各々のボタンをクリックします。既存のStreamを使用する場合は不要ですので、ここを飛ばして下さい。
Configure a new stream
を選択するとデモ用Streamが作成されます。
デモ用Streamが作成されたら、Select a stream
を選択します。kinesis-analytics-demo-stream
を選択してSave and continue
をクリックします。既存のStreamを使用する場合は、ここで対象のStreamを選択して下さい。
なお、kinesis-analytics-demo-stream
のデータは以下のようなデータになっています。
{"TICKER_SYMBOL":"ALY","SECTOR":"ENERGY","CHANGE":1.49,"PRICE":19.74},
{"TICKER_SYMBOL":"MMB","SECTOR":"ENERGY","CHANGE":-0.73,"PRICE":22.96},
{"TICKER_SYMBOL":"BNM","SECTOR":"TECHNOLOGY","CHANGE":0.69,"PRICE":172.71},
{"TICKER_SYMBOL":"BAC","SECTOR":"FINANCIAL","CHANGE":-0.28,"PRICE":13.23}
リアルタイム分析を追加する
Go to SQL editor
をクリックします。
ポップアップが表示されるのでYes, start application
をクリックします。
Add SQL from templates
をクリックしてテンプレートからSQLを追加します。
Continuous filter
を選択してAdd this SQL to the editor
をクリックします。
SQLの中には以下の通りです。
このSQLはDESTINATION_SQL_STREAM
Streamを作成しています。
そしてSTREAM_PUMP
を作成して、SOURCE_SQL_STREAM_001
からSIMILARに"TECH"が含まれているデータについてticker_symbol, sector, change, priceをDESTINATION_SQL_STREAM
へ挿入しています。
-- ** Continuous Filter **
-- Performs a continuous filter based on a WHERE condition.
-- .----------. .----------. .----------.
-- | SOURCE | | INSERT | | DESTIN. |
-- Source-->| STREAM |-->| & SELECT |-->| STREAM |-->Destination
-- | | | (PUMP) | | |
-- '----------' '----------' '----------'
-- STREAM (in-application): a continuously updated entity that you can SELECT from and INSERT into like a TABLE
-- PUMP: an entity used to continuously 'SELECT ... FROM' a source STREAM, and INSERT SQL results into an output STREAM
-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), sector VARCHAR(12), change REAL, price REAL);
-- Create pump to insert into output
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Select all columns from source stream
SELECT STREAM ticker_symbol, sector, change, price
FROM "SOURCE_SQL_STREAM_001"
-- LIKE compares a string to a string pattern (_ matches all char, % matches substring)
-- SIMILAR TO compares string to a regex, may use ESCAPE
WHERE sector SIMILAR TO '%TECH%';
Reading from input stream
をクリックしてStreamデータを読み込みます。
Real-time analytics
タブを見るとDESTINATION_SQL_STREAM
が出力されます。
アプリケーションコードを更新する
次にアプリケーションコードを更新します。
以下のSQLを追加します。
これはDESTINATION_SQL_STREAM_2
Streamを作成しています。
そしてSTREAM_PUMP_2
を作成して、DESTINATION_SQL_STREAM
Streamからticker_symbol, change, priceをDESTINATION_SQL_STREAM_2
に挿入しています。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM_2"
(ticker_symbol VARCHAR(4),
change DOUBLE,
price DOUBLE);
CREATE OR REPLACE PUMP "STREAM_PUMP_2" AS
INSERT INTO "DESTINATION_SQL_STREAM_2"
SELECT STREAM ticker_symbol, change, price
FROM "DESTINATION_SQL_STREAM";
Save and run SQL
をクリックするとDESTINATION_SQL_STREAM_2
のデータが出力されます。
もししばらく待ってもデータが出力されない時には、Source data
タブでBegin populating stream with sample stock ticker data
をクリックして下さい。サンプルデータが流れ始めます。
さらにSQLを追加してみます。
出力用のStreamとしてAMZN_STREAM
とTGT_STREAM
を作成しています。
AMZN_PUMP
でSIMILARに"AMZN"が含まれるデータをAMZN_STREAM
へ挿入しています。
TGT_PUMP
でSIMILARに"TGT"が含まれるデータをTGT_STREAM
へ挿入しています。
CREATE OR REPLACE STREAM "AMZN_STREAM"
(ticker_symbol VARCHAR(4),
change DOUBLE,
price DOUBLE);
CREATE OR REPLACE PUMP "AMZN_PUMP" AS
INSERT INTO "AMZN_STREAM"
SELECT STREAM ticker_symbol, change, price
FROM "SOURCE_SQL_STREAM_001"
WHERE ticker_symbol SIMILAR TO '%AMZN%';
CREATE OR REPLACE STREAM "TGT_STREAM"
(ticker_symbol VARCHAR(4),
change DOUBLE,
price DOUBLE);
CREATE OR REPLACE PUMP "TGT_PUMP" AS
INSERT INTO "TGT_STREAM"
SELECT STREAM ticker_symbol, change, price
FROM "SOURCE_SQL_STREAM_001"
WHERE ticker_symbol SIMILAR TO '%TGT%';
Save and run SQL
をクリックするとAMZN_STREAM
とTGT_STREAM
のデータが出力されます。
出力を設定する
出力先を設定します。Connect to destination
をクリックします。
ここでは新規にKinesis Firehoseを作成してS3へ保存するためConfigure a new stream
を選択してGo to Kinesis Firehose
をクリックします。
既存のStreamを選択する場合はSelect a stream
を選択して対象のStreamを選んで下さい。
ここでKinesis Firehoseを構成しますが手順は割愛します。詳しくは、以下のエントリーを参照ください。
[新機能]Amazon Kinesis FirehoseでS3にデータを送ってみた #reinvent
Kinesis FirehoseでS3への出力を構成したら、Select a stream
を選択して作成したKinesis Firehoseを選択します。ここでは出力対象のStreamはDESTINATION_SQL_STREAM
としてSave and continue
をクリックします。
これでKinesis Analyticsの設定は完了です。
出力結果を確認する
Kinesis Firehoseで出力したS3のデータを確認します。
以下の様にS3へファイルが出力されます。
ファイルの内容を確認してみます。出力形式にJSONを指定したのでJSONで出力されています。
{"TICKER_SYMBOL":"CVB","SECTOR":"TECHNOLOGY","CHANGE":"-0.23","PRICE":"60.99"}
{"TICKER_SYMBOL":"JKL","SECTOR":"TECHNOLOGY","CHANGE":"-0.02","PRICE":"10.56"}
{"TICKER_SYMBOL":"AAPL","SECTOR":"TECHNOLOGY","CHANGE":"-0.02","PRICE":"122.87"}
{"TICKER_SYMBOL":"IOP","SECTOR":"TECHNOLOGY","CHANGE":"2.43","PRICE":"151.47"}
{"TICKER_SYMBOL":"JKL","SECTOR":"TECHNOLOGY","CHANGE":"-0.13","PRICE":"10.43"}
{"TICKER_SYMBOL":"AMZN","SECTOR":"TECHNOLOGY","CHANGE":"-9.71","PRICE":"601.34"}
{"TICKER_SYMBOL":"DFG","SECTOR":"TECHNOLOGY","CHANGE":"2.34","PRICE":"144.8"}
{"TICKER_SYMBOL":"AMZN","SECTOR":"TECHNOLOGY","CHANGE":"0.23","PRICE":"601.57"}
{"TICKER_SYMBOL":"HJK","SECTOR":"TECHNOLOGY","CHANGE":"0.09","PRICE":"5.85"}
{"TICKER_SYMBOL":"BNM","SECTOR":"TECHNOLOGY","CHANGE":"1.19","PRICE":"182.6"}
{"TICKER_SYMBOL":"AAPL","SECTOR":"TECHNOLOGY","CHANGE":"1.54","PRICE":"124.41"}
{"TICKER_SYMBOL":"JKL","SECTOR":"TECHNOLOGY","CHANGE":"-0.2","PRICE":"10.23"}
{"TICKER_SYMBOL":"JKL","SECTOR":"TECHNOLOGY","CHANGE":"0.08","PRICE":"10.31"}
{"TICKER_SYMBOL":"AMZN","SECTOR":"TECHNOLOGY","CHANGE":"0.16","PRICE":"601.73"}
{"TICKER_SYMBOL":"HJK","SECTOR":"TECHNOLOGY","CHANGE":"0.1","PRICE":"5.95"}
さいごに
Kinesis Firehoseで柔軟なストリーム処理が可能になります。特にSQLという一般的なデータ処理言語を採用しているので導入のハードルが低いのではないかと思います。処理したデータをKinesis Stream経由でElasticsearch Serviceへ出力することもコーディングなしでできるので、リアルタイムダッシュボードも簡単に実現できます。
ストリームデータは処理が面倒なものですが、随分簡単に処理できるようになったみたいです。