リアルタイム分析がやりたい!はじめての Kinesis Data Analytics

Kinesis Data Analytics はじめました。
2020.03.23

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

先日のストリーミングズンドコ記事が面白くて、Kinesis Data Analytics を触ってみたくなりました。

これまで Kinesis Data Analytics を使った経験はないので初心者目線でまとめてみたつもりです。同じように「これからはじめてみよう!」という方の参考にもなるかと思います。

Kinesis Data Analytics とは?

簡単に言ってしまうと、SQL や Java(Apache Flink) を使ってストリーミングデータに対してリアルタイム分析を行うことが出来るマネージドサービスです。

サービスとしては 2016年8月に GA になっていますが、東京リージョンで利用可能になったのは 2019年1月 ですので、まだ 1年ちょっとですね。

ユースケースとしては「ストリーミング ETL」「継続的メトリック生成」「リアルタイム応答分析」など、幅広いストリーミング処理に利用できます。

Kinesis Data Analytics の主要コンポーネントと用語

Kinesis Data Analytics の基本的な構成は下記のようになります。アプリケーションには SQL と Java がありますが、今回は SQL アプリケーションをベースにしています。

ストリーミングソース:

Kinesis Data Streams または、Kinesis Data Firehose のいずれか

アプリケーション:

ストリーミング分析のための一連の処理パイプライン。アプリケーション内部ストリーム(入力/出力/エラー)、アプリケーションコード、参照テーブルなどで構成

アプリケーション内部ストリーム:

ストリーミングソースからマッピングされたアプリケーション内だけのデータストリーム SQL を使用してクエリ実行できるテーブルのように機能する

ポンプ:

アプリケーション内ストリームから、別のアプリケーションストリームにデータを挿入するクエリの実行

ストリーミングデスティネーション:

Kinesis Data Streams、Kinesis Data Firehose、Lambda 関数に出力することで、アプリケーション内ストリームの書き込みを永続化

チュートリアルをやってみる

Kinesis Data Analytics のチュートリアル向けにデモストリームが用意されていますので、いまストリーミングデータの持ち合わせが無くても簡単に Kinesis Data Analytics を試せます。

それでは、やってみましょう!

ストリーミングソースの設定

Kinesis 管理コンソールの左メニューから [Data Analytics] を開き、Create application をクリック。[Application name] に任意の名前を入力し、[Runtime] は SQL を選択します。さいごに Create application をクリックして Kinesis Data Analytics アプリケーションを作成します。

作成したアプリケーションを開き、Connect streaming data をクリックします。Configuire a new stream を選択すると、Create a demo stream が表示されますので、これをクリックします。

この操作によって「デモ用ストリームの作成」「株のティッカーデータのストリーミング開始」「スキーマー定義」まで自動的に作成してくれます。例えばスキーマーは以下のようになります。

生データとしては以下のような JSON データが流れています。

{"TICKER_SYMBOL":"QXZ","SECTOR":"HEALTHCARE","CHANGE":-5.18,"PRICE":81.56},
{"TICKER_SYMBOL":"HJK","SECTOR":"TECHNOLOGY","CHANGE":-0.06,"PRICE":4.72},
{"TICKER_SYMBOL":"MMB","SECTOR":"ENERGY","CHANGE":0.72,"PRICE":22.07},
{"TICKER_SYMBOL":"IOP","SECTOR":"TECHNOLOGY","CHANGE":-1.3,"PRICE":117.61},
{"TICKER_SYMBOL":"WSB","SECTOR":"FINANCIAL","CHANGE":2.49,"PRICE":113.78},
{"TICKER_SYMBOL":"PJN","SECTOR":"RETAIL","CHANGE":1.56,"PRICE":34.36},
{"TICKER_SYMBOL":"WFC","SECTOR":"FINANCIAL","CHANGE":-0.01,"PRICE":46.78},
{"TICKER_SYMBOL":"XTC","SECTOR":"HEALTHCARE","CHANGE":7.16,"PRICE":114.24},
{"TICKER_SYMBOL":"PLM","SECTOR":"FINANCIAL","CHANGE":0.48,"PRICE":19.86},
{"TICKER_SYMBOL":"NFS","SECTOR":"ENERGY","CHANGE":0.87,"PRICE":101.82}

これを先程のスキーマを介することで、以下のような SQL クエリ可能なテーブルデータになり、アプリケーション内ストリームに流れます。

Save and coninue で次へ進みます

リアルタイム分析の設定

次にアプリケーションのコア部分でもある、アプリケーションコードの設定を行います。Go to SQL editor をクリック。

SQL アプリケーションを稼働することの確認メッセージが表示されるので、Yes, start application をクリック

エディタ部分に SQL アプリケーションコードを書くのですが、テンプレートを利用すると簡単に作成できます。Add SQL from templates を開き、いくつかのテンプレートが表示されます。今回は Continuous filter を選択し、Add this SQL to the edtor をクリック。

SQL をざっくり解説

-- ** Continuous Filter ** 
-- Performs a continuous filter based on a WHERE condition.
--          .----------.   .----------.   .----------.              
--          |  SOURCE  |   |  INSERT  |   |  DESTIN. |              
-- Source-->|  STREAM  |-->| & SELECT |-->|  STREAM  |-->Destination
--          |          |   |  (PUMP)  |   |          |              
--          '----------'   '----------'   '----------'               
-- STREAM (in-application): a continuously updated entity that you can SELECT from and INSERT into like a TABLE
-- PUMP: an entity used to continuously 'SELECT ... FROM' a source STREAM, and INSERT SQL results into an output STREAM
-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), sector VARCHAR(12), change REAL, price REAL);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Select all columns from source stream
SELECT STREAM ticker_symbol, sector, change, price
FROM "SOURCE_SQL_STREAM_001"
-- LIKE compares a string to a string pattern (_ matches all char, % matches substring)
-- SIMILAR TO compares string to a regex, may use ESCAPE
WHERE sector SIMILAR TO '%TECH%';

11 行目の CREATE OR REPLACE STREAM がアプリケーション内(出力)ストリームを作成している部分です。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), sector VARCHAR(12), change REAL, price REAL);

13 行目以降の CREATE OR REPLACE PUMP がアプリケーション内(入力)ストリームの SOURCE_SQL_STREAM_001 から、sectorTECH が含まれるものを吸い上げて、先述の DESTINATION_SQL_STREAM に流すためのポンプになります

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Select all columns from source stream
SELECT STREAM ticker_symbol, sector, change, price
FROM "SOURCE_SQL_STREAM_001"
-- LIKE compares a string to a string pattern (_ matches all char, % matches substring)
-- SIMILAR TO compares string to a regex, may use ESCAPE
WHERE sector SIMILAR TO '%TECH%';

SQL の内容を理解したところで、実際に SQL アプリケーションに反映するために Save and run SQL をクリックします。

SQL アプリケーションが設定されたことを確認するために、Real-time analytics タブを開きます。しばらくすると、先程の SQL アプリケーションコードのとおり sectorTECH が含まれるストリームのみが DESTINATION_SQL_STREAM へと次々に流れてくることが確認できます。

SQL アプリケーションコードの更新

SQL アプリケーションコードの更新はエディタにそのまま書き込みます。以下のように ticker_symbol, change, price のみを抽出し、新たなアプリケーション内ストリーム DESTINATION_SQL_STREAM_2 を作成します。

また STREAM_PUMP_2 のソースが DESTINATION_SQL_STREAM になっていることにも注目してください。

つまり SQL アプリケーションコード内で作成したアプリケーション内ストリームを、処理の中間ストリームとして利用できるということですね。

先程の SQL 文の最終行に、以下を追加して Save and run SQL をクリックします。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM_2" 
           (ticker_symbol VARCHAR(4), 
            change        DOUBLE, 
            price         DOUBLE);

CREATE OR REPLACE PUMP "STREAM_PUMP_2" AS 
   INSERT INTO "DESTINATION_SQL_STREAM_2"
      SELECT STREAM ticker_symbol, change, price 
      FROM   "DESTINATION_SQL_STREAM";

Real-time analytics タブを開くと、新たなアプリケーション内ストリーム DESTINATION_SQL_STREAM_2 が追加され、意図したデータのみが抽出されて流れてきていることが確認できます。

このデータには sector 列がありませんが、参照しているソースは WHERE sector SIMILAR TO '%TECH%’ で抽出したアプリケーション内ストリーム DESTINATION_SQL_STREAM になりますので、すべて sectorTECH が含まれるデータとなっています。

ストリーミングデスティネーション

このチュートリアルではデスティネーションの設定はありませんので、項目だけ確認してみます。

以下のとおり SQL アプリケーションの場合は Kinesis data stream, Kinesis Firehose delivery stream, AWS Lambda function のいずれかになります。In-application stream でアプリケーション内ストリーム名とタイプ(JSON | CSV)を選択する形式のようですね。

簡単ではありますが、以上です。

さいごに

Kinesis Data Analytics をはじめて触ってみた感想としては、「なんで、こんなにおもろいサービスを今まで触ってなかったんや・・・」です。

「あれに使えそう」「こんなことも出来るんちゃうん!?」 妄想が止まらない!!

一旦、さわってみると雰囲気がわかると思いますので、是非、チュートリアルを試してみてください。

以上!大阪オフィスの丸毛(@marumo1981)でした!