この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
CX事業本部@大阪の岩田です。先日12/15付けのアップデートでKinesis Data StreamsとDynamoDB Streamsから起動するLambdaにタンブリングウィンドウの設定が可能になりました。
少し時間が空いてしまいましたが、このアップデートについてご紹介します。
概要
今回のアップデートにより、Lambdaに流れてきたストリーミングデータの状態をウインドウ単位で管理できるようになりました。Lambdaというサービスはステートレスが基本で、複数の関数呼び出しを跨いで何かしらの状態を共有するにはDynamoDBのような外部データストアを利用するのが基本でした。今後はLambdaのサービス基盤だけで各ウィンドウの状態が管理できます。
具体的なイメージとしては以下のような処理が実現可能になりました。
- ストリームに[1,2,3,4,5]というデータが投入される
- データがウィンドウに分割される
- ウィンドウA... [1,2,3]
- ウィンドウB... [4,5]
- 1回目のLambda起動
- ペイロードにウインドウAのステート(この時点では空)と生データの[1,2]が流れてくる
- ペイロードで受け取った数値を合計(1 + 2 = 3)し、ウィンドウAの集計結果として3を返す
- 2回目のLambda起動
- ペイロードにウインドウAのステートと生データの[3]が流れてくる
- ウィンドウAの集計結果(3)とペイロードで受け取った数値を合計(3 + 3 = 3)し、ウィンドウAの集計結果として6を返す
タンブリングウィンドウを利用する簡単な集計処理であれば、わざわざKinesis Data Analyticsを使わなくてもLambdaだけで実現可能になったというのが大きなメリットでしょう。
そもそもタンブリングウィンドウとは?
タンブリングウィンドウはストリーム処理における「ウィンドウ」の1種です。ウィンドウとはイベントデータを特定の条件で分割/グループ化したサブセットであり、タンブリングウィンドウは重複しないこと つまり、1つのイベントが複数のウィンドウのウィンドウに所属しないことが大きな特徴です。
AWSにおけるストリーミングデータ分析用のサービスKinesis Data Analyticsでは
- タンブリングウィンドウ
- Stagger Windows
- スライディングウィンドウ
3種類のウィンドウが利用可能です。
各ウィンドウの特徴や違いについては以下のブログを参照して下さい。
Kinesis Data Analytics の「時間」と「ウィンドウクエリパターン」について理解する
AzureのStream Analyticsではより多くのウィンドウ関数がサポートされているので、ストリーム処理全般に関する理解を深めるためには以下のドキュメントもオススメです。
タンブリングウィンドウの利用方法
タンブリングウィンドウを利用するためには、イベントソースマッピングの設定からタンブリングウィンドウの期間を指定します。
タンブリングウィンドウが有効化されたLambdaは、レスポンスとしてTimeWindowEventReponse
と呼ばれるオブジェクトを返却することで、同一ウィンドウに対する次回のLambda呼び出しに状態を引き継ぐことが可能です。TimeWindowEventReponse
は以下のような形式です。
{
"state": {
"1": 282,
"2": 715
},
"batchItemFailures": []
}
batchItemFailures
はチェックポイントを利用する場合に利用する項目なので、実質的にはstate
というキーに任意の構造でデータをセットして返却できると考えると良いでしょう。
タンブリングウィンドウには以下のような制限事項があります
- 設定可能な範囲は最大で900(15分)
- タンブリングウィンドウが有効化された場合、Lambdaのペイロードサイズ上限が6MBから5MBに減少する
- stateに保持できるデータは最大1MB
- 1MBを超過した場合、対象ウィンドウは早期終了されます
イベントデータの構造
Lambdaのイベントソースに設定したKinesis Data Streams/DynamoDB Streamsに対してタンブリングウインドウを有効化した場合、Lambdaのイベントデータは以下のような構造になります。
{
"Records":[
...略
],
"window": {
"start": "2020-07-30T17:00:00Z",
"end": "2020-07-30T17:05:00Z"
},
"state": {
"1": "state1"
},
"shardId": "shard123456789",
"eventSourceARN": "stream-ARN",
"isFinalInvokeForWindow": false,
"isWindowTerminatedEarly": false
}
Records
の部分は変更ありませんが、タンブリングウィンドウ有効化時の独自項目としていくつかの項目が増えています。
- window
- 対象タンブリングウィンドウの開始/終了日時がセットされます
- state
- 対象ウィンドウの状態を保持するオブジェクトです。対象ウィンドウに対する直前のLamba実行結果としてreturnした
TimeWindowEventReponse
の中身がここに設定されます。
- 対象ウィンドウの状態を保持するオブジェクトです。対象ウィンドウに対する直前のLamba実行結果としてreturnした
- shardId
- データ生成元のKinesis Data StreamsもしくはDynamoDB StreamsのシャードIDです
- eventSourceARN
- イベントソースとして設定されたKinesis Data StreamsもしくはDynamoDB StreamsのARNです
- isFinalInvokeForWindow
- Lambda Functionの呼び出しが対象ウィンドウ内での最後の呼び出しかを表すフラグです
- isWindowTerminatedEarly
- ウィンドウが早期終了したかを表すフラグです。stateのサイズが1Mを超えた場合、Lambdaはウィンドウを早期終了します。
これらの項目を活用してストリーミング処理を実装していくことになります。
やってみる
実際にタンブリングウインドウの設定を利用した簡易なストリーミング処理を試してみます。すでにAWSブログでサンプルが紹介されているので、以下ブログの内容に沿って進めていきます。
DynamoDBのテーブル作成
以下の2つのテーブルを作成します
- tumblingWindows
- 売上データを格納するためのテーブル
- パーティションキー:ID (文字列)
- DynamoDBストリームを有効化
- tumblingWindowsAggregation
- 売上の合計値をシャードID別に集計した結果を格納するためのテーブル
- パーティションキー:shardId (文字列)
- ソートキー:windowEnd (文字列)
Lambdaの作成
続いてtumblingWindowsテーブルのDynamoDBストリームから起動するLambdaを作成します。イベントソースマッピングで設定するタンブリングウィンドウの期間は30を指定します。
Lambdaのコードは以下通りです。ランタイムにはNode.js 12.xを利用しています。
const AWS = require('aws-sdk')
AWS.config.update({ region: process.env.AWS_REGION })
const docClient = new AWS.DynamoDB.DocumentClient()
const TableName = 'tumblingWindowsAggregation'
function isEmpty(obj) { return Object.keys(obj).length === 0 }
exports.handler = async (event) => {
// Save aggregation result in the final invocation
if (event.isFinalInvokeForWindow) {
console.log('Final: ', event)
const params = {
TableName,
Item: {
windowEnd: event.window.end,
windowStart: event.window.start,
sales: event.state.sales,
shardId: event.shardId
}
}
return await docClient.put(params).promise()
}
console.log(event)
// Create the state object on first invocation or use state passed in
let state = event.state
if (isEmpty (state)) {
state = {
sales: 0
}
}
console.log('Existing: ', state)
// Process records with custom aggregation logic
event.Records.map((item) => {
// Only processing INSERTs
if (item.eventName != "INSERT") return
// Add sales to total
let value = parseFloat(item.dynamodb.NewImage.sales.N)
console.log('Adding: ', value)
state.sales += value
})
// Return the state for the next invocation
console.log('Returning state: ', state)
return { state: state }
}
いくつかポイントを解説します。
まず
if (isEmpty (state)) {
state = {
sales: 0
}
}
の部分ですが、対象ウィンドウに対するLambdaの初回起動時はstateがセットされていないので、stateを初期化します。
次に
event.Records.map((item) => {
// Only processing INSERTs
if (item.eventName != "INSERT") return
// Add sales to total
let value = parseFloat(item.dynamodb.NewImage.sales.N)
console.log('Adding: ', value)
state.sales += value
})
の部分で対象シャードID、ウィンドウにおける売上データの合計値を加算していきます。
最後に
if (event.isFinalInvokeForWindow) {
console.log('Final: ', event)
const params = {
TableName,
Item: {
windowEnd: event.window.end,
windowStart: event.window.start,
sales: event.state.sales,
shardId: event.shardId
}
}
return await docClient.put(params).promise()
}
の部分で対象ウィンドウの処理完了を判定し、集計値をtumblingWindowsAggregation
テーブルにPutしています。
生データの投入
実際にデータを投入して動作確認します。ローカル環境から以下のコードを実行し、tumblingWindows
テーブルに売上データを投入します。こちらもNode.js 12で実行しています。
const AWS = require('aws-sdk')
//AWS.config.update({ region: 'us-east-1' })
AWS.config.update({ region: 'ap-northeast-1' })
const docClient = new AWS.DynamoDB.DocumentClient()
const TableName = 'tumblingWindows'
const ITERATIONS = 100
const SLEEP_MS = 100
let totalSales = 0
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
const createSales = async () => {
for (let i = 0; i < ITERATIONS; i++) {
let sales = Math.round (parseFloat(100 * Math.random()))
totalSales += sales
console.log ({i, sales, totalSales})
await docClient.put ({
TableName,
Item: {
ID: Date.now().toString(),
sales,
ITERATIONStamp: new Date().toString()
}
}).promise()
await sleep(SLEEP_MS)
}
}
const main = async() => {
await createSales()
console.log('Total Sales: ', totalSales)
}
main()
実行すると以下のように出力されます。
{ i: 0, sales: 11, totalSales: 11 }
{ i: 1, sales: 30, totalSales: 41 }
{ i: 2, sales: 22, totalSales: 63 }
{ i: 3, sales: 100, totalSales: 163 }
{ i: 4, sales: 74, totalSales: 237 }
{ i: 5, sales: 64, totalSales: 301 }
{ i: 6, sales: 78, totalSales: 379 }
{ i: 7, sales: 93, totalSales: 472 }
{ i: 8, sales: 77, totalSales: 549 }
{ i: 9, sales: 5, totalSales: 554 }
{ i: 10, sales: 43, totalSales: 597 }
...略
DynamoDBのtumblingWindows
テーブルを確認するとデータが投入されていることが分かります。
しばらく待ってからtumblingWindowsAggregation
テーブルを確認してみましょう。
まとめ
Kinesis Data StreamsとDynamoDB Streamsから起動するLambdaで新たに利用可能になったタンブリングウィンドウの設定についてご紹介しました。簡易な集計処理であればKinesis Data Analyticsを利用せずにLambdaだけで完結できるようになったのは非常に便利ですね。
注意点すべきポイントとしては
- ウィンドウはタイムスタンプだけでなくシャードIDごとにも分割される
- 先程の売上データの例であれば、
tumblingWindowsAggregation
のレコードに対して集計処理を実行して初めて売上データ全体の合計が取得できる
- 先程の売上データの例であれば、
- 対応しているウィンドウはタンブリングウィンドウのみ
といったところでしょうか。
もし簡易な集計処理のためにKinesis Data Analyticsを利用しているシステムがあれば、今回のアップデートに置き換えられないか検討してみてはいかがでしょうか?