
Kinesis Data Analytics の「時間」と「ウィンドウクエリパターン」について理解する
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
先日より、Kinesis Data Analytics について学習をはじめています。
今回は、Kinesis Data Analytics のアプリケーション内データストリームで時系列データを扱うにあたって重要な「時間」と「ウィンドウクエリパターン」についての理解を深めていきます。
目次
アプリケーション内データストリームにおける「時間」
アプリケーション内データストリームでは、以下 3 タイプの時間に分類されます。正しく分析結果を得るためには、それぞれの時間についての意味を正しく理解しましょう。
 引用元:Amazon Kinesis Analytics によるストリーミングデータのリアルタイム分析
引用元:Amazon Kinesis Analytics によるストリーミングデータのリアルタイム分析
- イベント時間
- イベントが発生したタイムスタンプ(クライアント側の時間)
- 分析に利用する時間として望ましいが、クライアント側の時刻に依存するため不正確な場合がある
 
- 取り込み時間(収集時刻)
- ストリーミングソースに追加された時点のタイムスタンプ
- APPROXIMATE_ARRIVAL_TIMEフィールドが追加される
- 多くの場合でイベント発生時間にかなり近い
 
- 処理時間
- アプリケーション内ストリームに追加された時点のタイムスタンプ
- ROWTIME列で提供される
- アプリケーションが遅延している場合、イベント時間を正確に反映できなくなる可能性がある
- 経過時間に対しては正確だが、実際のイベント発生時間ではない場合がある
 
どの「時間」を使って分析する?
それぞれの時間でメリット、デメリットがありますので、公式ガイドでは「処理時間(ROWTIME)」と「イベント時間 or 取り込み時間」をあわせた 2 ウィンドウ戦略を推奨しています。
2 ウィンドウ戦略
- クエリ結果を発行する頻度を制御するために ROWTIMEを使う
- 論理時間(いつイベントが発生したか)には、「イベント時間 or 取り込み時間」を使う
この戦略のメリットは、イベントが発生した時間(論理時間)を利用できることです。もし Kinesis Analytics アプリケーションが遅延しても、論理時間を利用して適切に処理することが可能です。ROWTIME は処理の順序を保証するためにのみ利用します。(例えば取り込み時間が ROWTIME よりも早ければ、Kinesis Analytics アプリケーション内で遅延したことがわかります)
Kinesis Data Analytics で利用できるデモストリームの例で確認しましょう。ストリームデータは以下の形式で流れてきます。
{"TICKER_SYMBOL":"QXZ","SECTOR":"HEALTHCARE","CHANGE":-5.18,"PRICE":81.56},
{"TICKER_SYMBOL":"HJK","SECTOR":"TECHNOLOGY","CHANGE":-0.06,"PRICE":4.72},
{"TICKER_SYMBOL":"MMB","SECTOR":"ENERGY","CHANGE":0.72,"PRICE":22.07},
{"TICKER_SYMBOL":"IOP","SECTOR":"TECHNOLOGY","CHANGE":-1.3,"PRICE":117.61},
{"TICKER_SYMBOL":"WSB","SECTOR":"FINANCIAL","CHANGE":2.49,"PRICE":113.78},
以下は、1 分ごとのタンブリングウィンドウ(後述)で TICKER_SYMBOL をカウントする SQL アプリケーションです。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" 
    ("ingest_time"    timestamp,
    "APPROXIMATE_ARRIVAL_TIME" timestamp,
    "ticker_symbol"  VARCHAR(12), 
    "symbol_count"        integer);
                        
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
    INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "ingest_time",
        STEP("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME BY INTERVAL '60' SECOND) AS "APPROXIMATE_ARRIVAL_TIME",
        "TICKER_SYMBOL",
        COUNT(*) AS "symbol_count"
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY "TICKER_SYMBOL",
        STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND),
        STEP("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME BY INTERVAL '60' SECOND);
14-16行目の GROUP BY で TICKER_SYMBOL、 60 秒ごとのウィンドウの ROWTIME、および取り込み時間の APPROXIMATE_ARRIVAL_TIME でレコードをグループ化しています。
クエリの結果例は以下のとおりです。INGEST_TIME 17:05:00 のXYZ は、ROWTIME 17:05:00 と ROWTIME 17:06:00 に分かれてしまう場合があります。これらのデータを正しく扱うためには、ストリーミングデスティネーションでデータベースに出力するなど永続化を行って利用する必要があります。
| ROWTIME | INGEST_TIME | TICKER_SYMBOL | SYMBOL_COUNT | 
|---|---|---|---|
| 2016-07-19 17:05:00.0 | 2016-07-19 17:05:00.0 | ABC | 10 | 
| 2016-07-19 17:05:00.0 | 2016-07-19 17:05:00.0 | DEF | 15 | 
| 2016-07-19 17:05:00.0 | 2016-07-19 17:05:00.0 | XYZ | 6 | 
| 2016-07-19 17:06:00.0 | 2016-07-19 17:06:00.0 | ABC | 11 | 
| 2016-07-19 17:06:00.0 | 2016-07-19 17:06:00.0 | DEF | 11 | 
| 2016-07-19 17:06:00.0 | 2016-07-19 17:05:00.0 | XYZ | 1 | 
STEP()
先のクエリで 15-16 行目で利用している STEP 関数は、与えられたタイムスタンプまたは整数を、指定したインターバル値の倍数に切り捨てます。
例えば、STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) は、ROWTIME をもっとも近い 60 秒の倍数で切り捨てになります。
指定可能なインターバル値は 「年/月/日/時/分/秒」 です。
| 関数 | 結果 | 
|---|---|
| STEP(CAST('2004-09-30 13:48:23' as TIMESTAMP) BY INTERVAL '10' SECOND) | '2004-09-30 13:48:20’ | 
| STEP(CAST('2004-09-30 13:48:23' as TIMESTAMP) BY INTERVAL '2' HOUR) | '2004-09-30 12:00:00’ | 
| STEP(CAST('2004-09-30 13:48:23' as TIMESTAMP) BY INTERVAL '5' MINUTE) | '2004-09-30 13:45:00’ | 
| STEP(CAST('2004-09-27 13:48:23' as TIMESTAMP) BY INTERVAL '5' DAY) | '2004-09-25 00:00:00.0’ | 
| STEP(CAST('2004-09-30 13:48:23' as TIMESTAMP) BY INTERVAL '1' YEAR) | '2004-01-01 00:00:00.0’ | 
3つのウィンドウクエリ・パターン
ストリーミングデータのように常時更新されている入力から結果セットを得るために、時間と行の条件で定義される「ウィンドウ」を使用してクエリをバインドする場合が多くあります。これをウィンドウクエリと呼びます。
Kinesis Data Analytics では以下、3 パターンのウィンドウタイプをサポートしています。
- タンブリングウィンドウ
- Stagger Windows(ずらしウィンドウ)
- スライディングウィンドウ
タンブリングウィンドウ
タンブリングウィンドウの特徴:
- ウィンドウクエリが各ウィンドウに重複しない(タンブリング)方式
- ストリーム内の各レコードは特定のウィンドウに属する
 
- ウィンドウで指定した時間ごとに結果を 1 回出力する
- GROUP BY句を使う
 引用元:Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド
引用元:Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド
以下は先のデモストリームに対するタンブリングウィンドウのサンプルクエリです。GROUP BY によって、60 秒のウィンドウで Ticker_Symbol と ROWTIME でグループ化し、ウィンドウ期間の各ティッカーシンボルの最大値と最小値を表示します。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
                                   ticker_symbol VARCHAR(4), 
                                   Min_Price     DOUBLE, 
                                   Max_Price     DOUBLE);
-- CREATE OR REPLACE PUMP to insert into output
CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
  INSERT INTO "DESTINATION_SQL_STREAM" 
    SELECT STREAM Ticker_Symbol,
                  MIN(Price) AS Min_Price,
                  MAX(Price) AS Max_Price
    FROM    "SOURCE_SQL_STREAM_001"
    GROUP BY Ticker_Symbol, 
             STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
出力例は以下のとおりです。
Stagger Windows(ずらしウィンドウ)
Stagger Windows の特徴:
- 一貫性のない時間に届くデータの分析に適している
- 例えば VPC フローログのようにキャプチャしてから届くまでに時間差のあるデータ
 
- 固定の時間間隔ではウィンドウは開かない
- ウィンドウはイベントが届いた時点から経過時間を測定
 
- ウィンドウで指定した時間ごとに結果を 1 回出力する
- WINDOWED BY句を使う
なぜ、ずらすのか
Stagger Windows は翻訳すると「ずらしウィンドウ」と言われます。何を目的に「ずらし」を行うのか順を追って説明していきます。
まず、以下のようなイベントがあったとします。
| ROWTIME | EVENT_TIME | TICKER_SYMBOL | 
|---|---|---|
| 11:00:20 | 11:00:10 | AMZN | 
| 11:00:30 | 11:00:20 | AMZN | 
| 11:01:05 | 11:00:55 | AMZN | 
| 11:01:15 | 11:01:05 | AMZN | 
図にすると、以下のとおりです。◇ はイベントの起きた時点(EVENT_TIME)を表しています。縦の破線はイベントがアプリケーション内ストリームに追加された時点(ROWTIME)を表しています。
このストリームイベントを、まずは先程のタンブリングウィンドウで処理してみます。サンプルクエリは下記のとおり。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    TICKER_SYMBOL VARCHAR(4), 
    TICKER_COUNT     DOUBLE);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
  INSERT INTO "DESTINATION_SQL_STREAM" 
    SELECT STREAM 
        TICKER_SYMBOL,
        FLOOR(EVENT_TIME TO MINUTE),
        COUNT(TICKER_SYMBOL) AS TICKER_COUNT
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY ticker_symbol, FLOOR(EVENT_TIME TO MINUTE), STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE);
結果は以下のとおりです。
| ROWTIME | EVENT_TIME | TICKER_SYMBOL | Count | 
|---|---|---|---|
| 11:01:00 | 11:00:00 | AMZN | 2 | 
| 11:02:00 | 11:00:00 | AMZN | 1 | 
| 11:02:00 | 11:01:00 | AMZN | 1 | 
3番目の入力レコードは、1 番目のウィンドウと同じ EVENT_TIME を持ちますが、ROWTIME が異なるため別にグループ化されています。これらを正しく分析するには、例えば DynamoDB などの永続的なストアに集約してから利用することになるので、アプリケーションの複雑性や処理要件などの考慮が発生してきます。
こちらのサンプルスクリプトで作ったデモストリームを使って、実際の動作を確認します。
データは以下のよう形式で流れてきます。
{"EVENT_TIME": "2020-03-28T10:00:56.509021", "TICKER": "INTC"}
{"EVENT_TIME": "2020-03-28T10:00:56.509021", "TICKER": "INTC"}
{"EVENT_TIME": "2020-03-28T10:00:56.509021", "TICKER": "INTC"}
{"EVENT_TIME": "2020-03-28T10:01:56.895686", "TICKER": "MSFT"}
{"EVENT_TIME": "2020-03-28T10:01:56.895686", "TICKER": "MSFT"}
{"EVENT_TIME": "2020-03-28T10:01:56.895686", "TICKER": "MSFT"}
{"EVENT_TIME": "2020-03-28T10:02:57.272817", "TICKER": "AMZN"}
{"EVENT_TIME": "2020-03-28T10:02:57.272817", "TICKER": "AMZN"}
{"EVENT_TIME": "2020-03-28T10:02:57.272817", "TICKER": "AMZN"}
スキーマは以下のように設定しています。
下線部分の AAPL が EVENT_TIME は同じですが、ROWTIME が異なるため別のグループとしてカウントされていることが確認できました。
Stagger Windows(ずらしウィンドウ)の場合
Stagger Windows は、パーティションキーに一致する最初のイベントが届いた時点からウィンドウが開いて測定が始まり、指定された経過時間で閉じます。
先程と同じ以下のデータストリームソースを例に考えます。
| ROWTIME | EVENT_TIME | TICKER_SYMBOL | 
|---|---|---|
| 11:00:20 | 11:00:10 | AMZN | 
| 11:00:30 | 11:00:20 | AMZN | 
| 11:01:05 | 11:00:55 | AMZN | 
| 11:01:15 | 11:01:05 | AMZN | 
最初のレコード(ROWTIME 11:00:20)が追加されたときウィンドウが開始されます。1 分間のウィンドウなので 11:01:20 に終了します。結果セットは ROWTIME と EVENT_TIME に基づきのグループ化します。最後のレコードは ROWTIME 11:01:15 なので 11:01:20 までの WINDOW1 内に入っていますが、EVENT_TIME 11:01 であるため、WINDOW1の EVENT_TIME 11:00 に含まれません。
図にすると以下のようにになります。
サンプルクエリとしては、下記のようになります。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    ticker_symbol    VARCHAR(4), 
    event_time       TIMESTAMP,
    ticker_count     DOUBLE);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
  INSERT INTO "DESTINATION_SQL_STREAM" 
    SELECT STREAM 
        TICKER_SYMBOL,
        FLOOR(EVENT_TIME TO MINUTE),
        COUNT(TICKER_SYMBOL) AS ticker_count
    FROM "SOURCE_SQL_STREAM_001"
    WINDOWED BY STAGGER (
            PARTITION BY FLOOR(EVENT_TIME TO MINUTE), TICKER_SYMBOL RANGE INTERVAL '1' MINUTE);
そして結果は、以下のとおりです。
| ROWTIME | EVENT_TIME | TICKER_SYMBOL | Count | 
|---|---|---|---|
| 11:01:20 | 11:00:00 | AMZN | 3 | 
| 11:02:15 | 11:01:00 | AMZN | 1 | 
2番目の結果セットを見ると ROWTIME 11:02:15 となっていますので、11:01:15 に入ってきたイベントが起点に Window2 が開いたと考えられます。つまり、さきほど Window1 から外れた 4 番目の入力イベントです。図にすると以下のようになります。
Stagger Windows も実際の環境で実行してみました。
タンブリングウィンドウのように、EVENT_TIME が重複しているティッカーシンボルは見当たらりません。このように Stagger Windows は時系列データの分析精度を向上させるために有用であることが確認できました。
FLOOR()
FLOOR 関数は STEP 関数と似ており、いずれの方法でもレコードをウィンドウにグループ化することができます。ただし、FLOOR は時間値を時間単位 (1年、1日、1時間、1分 など) に丸めることのみできます。STEP は、値を任意の間隔 (たとえば 30 秒など) に丸めることができるため、STEP のほうがウィンドウの指定は柔軟性が高いといえます。
| 関数 | 結果 | 
|---|---|
| FLOOR(TIMESTAMP '2004-09-30 13:48:23' TO HOUR) | TIMESTAMP '2004-09-30 13:00:00' | 
| FLOOR(TIMESTAMP '2004-09-30 13:48:23' TO MINUTE) | TIMESTAMP '2004-09-30 13:48:00' | 
| FLOOR(TIMESTAMP '2004-09-30 13:48:23' TO DAY) | TIMESTAMP '2004-09-30 00:00:00.0' | 
| FLOOR(TIMESTAMP '2004-09-30 13:48:23' TO YEAR) | TIMESTAMP '2004-01-01 00:00:00.0' | 
スライディングウィンドウ
スライディングウィンドウの特徴:
- ウィンドウが時間とともにスライドする
- ウィンドウが重複する
- ストリーム内の各レコードは複数のウィンドウの一部になる
 
- アプリケーション内ストリームにイベントが追加される都度、結果が出力される
- WIDOW句を使う
 引用元:Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド
引用元:Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド
文字や図で考えるとウィンドウが時間とともにスライドして・・・と解ったような、解らないような気もしますが、直感的にストリームデータが流れてきた時点から直近(ウィンドウ期間分)の集計結果を返すという理解でよいかと思います。
以下は先のデモストリームに対するスライディングウィンドウのサンプルクエリです。1 分間のウィンドウのレコードを集計し、1 分間のウィンドウ内の最大値、最小値、平均値を出力します。WINDOW によって ticker_symbol をパーティションに、1 分間のウィンドウを指定しています。W1 の名前付き WINDOW は、OVER で参照します。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
                         ticker_symbol VARCHAR(10), 
                         Min_Price     double, 
                         Max_Price     double, 
                         Avg_Price     double);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
   INSERT INTO "DESTINATION_SQL_STREAM"
     SELECT STREAM ticker_symbol,
                   MIN(Price) OVER W1 AS Min_Price,
                   MAX(Price) OVER W1 AS Max_Price,
                   AVG(Price) OVER W1 AS Avg_Price
     FROM   "SOURCE_SQL_STREAM_001"
     WINDOW W1 AS (
        PARTITION BY ticker_symbol 
        RANGE INTERVAL '1' MINUTE PRECEDING);  
スライディングウィンドウはストリーム内にイベントが追加される都度、結果が出力されているのが判ります。(1つのティッカーシンボルの同じ ROWTIME で新しい結果が出力されています)
まとめ
- ストリーミング内の時間は 3 つ
- 「イベント時間」:クライアント側で実際にイベントが起きた時間
- 「取り込み時間(APPROXIMATE_ARRIVAL_TIME)」:ストリーミングソースに追加された時間
- 「処理時間(ROWTIME)」:アプリケーション内ストリームに追加された時間
 
- それぞれの時間でメリット、デメリットがあるので推奨は 2 ウィンドウ戦略
- 処理時間 + (イベント時間 or 取り込み時間)
 
- ウィンドウクエリパターンは 3 つ
- タンブリングウィンドウ
- ウィンドウが重複しないのでシンプル
- 正確な結果を得たい場合は、永続化ストアで再集計が必要
 
- Stagger Windows(ずらしウィンドウ)
- 一貫性のない時間に届くデータ分析に適している
- タンブリングウィンドウのウィンドウが分かれてしまう問題を解決
 
- スライディングウィンドウ
- ストリームデータが入ってきた時点からウィンドウ期間分の集計にむいてる(最新 5 分間のMAX, MIN ,AVE など)
- イベントの追加ごとに結果が出力される
 
 
- タンブリングウィンドウ
さいごに
ストリーミングデータを扱うにあたって「時間」といっても、どこの時点での時間を指しているのかは重要な要素ですので、違いについて理解しておきましょう。
また、ウィンドウクエリパターンの違いについても、どのようなストリームデータを扱うのか、どのようにストリーミング分析結果を得たいのかによって、適したクエリパターンの選択が必要となりますので、あわせて理解しておきましょう。
以上!大阪オフィスの丸毛(@marumo1981)でした!



















