【新機能】Kinesis Analyticsが利用可能になりました!
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
ウィスキー、シガー、パイプをこよなく愛する大栗です。
8/10〜11に開催していたのAWS Summit New York 2016のKeynoteの中で、Kinesis Analyticsが利用可能になったという発表がありましたので試してみました。
Kinesis Analytics
Kinesis Analyticsはre:Invent 2015で発表されたストリームデータ処理サービスです。当時は来年に使用可能になるとの発表のみで詳細が不明でしたが、ようやく利用可能になりました。
Kinesis Analyticsはストリームデータに対してSQLで処理を行う機能です。Kinesis Stream + Lambdaでもストリーム処理は可能ですが、基本的には1レコード単位か1回に取り出したデータしかまとめて処理ができませんでした。Kinesis Analyticsでは、データの変換を行ったり、タイムウィンドウの操作も可能になっています。例えば1分毎の合計値を出したり1、過去1分間の最大値を取得する2こともできます。
なお、2016年8月12日現在で利用可能なリージョンは、以下の通りとなっています。
- アイルランド
- 北部バージニア
- オレゴン
Kinesis Analyticsを試す
ドキュメントのGetting Started Exerciseの内容を元に試してみます。
Getting Started Exerciseは以下の流れとなります。
- アプリケーションを作成する
- 入力を設定する
- リアルタイム分析を追加する
- アプリケーションコードを更新する
- 出力を設定する
アプリケーションを作成する
Kinesis Analyticsのアプリケーションを作成します。
Kinesis Analyticsの画面でCreate new applicationをクリックします。

アプリケーション名と説明を入力します。ここではAnalytics-01とします。

入力を設定する
Connect to a sourceをクリックします。

Configure a new streamを選択して、Create demo streamをクリックします。ここではデモ用データを使用するためCreate demo streamを選択しています。入力で新規のKinesis Firehoseや新規のKinesis Streamを作成する場合は各々のボタンをクリックします。既存のStreamを使用する場合は不要ですので、ここを飛ばして下さい。
Configure a new streamを選択するとデモ用Streamが作成されます。

デモ用Streamが作成されたら、Select a streamを選択します。kinesis-analytics-demo-streamを選択してSave and continueをクリックします。既存のStreamを使用する場合は、ここで対象のStreamを選択して下さい。
なお、kinesis-analytics-demo-streamのデータは以下のようなデータになっています。
{"TICKER_SYMBOL":"ALY","SECTOR":"ENERGY","CHANGE":1.49,"PRICE":19.74},
{"TICKER_SYMBOL":"MMB","SECTOR":"ENERGY","CHANGE":-0.73,"PRICE":22.96},
{"TICKER_SYMBOL":"BNM","SECTOR":"TECHNOLOGY","CHANGE":0.69,"PRICE":172.71},
{"TICKER_SYMBOL":"BAC","SECTOR":"FINANCIAL","CHANGE":-0.28,"PRICE":13.23}
リアルタイム分析を追加する
Go to SQL editorをクリックします。

ポップアップが表示されるのでYes, start applicationをクリックします。

Add SQL from templatesをクリックしてテンプレートからSQLを追加します。

Continuous filterを選択してAdd this SQL to the editorをクリックします。
SQLの中には以下の通りです。
このSQLはDESTINATION_SQL_STREAMStreamを作成しています。
そしてSTREAM_PUMPを作成して、SOURCE_SQL_STREAM_001からSIMILARに"TECH"が含まれているデータについてticker_symbol, sector, change, priceをDESTINATION_SQL_STREAMへ挿入しています。
-- ** Continuous Filter ** -- Performs a continuous filter based on a WHERE condition. -- .----------. .----------. .----------. -- | SOURCE | | INSERT | | DESTIN. | -- Source-->| STREAM |-->| & SELECT |-->| STREAM |-->Destination -- | | | (PUMP) | | | -- '----------' '----------' '----------' -- STREAM (in-application): a continuously updated entity that you can SELECT from and INSERT into like a TABLE -- PUMP: an entity used to continuously 'SELECT ... FROM' a source STREAM, and INSERT SQL results into an output STREAM -- Create output stream, which can be used to send to a destination CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), sector VARCHAR(12), change REAL, price REAL); -- Create pump to insert into output CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" -- Select all columns from source stream SELECT STREAM ticker_symbol, sector, change, price FROM "SOURCE_SQL_STREAM_001" -- LIKE compares a string to a string pattern (_ matches all char, % matches substring) -- SIMILAR TO compares string to a regex, may use ESCAPE WHERE sector SIMILAR TO '%TECH%';

Reading from input streamをクリックしてStreamデータを読み込みます。

Real-time analyticsタブを見るとDESTINATION_SQL_STREAMが出力されます。

アプリケーションコードを更新する
次にアプリケーションコードを更新します。
以下のSQLを追加します。
これはDESTINATION_SQL_STREAM_2Streamを作成しています。
そしてSTREAM_PUMP_2を作成して、DESTINATION_SQL_STREAMStreamからticker_symbol, change, priceをDESTINATION_SQL_STREAM_2に挿入しています。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM_2"
(ticker_symbol VARCHAR(4),
change DOUBLE,
price DOUBLE);
CREATE OR REPLACE PUMP "STREAM_PUMP_2" AS
INSERT INTO "DESTINATION_SQL_STREAM_2"
SELECT STREAM ticker_symbol, change, price
FROM "DESTINATION_SQL_STREAM";
Save and run SQLをクリックするとDESTINATION_SQL_STREAM_2のデータが出力されます。

もししばらく待ってもデータが出力されない時には、Source dataタブでBegin populating stream with sample stock ticker dataをクリックして下さい。サンプルデータが流れ始めます。

さらにSQLを追加してみます。
出力用のStreamとしてAMZN_STREAMとTGT_STREAMを作成しています。
AMZN_PUMPでSIMILARに"AMZN"が含まれるデータをAMZN_STREAMへ挿入しています。
TGT_PUMPでSIMILARに"TGT"が含まれるデータをTGT_STREAMへ挿入しています。
CREATE OR REPLACE STREAM "AMZN_STREAM"
(ticker_symbol VARCHAR(4),
change DOUBLE,
price DOUBLE);
CREATE OR REPLACE PUMP "AMZN_PUMP" AS
INSERT INTO "AMZN_STREAM"
SELECT STREAM ticker_symbol, change, price
FROM "SOURCE_SQL_STREAM_001"
WHERE ticker_symbol SIMILAR TO '%AMZN%';
CREATE OR REPLACE STREAM "TGT_STREAM"
(ticker_symbol VARCHAR(4),
change DOUBLE,
price DOUBLE);
CREATE OR REPLACE PUMP "TGT_PUMP" AS
INSERT INTO "TGT_STREAM"
SELECT STREAM ticker_symbol, change, price
FROM "SOURCE_SQL_STREAM_001"
WHERE ticker_symbol SIMILAR TO '%TGT%';
Save and run SQLをクリックするとAMZN_STREAMとTGT_STREAMのデータが出力されます。

出力を設定する
出力先を設定します。Connect to destinationをクリックします。

ここでは新規にKinesis Firehoseを作成してS3へ保存するためConfigure a new streamを選択してGo to Kinesis Firehoseをクリックします。
既存のStreamを選択する場合はSelect a streamを選択して対象のStreamを選んで下さい。

ここでKinesis Firehoseを構成しますが手順は割愛します。詳しくは、以下のエントリーを参照ください。
[新機能]Amazon Kinesis FirehoseでS3にデータを送ってみた #reinvent

Kinesis FirehoseでS3への出力を構成したら、Select a streamを選択して作成したKinesis Firehoseを選択します。ここでは出力対象のStreamはDESTINATION_SQL_STREAMとしてSave and continueをクリックします。

これでKinesis Analyticsの設定は完了です。

出力結果を確認する
Kinesis Firehoseで出力したS3のデータを確認します。
以下の様にS3へファイルが出力されます。

ファイルの内容を確認してみます。出力形式にJSONを指定したのでJSONで出力されています。
{"TICKER_SYMBOL":"CVB","SECTOR":"TECHNOLOGY","CHANGE":"-0.23","PRICE":"60.99"}
{"TICKER_SYMBOL":"JKL","SECTOR":"TECHNOLOGY","CHANGE":"-0.02","PRICE":"10.56"}
{"TICKER_SYMBOL":"AAPL","SECTOR":"TECHNOLOGY","CHANGE":"-0.02","PRICE":"122.87"}
{"TICKER_SYMBOL":"IOP","SECTOR":"TECHNOLOGY","CHANGE":"2.43","PRICE":"151.47"}
{"TICKER_SYMBOL":"JKL","SECTOR":"TECHNOLOGY","CHANGE":"-0.13","PRICE":"10.43"}
{"TICKER_SYMBOL":"AMZN","SECTOR":"TECHNOLOGY","CHANGE":"-9.71","PRICE":"601.34"}
{"TICKER_SYMBOL":"DFG","SECTOR":"TECHNOLOGY","CHANGE":"2.34","PRICE":"144.8"}
{"TICKER_SYMBOL":"AMZN","SECTOR":"TECHNOLOGY","CHANGE":"0.23","PRICE":"601.57"}
{"TICKER_SYMBOL":"HJK","SECTOR":"TECHNOLOGY","CHANGE":"0.09","PRICE":"5.85"}
{"TICKER_SYMBOL":"BNM","SECTOR":"TECHNOLOGY","CHANGE":"1.19","PRICE":"182.6"}
{"TICKER_SYMBOL":"AAPL","SECTOR":"TECHNOLOGY","CHANGE":"1.54","PRICE":"124.41"}
{"TICKER_SYMBOL":"JKL","SECTOR":"TECHNOLOGY","CHANGE":"-0.2","PRICE":"10.23"}
{"TICKER_SYMBOL":"JKL","SECTOR":"TECHNOLOGY","CHANGE":"0.08","PRICE":"10.31"}
{"TICKER_SYMBOL":"AMZN","SECTOR":"TECHNOLOGY","CHANGE":"0.16","PRICE":"601.73"}
{"TICKER_SYMBOL":"HJK","SECTOR":"TECHNOLOGY","CHANGE":"0.1","PRICE":"5.95"}
さいごに
Kinesis Firehoseで柔軟なストリーム処理が可能になります。特にSQLという一般的なデータ処理言語を採用しているので導入のハードルが低いのではないかと思います。処理したデータをKinesis Stream経由でElasticsearch Serviceへ出力することもコーディングなしでできるので、リアルタイムダッシュボードも簡単に実現できます。
ストリームデータは処理が面倒なものですが、随分簡単に処理できるようになったみたいです。








