DynamoDB StreamsとNorikraを使ったリアルタイム分析

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

先日のアップデートでDynamoDB Streamsが利用できるようになりました。
【AWS発表】DynamoDB アップデート – Triggers (Streams + Lambda) + Cross-Region Replication App
去年のre:inventで発表されて以来、心待ちにしていた機能です。

上記発表にあるように、

(DynamoDB)テーブルのStream機能を有効にすると、すべての変更 (puts, updates, deletes)が直近の24時間分保持されるようになり、ストリーム·レコードとしてほぼリアルタイムで利用できるようになります。

DynamoDB Streamsは、APIを利用して各レコードにアクセスできるだけでなく、AWS Lambdaのイベントソースとしても利用できます。
したがって、DynamoDBテーブルの更新をトリガーに、別のリソースの更新やSNSによる通知、ログの保存など、任意の処理を実行させることができます。
DynamoDB StreamsとAWS Lambdaの連携については下記の記事もご覧ください。
【新機能】Amazon DynamoDB Triggersを使ってDynamoDB StreamsとAWS Lambdaを連携する

今回は、DynamoDBテーブルで発生した変更をNorikraでリアルタイム分析する仕組みを作ってみました。
2014年12月の弊社イベント(re:Growth2014)で発表した内容と全く同じですが、マネジメントコンソール上での手順が変わっているので改めて紹介いたします。
発表で利用したスライドはこちらにあります。

概要

下記のような構成です。
DynamoDBで発生した更新情報をDynamoDB Streams経由でLambdaが受け取り、LambdaからHTTP POSTでNorikraに送ります。 arch

今回は、あるサービスのユーザ行動履歴がDynamoDBテーブルに保存されており、その更新データをNorikraで分析するというシナリオで説明を進めていきます。
DynamoDBのキーとしては

  • ハッシュキー: user_id
  • レンジキー: created_at

で必要に応じてフィールドをつけるという想定です。
例えば、あるユーザが新規登録し、ポイントを獲得し、退会した場合、下記のようなアイテムがDynamoDBテーブルに記録されます。

{
  "user_id": "test001",
  "created_at": 1437176049,
  "action": "register"
}
{
  "user_id": "test001",
  "created_at": 1437176149,
  "action": "get_point",
  "points": 200
}
{
  "user_id": "test001",
  "created_at": 1437176249,
  "action": "delete"

}

このユーザ行動履歴テーブルの変更イベントを使って、1分ごとの新規ユーザ数をNorikraでリアルタイムに計測してみます。

手順

DynamoDBテーブルの作成

DynamoDBテーブルの作成から始めます。
テーブルの名前は「user_history」としました。
dynamodb01

作成を進めるとDynamoDB Streamsの有効化画面が出てきますので、「Enable Stream」にチェックをつけ「View Type」で「New and Old Image」を選択します。
dynamodb02

確認画面はこのようになります。 dynamodb_confirm

Lambdaファンクションの作成

次にLambdaファンクションを作成します。

イベントソースにDynamoDB Streamsを利用したいのでblueprint「dynamodb-process-stream」を選択します。
lambda01

イベントソースに先ほど作成したDynamoDBテーブル「user_history」を選択します。
Lambdaが一度に処理できるイベント数(Batch Size)は100、DynamoDB Streamの読み込み位置(Start position)は最新のレコードから(Latest)としています。
lambda02

次にファンクションの内容を設定していきます。
今回利用したスクリプトはGithubにあげています。
ファンクション名は「DynamoDBStream-to-Norikra」、Handler名は「DynamoDBStream-to-Norikra.handler」です。
スクリプトはGithubからcloneしてきてzipに固めてアップロードします。

{yokotashinsuke}% git clone https://github.com/yokota-shinsuke/aws-lambda-dynamodbstream-to-norikra.git
(省略)
{yokotashinsuke}% cd aws-lambda-dynamodbstream-to-norikra
{yokotashinsuke}% vi DynamoDBStream-to-Norikra.js
(NorikraのURLを修正)
{yokotashinsuke}% npm install
(省略)
{yokotashinsuke}% zip -r DynamoDBStream-to-Norikra.zip DynamoDBStream-to-Norikra.js node_modules

スクリプト内でやっているのは

  • DynamoDBデータ型の変換
  • Norikraターゲットの作成
  • ターゲットへのイベント送信

です。
データ型の変換というのはDynamoDB Streamのイベント内にあるこのようなデータを

"NewImage": {
  "user_id": {
    "S": "test001"
  },
  "created_at": {
    "N": "1437175626"
  },
  "action": {
    "S": "register"
  }
}

このように変換することを指しています。

"NewImage": {
  "user_id": "test001",
  "created_at": 1437175626,
  "action": "register"
}

Norikraターゲット名はDynamoDBテーブル名になるようにしました。
lambda03

Norikraの構築

最後にNorikraを構築します。
Norikraのインストールや利用方法はドキュメントを参照ください。
Norikra
インストール方法はこちらの記事でも紹介しています。
Norikra+FluentdでDoS攻撃をブロックする仕組みを作ってみた

インストール、起動が正常に終了すると、
http://(Norikraのホスト名):26578/
でウェブUIにアクセスできます。
norikra

動作確認

これでDynamoDB Streamを流れるイベントをNorikraサーバで受け取れるようになりました。
マネジメントコンソールから次のようなアイテムを追加してみます。

{
  "user_id": "test001",
  "created_at": 1437172590,
  "action": "register"
}

dynamodb_test

CloudWatch LogsでLambdaの実行ログを確認できました。
cwlog_test

Norikra側でもターゲットが追加されていることを確認できます。
norikra_target

これでイベントが正常に届いていることが確認できたので、実際に分析用のクエリを登録してみます。
今回は、1分ごとのユーザ登録数を集計したいので、次のようなクエリを登録します。

SELECT COUNT(*)
  FROM user_history.win:time_batch(1 min)
  WHERE dynamodb.NewImage.action = 'register'

norikra_query

動作確認のため、時刻に応じた数のユーザを登録するスクリプトを実行してみます。
例えば、xx時15分なら15人、xx時16分には16人が登録されます。

require 'aws-sdk-core'
require 'aws-sdk-resources'

dynamodb = Aws::DynamoDB::Client.new(region: 'us-west-2')

prng = Random.new

while true do
  Time.now.min.times do
    dynamodb.put_item({
      table_name: "user_history",
      item: {
        user_id: "#{prng.rand}",
        created_at: Time.now.to_i,
        action: "register",
      }
    })
  end
  sleep(60)
end

このスクリプトをしばらくの間実行してから、NorikraのウェブUIでクエリの実行結果を見てみます。
norikra_result
[タイムスタンプ,{"カラム名": 値}]
という形式で出力されているので、次のような結果になってました。
期待通りです。
ユーザの登録時刻と集計時刻に1分のズレがあるため、「タイムスタンプ(分) - 1」人が新規ユーザ数となっています。

タイプスタンプ 日時変換 新規ユーザ数
1437175747 2015/7/17 23:29:07 28人
1437175807 2015/7/17 23:30:07 29人
1437175867 2015/7/17 23:31:07 30人
1437175927 2015/7/17 23:32:07 31人
1437175987 2015/7/17 23:33:07 32人

最後に

DynamoDBテーブルの更新内容をNorikraで分析する仕組みを作ってみました。
DynamoDBはスキーマレスなので自由にフィールドを追加できます。
Norikraもフィールドの変更に自動で対応してくれます。
なので、例えば、経路ごとのユーザ登録数を知りたいと思ったら、アプリケーション側で経路を記録するフィールドを追加し

{
  "user_id": "test001",
  "created_at": 1437172590,
  "action": "register",
  "entry_from": "ad01"
}

Norikra側でそれに対応する下記のようなクエリを追加すれば、すぐに集計することができます。

SELECT COUNT(*)
  FROM user_history.win:time_batch(1 min)
  WHERE dynamodb.NewImage.action = 'register'
  GROUP BY dynamodb.NewImage.entry_from