Kinesis Data Streamsに流れてくるイベントをPersonalizeのEventTrackerに投げてみる

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Personalizeにはイベントを収集する機能があり、イベントデータが追加されることで、キャンペーンによるリアルタイムレコメンドに反映させることができます。また蓄積したイベントデータを使ったソリューションバージョン(レコメンドモデル)の作成も可能です。その際に使用するイベントはブラウザや外部システム等からKinesisに集められて、そこから必要な処理が施されるケースが多いかと思います。

今回は、Kinesis Data Streamsにイベントデータが送られてくるシステムを想定して、PersonalizeのEventTrackerへのイベントデータの送信を試してみました。EventTrackerによってPersonalizeに蓄積したイベントデータは後から参照することができないため、Kinesis Data Firehoseを使ったS3への保存も合わせて行いました。

構成図

今回試したシステムは次のような構成です。

スクリプト

Kinesisのストリームから取得したレコードからイベントデータを取り出し、PersonalizeのEventTrackerへ投げるだけのシンプルな処理です。ストリームの後続のLambda関数がこの処理を実行します。

import json
import boto3
import base64
import os

TRACKING_ID = os.environ['TRACKING_ID']

personalize_events = boto3.client('personalize-events')

def handler(event, _):
    for record in event['Records']:
        parsed_event = json.loads(
            base64.b64decode(record['kinesis']['data']))

        personalize_events.put_events(
            trackingId=TRACKING_ID,
            userId=str(parsed_event['userId']),
            sessionId=str(parsed_event['sessionId']),
            eventList=[{
                'sentAt': str(parsed_event['sentAt']),
                'eventType': parsed_event['eventType'],
                'properties': json.dumps({
                    'itemId': str(parsed_event['itemId'])
                })
            }]
        )

試してみる

PersonalizeでEventTrackerを作成し、Serverless Frameworkを使ってKinesis等のスタックをデプロイします。 その後、Kinesis Data Generatorを使って、Kinesisのストリームにデータを流して動きを確認してみます。 使用するスクリプトはGitHubで公開しています。

EventTrackerを作成する

まずはPersonalizeのEventTrackerを作成します。EventTrackerの作成にはデータセットグループとインタラクション用データセットが必要なので、先に作成します。 今の所、PersonalizeはCloudFormationが対応していないので、マネジメントコンソールで作業します。

まずはデータセットグループを作成します。

インタラクションタイプのデータセットも作成します。

データセットのスキーマはデフォルトのものを利用します。

データセットを作成すると、データをインポートする画面になりますが、今回は不要なのでキャンセルします。

ダッシュボード画面からEventTrackerを作成します。

名前を入力するだけで作成できます。

表示されたトラッキングIDはこの後使うため、控えておきます。

スタックを作成する

リポジトリをクローンしてきて、Serverless Frameworkを使ってデプロイします。デプロイの際には先ほど作成したEventTrackerのトラッキングIDを指定します。

git clone https://github.com/tandfy/sample-for-kinesis-to-personalize-event.git
cd sample-for-kinesis-to-personalize-event
yarn install
TRACKING_ID={トラッキングID} sls deploy

しばらくすると、スタックの作成が完了します。

イベントデータを流す

Kinesis Data Generatorを使ってイベントデータを流します。まずはKinesis Data Generatorを使えるようにするために必要なスタックを作成します。

Create a Cognito User with CloudFormation をクリックして、流れに沿って、スタックを作成します。

作成ができたら、スタックの出力にリンクが表示されているので、アクセスします。 リンク先はKinesis Data Generateで、URLにCognito Userに関する設定情報が入力されており、自動的に設定が施された状態になります。

スタック作成の中で設定したUsernamePasswordを入力してログインします。

リージョンとストリームを選択します。

秒間あたりのレコード数は10件で試します。レコードの圧縮はしません。

Record templateに次のようなイベント用のテンプレートデータを入力します。

{
    "sessionId": {{random.number(50)}},
    "userId": {{random.number(1000)}},
    "itemId": {{random.number(1000)}},
    "eventType": "{{random.arrayElement(
        ["CLICK","PAGE_VIEW"]
    )}}",
    "sentAt": {{random.number(
        {
            "min":1585200000,
            "max":1585300000
        }
    )}}
}

Send dataを押すと、イベントデータがKinesisのストリームに送られます。

ある程度イベントデータが送られたら、送信を停止します。

送信されたイベントの確認

ClowdWatchからPersonalizeのEventTrackerのメトリクスを確認できます。今回はPutEventsRequestsを見てみます。 今回は各イベントを1件ずつPutしているため、リクエスト数=イベント数となります。

200件のリクエストが成功していることを確認できました。

続いて、S3に保存されたイベントを確認してみます。 次のように日時ごとにイベントが保存されています。今回は短時間しかイベントを送っていないので、1つしかデータが保存されていません。

Kinesisに送信したイベントデータがJSON形式のため、S3に保存されたイベントデータは次のようなJSON Lines形式になっています。

さいごに

Kinesisのストリームに流れてきたイベントをPersonalizeに蓄積する流れを紹介しました。イベントをPutするためのLambda関数、S3にデータを保存するFirehoseというシンプルな構成ながら、Personalizeに蓄積したデータを使ってレコメンドを作成したり、S3に蓄積したデータを使って分析するなど幅広い活用が考えられます。

参考