【新機能】Kinesis Analyticsが利用可能になりました!

2016.08.12

ウィスキー、シガー、パイプをこよなく愛する大栗です。

8/10〜11に開催していたのAWS Summit New York 2016のKeynoteの中で、Kinesis Analyticsが利用可能になったという発表がありましたので試してみました。

Kinesis Analytics

Kinesis Analyticsはre:Invent 2015で発表されたストリームデータ処理サービスです。当時は来年に使用可能になるとの発表のみで詳細が不明でしたが、ようやく利用可能になりました。

Kinesis Analyticsはストリームデータに対してSQLで処理を行う機能です。Kinesis Stream + Lambdaでもストリーム処理は可能ですが、基本的には1レコード単位か1回に取り出したデータしかまとめて処理ができませんでした。Kinesis Analyticsでは、データの変換を行ったり、タイムウィンドウの操作も可能になっています。例えば1分毎の合計値を出したり1、過去1分間の最大値を取得する2こともできます。

なお、2016年8月12日現在で利用可能なリージョンは、以下の通りとなっています。

  • アイルランド
  • 北部バージニア
  • オレゴン

Kinesis Analyticsを試す

ドキュメントのGetting Started Exerciseの内容を元に試してみます。
Getting Started Exerciseは以下の流れとなります。

  1. アプリケーションを作成する
  2. 入力を設定する
  3. リアルタイム分析を追加する
  4. アプリケーションコードを更新する
  5. 出力を設定する

アプリケーションを作成する

Kinesis Analyticsのアプリケーションを作成します。
Kinesis Analyticsの画面でCreate new applicationをクリックします。

Amazon_Kinesis_Analytics

アプリケーション名と説明を入力します。ここではAnalytics-01とします。

Amazon_Kinesis_Analytics

入力を設定する

Connect to a sourceをクリックします。

Amazon_Kinesis_Analytics

Configure a new streamを選択して、Create demo streamをクリックします。ここではデモ用データを使用するためCreate demo streamを選択しています。入力で新規のKinesis Firehoseや新規のKinesis Streamを作成する場合は各々のボタンをクリックします。既存のStreamを使用する場合は不要ですので、ここを飛ばして下さい。

Configure a new streamを選択するとデモ用Streamが作成されます。

Amazon_Kinesis_Analytics

デモ用Streamが作成されたら、Select a streamを選択します。kinesis-analytics-demo-streamを選択してSave and continueをクリックします。既存のStreamを使用する場合は、ここで対象のStreamを選択して下さい。
なお、kinesis-analytics-demo-streamのデータは以下のようなデータになっています。

{"TICKER_SYMBOL":"ALY","SECTOR":"ENERGY","CHANGE":1.49,"PRICE":19.74},
{"TICKER_SYMBOL":"MMB","SECTOR":"ENERGY","CHANGE":-0.73,"PRICE":22.96},
{"TICKER_SYMBOL":"BNM","SECTOR":"TECHNOLOGY","CHANGE":0.69,"PRICE":172.71},
{"TICKER_SYMBOL":"BAC","SECTOR":"FINANCIAL","CHANGE":-0.28,"PRICE":13.23}

Amazon_Kinesis_Analytics

リアルタイム分析を追加する

Go to SQL editorをクリックします。

Amazon_Kinesis_Analytics

ポップアップが表示されるのでYes, start applicationをクリックします。

Amazon_Kinesis_Analytics

Add SQL from templatesをクリックしてテンプレートからSQLを追加します。

Amazon_Kinesis_Analytics

Continuous filterを選択してAdd this SQL to the editorをクリックします。
SQLの中には以下の通りです。
このSQLはDESTINATION_SQL_STREAMStreamを作成しています。
そしてSTREAM_PUMPを作成して、SOURCE_SQL_STREAM_001からSIMILARに"TECH"が含まれているデータについてticker_symbol, sector, change, priceをDESTINATION_SQL_STREAMへ挿入しています。

-- ** Continuous Filter ** 
-- Performs a continuous filter based on a WHERE condition.
--          .----------.   .----------.   .----------.              
--          |  SOURCE  |   |  INSERT  |   |  DESTIN. |              
-- Source-->|  STREAM  |-->| & SELECT |-->|  STREAM  |-->Destination
--          |          |   |  (PUMP)  |   |          |              
--          '----------'   '----------'   '----------'               
-- STREAM (in-application): a continuously updated entity that you can SELECT from and INSERT into like a TABLE
-- PUMP: an entity used to continuously 'SELECT ... FROM' a source STREAM, and INSERT SQL results into an output STREAM
-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), sector VARCHAR(12), change REAL, price REAL);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Select all columns from source stream
SELECT STREAM ticker_symbol, sector, change, price
FROM "SOURCE_SQL_STREAM_001"
-- LIKE compares a string to a string pattern (_ matches all char, % matches substring)
-- SIMILAR TO compares string to a regex, may use ESCAPE
WHERE sector SIMILAR TO '%TECH%';

Amazon_Kinesis_Analytics

Reading from input streamをクリックしてStreamデータを読み込みます。

Amazon_Kinesis_Analytics

Real-time analyticsタブを見るとDESTINATION_SQL_STREAMが出力されます。

Amazon_Kinesis_Analytics

アプリケーションコードを更新する

次にアプリケーションコードを更新します。
以下のSQLを追加します。
これはDESTINATION_SQL_STREAM_2Streamを作成しています。
そしてSTREAM_PUMP_2を作成して、DESTINATION_SQL_STREAMStreamからticker_symbol, change, priceをDESTINATION_SQL_STREAM_2に挿入しています。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM_2" 
           (ticker_symbol VARCHAR(4), 
            change        DOUBLE, 
            price         DOUBLE);

CREATE OR REPLACE PUMP "STREAM_PUMP_2" AS 
   INSERT INTO "DESTINATION_SQL_STREAM_2"
      SELECT STREAM ticker_symbol, change, price 
      FROM   "DESTINATION_SQL_STREAM";

Save and run SQLをクリックするとDESTINATION_SQL_STREAM_2のデータが出力されます。

Amazon_Kinesis_Analytics

もししばらく待ってもデータが出力されない時には、Source dataタブでBegin populating stream with sample stock ticker dataをクリックして下さい。サンプルデータが流れ始めます。

Amazon_Kinesis_Analytics

さらにSQLを追加してみます。
出力用のStreamとしてAMZN_STREAMTGT_STREAMを作成しています。
AMZN_PUMPでSIMILARに"AMZN"が含まれるデータをAMZN_STREAMへ挿入しています。
TGT_PUMPでSIMILARに"TGT"が含まれるデータをTGT_STREAMへ挿入しています。

CREATE OR REPLACE STREAM "AMZN_STREAM" 
           (ticker_symbol VARCHAR(4), 
            change        DOUBLE, 
            price         DOUBLE);

CREATE OR REPLACE PUMP "AMZN_PUMP" AS 
   INSERT INTO "AMZN_STREAM"
      SELECT STREAM ticker_symbol, change, price 
      FROM   "SOURCE_SQL_STREAM_001"
      WHERE  ticker_symbol SIMILAR TO '%AMZN%';

CREATE OR REPLACE STREAM "TGT_STREAM" 
           (ticker_symbol VARCHAR(4), 
            change        DOUBLE, 
            price         DOUBLE);

CREATE OR REPLACE PUMP "TGT_PUMP" AS 
   INSERT INTO "TGT_STREAM"
      SELECT STREAM ticker_symbol, change, price 
      FROM   "SOURCE_SQL_STREAM_001"
      WHERE  ticker_symbol SIMILAR TO '%TGT%';

Save and run SQLをクリックするとAMZN_STREAMTGT_STREAMのデータが出力されます。

Amazon_Kinesis_Analytics

出力を設定する

出力先を設定します。Connect to destinationをクリックします。

Amazon_Kinesis_Analytics

ここでは新規にKinesis Firehoseを作成してS3へ保存するためConfigure a new streamを選択してGo to Kinesis Firehoseをクリックします。
既存のStreamを選択する場合はSelect a streamを選択して対象のStreamを選んで下さい。

Amazon_Kinesis_Analytics

ここでKinesis Firehoseを構成しますが手順は割愛します。詳しくは、以下のエントリーを参照ください。

[新機能]Amazon Kinesis FirehoseでS3にデータを送ってみた #reinvent

Amazon_Kinesis_Firehose

Kinesis FirehoseでS3への出力を構成したら、Select a streamを選択して作成したKinesis Firehoseを選択します。ここでは出力対象のStreamはDESTINATION_SQL_STREAMとしてSave and continueをクリックします。

Amazon_Kinesis_Analytics

これでKinesis Analyticsの設定は完了です。

Amazon_Kinesis_Analytics

出力結果を確認する

Kinesis Firehoseで出力したS3のデータを確認します。
以下の様にS3へファイルが出力されます。

S3_Management_Console

ファイルの内容を確認してみます。出力形式にJSONを指定したのでJSONで出力されています。

{"TICKER_SYMBOL":"CVB","SECTOR":"TECHNOLOGY","CHANGE":"-0.23","PRICE":"60.99"}
{"TICKER_SYMBOL":"JKL","SECTOR":"TECHNOLOGY","CHANGE":"-0.02","PRICE":"10.56"}
{"TICKER_SYMBOL":"AAPL","SECTOR":"TECHNOLOGY","CHANGE":"-0.02","PRICE":"122.87"}
{"TICKER_SYMBOL":"IOP","SECTOR":"TECHNOLOGY","CHANGE":"2.43","PRICE":"151.47"}
{"TICKER_SYMBOL":"JKL","SECTOR":"TECHNOLOGY","CHANGE":"-0.13","PRICE":"10.43"}
{"TICKER_SYMBOL":"AMZN","SECTOR":"TECHNOLOGY","CHANGE":"-9.71","PRICE":"601.34"}
{"TICKER_SYMBOL":"DFG","SECTOR":"TECHNOLOGY","CHANGE":"2.34","PRICE":"144.8"}
{"TICKER_SYMBOL":"AMZN","SECTOR":"TECHNOLOGY","CHANGE":"0.23","PRICE":"601.57"}
{"TICKER_SYMBOL":"HJK","SECTOR":"TECHNOLOGY","CHANGE":"0.09","PRICE":"5.85"}
{"TICKER_SYMBOL":"BNM","SECTOR":"TECHNOLOGY","CHANGE":"1.19","PRICE":"182.6"}
{"TICKER_SYMBOL":"AAPL","SECTOR":"TECHNOLOGY","CHANGE":"1.54","PRICE":"124.41"}
{"TICKER_SYMBOL":"JKL","SECTOR":"TECHNOLOGY","CHANGE":"-0.2","PRICE":"10.23"}
{"TICKER_SYMBOL":"JKL","SECTOR":"TECHNOLOGY","CHANGE":"0.08","PRICE":"10.31"}
{"TICKER_SYMBOL":"AMZN","SECTOR":"TECHNOLOGY","CHANGE":"0.16","PRICE":"601.73"}
{"TICKER_SYMBOL":"HJK","SECTOR":"TECHNOLOGY","CHANGE":"0.1","PRICE":"5.95"}

さいごに

Kinesis Firehoseで柔軟なストリーム処理が可能になります。特にSQLという一般的なデータ処理言語を採用しているので導入のハードルが低いのではないかと思います。処理したデータをKinesis Stream経由でElasticsearch Serviceへ出力することもコーディングなしでできるので、リアルタイムダッシュボードも簡単に実現できます。
ストリームデータは処理が面倒なものですが、随分簡単に処理できるようになったみたいです。