Kinesis Data Analytics不要?Kinesis Data StreamsとDynamoDB Streamsから起動するLambdaでタンブリングウィンドウが利用可能になりました #reinvent

ストリーミングデータに対する簡易な集計処理がLambda単体で実現できるようになりました
2020.12.27

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

CX事業本部@大阪の岩田です。先日12/15付けのアップデートでKinesis Data StreamsとDynamoDB Streamsから起動するLambdaにタンブリングウィンドウの設定が可能になりました。

少し時間が空いてしまいましたが、このアップデートについてご紹介します。

概要

今回のアップデートにより、Lambdaに流れてきたストリーミングデータの状態をウインドウ単位で管理できるようになりました。Lambdaというサービスはステートレスが基本で、複数の関数呼び出しを跨いで何かしらの状態を共有するにはDynamoDBのような外部データストアを利用するのが基本でした。今後はLambdaのサービス基盤だけで各ウィンドウの状態が管理できます。

具体的なイメージとしては以下のような処理が実現可能になりました。

  • ストリームに[1,2,3,4,5]というデータが投入される
    • データがウィンドウに分割される
    • ウィンドウA... [1,2,3]
    • ウィンドウB... [4,5]
  • 1回目のLambda起動
    • ペイロードにウインドウAのステート(この時点では空)と生データの[1,2]が流れてくる
    • ペイロードで受け取った数値を合計(1 + 2 = 3)し、ウィンドウAの集計結果として3を返す
  • 2回目のLambda起動
    • ペイロードにウインドウAのステートと生データの[3]が流れてくる
    • ウィンドウAの集計結果(3)とペイロードで受け取った数値を合計(3 + 3 = 3)し、ウィンドウAの集計結果として6を返す

タンブリングウィンドウを利用する簡単な集計処理であれば、わざわざKinesis Data Analyticsを使わなくてもLambdaだけで実現可能になったというのが大きなメリットでしょう。

そもそもタンブリングウィンドウとは?

タンブリングウィンドウはストリーム処理における「ウィンドウ」の1種です。ウィンドウとはイベントデータを特定の条件で分割/グループ化したサブセットであり、タンブリングウィンドウは重複しないこと つまり、1つのイベントが複数のウィンドウのウィンドウに所属しないことが大きな特徴です。

AWSにおけるストリーミングデータ分析用のサービスKinesis Data Analyticsでは

  • タンブリングウィンドウ
  • Stagger Windows
  • スライディングウィンドウ

3種類のウィンドウが利用可能です。

各ウィンドウの特徴や違いについては以下のブログを参照して下さい。

Kinesis Data Analytics の「時間」と「ウィンドウクエリパターン」について理解する

AzureのStream Analyticsではより多くのウィンドウ関数がサポートされているので、ストリーム処理全般に関する理解を深めるためには以下のドキュメントもオススメです。

Stream Analytics ウィンドウ関数の概要

タンブリングウィンドウの利用方法

タンブリングウィンドウを利用するためには、イベントソースマッピングの設定からタンブリングウィンドウの期間を指定します。

タンブリングウィンドウが有効化されたLambdaは、レスポンスとしてTimeWindowEventReponseと呼ばれるオブジェクトを返却することで、同一ウィンドウに対する次回のLambda呼び出しに状態を引き継ぐことが可能です。TimeWindowEventReponseは以下のような形式です。

{
    "state": {
        "1": 282,
        "2": 715
    },
    "batchItemFailures": []
}

batchItemFailuresはチェックポイントを利用する場合に利用する項目なので、実質的にはstateというキーに任意の構造でデータをセットして返却できると考えると良いでしょう。

タンブリングウィンドウには以下のような制限事項があります

  • 設定可能な範囲は最大で900(15分)
  • タンブリングウィンドウが有効化された場合、Lambdaのペイロードサイズ上限が6MBから5MBに減少する
  • stateに保持できるデータは最大1MB
    • 1MBを超過した場合、対象ウィンドウは早期終了されます

イベントデータの構造

Lambdaのイベントソースに設定したKinesis Data Streams/DynamoDB Streamsに対してタンブリングウインドウを有効化した場合、Lambdaのイベントデータは以下のような構造になります。

{
   "Records":[
		...略
   ],
    "window": {
        "start": "2020-07-30T17:00:00Z",
        "end": "2020-07-30T17:05:00Z"
    },
    "state": {
        "1": "state1"
    },
    "shardId": "shard123456789",
    "eventSourceARN": "stream-ARN",
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}

Recordsの部分は変更ありませんが、タンブリングウィンドウ有効化時の独自項目としていくつかの項目が増えています。

  • window
    • 対象タンブリングウィンドウの開始/終了日時がセットされます
  • state
    • 対象ウィンドウの状態を保持するオブジェクトです。対象ウィンドウに対する直前のLamba実行結果としてreturnしたTimeWindowEventReponseの中身がここに設定されます。
  • shardId
    • データ生成元のKinesis Data StreamsもしくはDynamoDB StreamsのシャードIDです
  • eventSourceARN
    • イベントソースとして設定されたKinesis Data StreamsもしくはDynamoDB StreamsのARNです
  • isFinalInvokeForWindow
    • Lambda Functionの呼び出しが対象ウィンドウ内での最後の呼び出しかを表すフラグです
  • isWindowTerminatedEarly
    • ウィンドウが早期終了したかを表すフラグです。stateのサイズが1Mを超えた場合、Lambdaはウィンドウを早期終了します。

これらの項目を活用してストリーミング処理を実装していくことになります。

やってみる

実際にタンブリングウインドウの設定を利用した簡易なストリーミング処理を試してみます。すでにAWSブログでサンプルが紹介されているので、以下ブログの内容に沿って進めていきます。

DynamoDBのテーブル作成

以下の2つのテーブルを作成します

  • tumblingWindows
    • 売上データを格納するためのテーブル
    • パーティションキー:ID (文字列)
    • DynamoDBストリームを有効化
  • tumblingWindowsAggregation
    • 売上の合計値をシャードID別に集計した結果を格納するためのテーブル
    • パーティションキー:shardId (文字列)
    • ソートキー:windowEnd (文字列)

Lambdaの作成

続いてtumblingWindowsテーブルのDynamoDBストリームから起動するLambdaを作成します。イベントソースマッピングで設定するタンブリングウィンドウの期間は30を指定します。

Lambdaのコードは以下通りです。ランタイムにはNode.js 12.xを利用しています。

const AWS = require('aws-sdk')
AWS.config.update({ region: process.env.AWS_REGION })
const docClient = new AWS.DynamoDB.DocumentClient()
const TableName = 'tumblingWindowsAggregation'

function isEmpty(obj) { return Object.keys(obj).length === 0 }

exports.handler = async (event) => {
    // Save aggregation result in the final invocation
    if (event.isFinalInvokeForWindow) {
        console.log('Final: ', event)
        
        const params = {
          TableName,
          Item: {
            windowEnd: event.window.end,
            windowStart: event.window.start,
            sales: event.state.sales,
            shardId: event.shardId
          }
        }
        return await docClient.put(params).promise()
    }
    console.log(event)
    
    // Create the state object on first invocation or use state passed in
    let state = event.state

    if (isEmpty (state)) {
        state = {
            sales: 0
        }
    }
    console.log('Existing: ', state)

    // Process records with custom aggregation logic

    event.Records.map((item) => {
        // Only processing INSERTs
        if (item.eventName != "INSERT") return
        
        // Add sales to total
        let value = parseFloat(item.dynamodb.NewImage.sales.N)
        console.log('Adding: ', value)
        state.sales += value
    })

    // Return the state for the next invocation
    console.log('Returning state: ', state)
    return { state: state }
}

いくつかポイントを解説します。

まず

    if (isEmpty (state)) {
        state = {
            sales: 0
        }
    }

の部分ですが、対象ウィンドウに対するLambdaの初回起動時はstateがセットされていないので、stateを初期化します。

次に

    event.Records.map((item) => {
        // Only processing INSERTs
        if (item.eventName != "INSERT") return
        
        // Add sales to total
        let value = parseFloat(item.dynamodb.NewImage.sales.N)
        console.log('Adding: ', value)
        state.sales += value
    })

の部分で対象シャードID、ウィンドウにおける売上データの合計値を加算していきます。

最後に

    if (event.isFinalInvokeForWindow) {
        console.log('Final: ', event)
        
        const params = {
          TableName,
          Item: {
            windowEnd: event.window.end,
            windowStart: event.window.start,
            sales: event.state.sales,
            shardId: event.shardId
          }
        }
        return await docClient.put(params).promise()
    }

の部分で対象ウィンドウの処理完了を判定し、集計値をtumblingWindowsAggregationテーブルにPutしています。

生データの投入

実際にデータを投入して動作確認します。ローカル環境から以下のコードを実行し、tumblingWindowsテーブルに売上データを投入します。こちらもNode.js 12で実行しています。

const AWS = require('aws-sdk')
//AWS.config.update({ region: 'us-east-1' })
AWS.config.update({ region: 'ap-northeast-1' })
const docClient = new AWS.DynamoDB.DocumentClient()

const TableName = 'tumblingWindows'
const ITERATIONS = 100
const SLEEP_MS = 100

let totalSales = 0

function sleep(ms) { 
  return new Promise(resolve => setTimeout(resolve, ms));
}

const createSales = async () => {
  for (let i = 0; i < ITERATIONS; i++) {

    let sales = Math.round (parseFloat(100 * Math.random()))
    totalSales += sales
    console.log ({i, sales, totalSales})

    await docClient.put ({
      TableName,
      Item: {
        ID: Date.now().toString(),
        sales,
        ITERATIONStamp: new Date().toString()
      }
    }).promise()
    await sleep(SLEEP_MS)
  }
}

const main = async() => {
  await createSales()
  console.log('Total Sales: ', totalSales)
}

main()

実行すると以下のように出力されます。

{ i: 0, sales: 11, totalSales: 11 }
{ i: 1, sales: 30, totalSales: 41 }
{ i: 2, sales: 22, totalSales: 63 }
{ i: 3, sales: 100, totalSales: 163 }
{ i: 4, sales: 74, totalSales: 237 }
{ i: 5, sales: 64, totalSales: 301 }
{ i: 6, sales: 78, totalSales: 379 }
{ i: 7, sales: 93, totalSales: 472 }
{ i: 8, sales: 77, totalSales: 549 }
{ i: 9, sales: 5, totalSales: 554 }
{ i: 10, sales: 43, totalSales: 597 }
...略

DynamoDBのtumblingWindowsテーブルを確認するとデータが投入されていることが分かります。

しばらく待ってからtumblingWindowsAggregationテーブルを確認してみましょう。

このようにシャードID別の合計値が保存されているはずです。

まとめ

Kinesis Data StreamsとDynamoDB Streamsから起動するLambdaで新たに利用可能になったタンブリングウィンドウの設定についてご紹介しました。簡易な集計処理であればKinesis Data Analyticsを利用せずにLambdaだけで完結できるようになったのは非常に便利ですね。

注意点すべきポイントとしては

  • ウィンドウはタイムスタンプだけでなくシャードIDごとにも分割される
    • 先程の売上データの例であれば、tumblingWindowsAggregationのレコードに対して集計処理を実行して初めて売上データ全体の合計が取得できる
  • 対応しているウィンドウはタンブリングウィンドウのみ

といったところでしょうか。

もし簡易な集計処理のためにKinesis Data Analyticsを利用しているシステムがあれば、今回のアップデートに置き換えられないか検討してみてはいかがでしょうか?