PersonalizeのバッチレコメンドジョブをBoto3を使って試してみた

PersonalizeのバッチレコメンドをBoto3を使って試してみました。
2020.02.20

Personalizeはレコメンドモデルをソリューションという形で作成し、ソリューションをキャンペーンという形でホスティングすることでリアルタイムレコメンドを利用できます。キャンペーンによるリアルタイムレコメンド以外にも、S3に配置したユーザID一覧ファイル(JSON Lines/JSONL形式)に対して、レコメンドをまとめて作成することができるバッチレコメンド機能もあります。

バッチレコメンドの主なユースケースとしては、定期ジョブとしてLambdaやワークフローシステムなどで動かしたり、レコメンド内容の検証や評価のためにノートブック等から動かすことが考えられます。 今回は主に検証時にスクリプトからバッチレコメンドを行うことを想定して、Boto3を使ったバッチレコメンドの一通りの流れを試してみたいと思います。

今回は作成済みのHRNN(USER_PERSONALIZATIONタイプ)のソリューションバージョンを利用します。Personalizeの諸概念やソリューションの作成等については以下のエントリをご参照ください。

やってみる

MovieLens 100K Datasetを利用して作成したソリューション(ソリューションバージョン)を用いて、複数のユーザIDに対してバッチレコメンドを行い、そのレコメンド内容を確認するという流れで進めていきます。

準備

まずは使用するモジュールやライブラリを読み込み、入出力ファイルの保存場所等を定めます。

import pandas as pd
import json
from os import path
import boto3
from datetime import datetime
import time

current_dt = datetime.now().strftime('%Y%m%d-%H%M%S')
bucket_name = '<バケット名>'
prefix = 'ml-100k'
user_ids_json_s3_path = f'personalize-work/{prefix}/user_ids.jsonl'

# 作成済みのソリューションバージョンのARN(レコメンドを作成するソリューションバージョン)
solution_version_arn = 'arn:aws:personalize:ap-northeast-1:123456789:solution/DEMO-solution/12321063'

# Personalizeがバッチレコメンド時に利用するIAMロール
role_arn = 'arn:aws:iam::123456789:role/service-role/AmazonPersonalize-ExecutionRole'

personalize = boto3.Session().client('personalize')
bucket = boto3.Session().resource('s3').Bucket(bucket_name)

※バッチレコメンドの際に入力/出力ファイルの保存場所として利用するバケットはs3:GetObjects3:PutObjects3:ListBucket操作をPersonalizeに許可してある必要があります。詳細については以下のドキュメントをご確認ください。

続いて、レコメンド情報の確認やレコメンド対象のユーザID取得のために、データ(MovieLens 100K Dataset)をダウンロードしてきます。

wget -N http://files.grouplens.org/datasets/movielens/ml-100k.zip
unzip -o ml-100k.zip

ユーザによる映画の評価データ(インタラクションデータ)と映画のメタデータ(アイテムデータ)をデータフレームに読み込みます。

# ユーザによる映画の評価データ
df = pd.read_csv('./ml-100k/u.data', sep='\t', names=['USER_ID', 'ITEM_ID', 'RATING', 'TIMESTAMP'])

# 映画データ
items = pd.read_csv('./ml-100k/u.item', sep='|', names=[
    'ITEM_ID', 'TITLE', 'RELEASE_DATE', 'VIDEO_RELEASE_DATE', 'IMDB_URL', 'UNKNOWN', 'ACTION', 'ADVENTURE', 'ANIMATION', "CHILDREN'S", 'COMEDY', 'CRIME', 'DOCUMENTARY', 'DRAMA', 'FANTASY', 'FILM-NOIR', 'HORROR', 'MUSICAL', 'MYSTERY', 'ROMANCE', 'SCI-FI', 'THRILLER', 'WAR', 'WESTERN'
], encoding='latin-1')
items.set_index('ITEM_ID', inplace=True)

# 映画のジャンルを分かりやすいように'|'で結合する
def extract_genre(row):
    return '|'.join([i for i, v in row[5:].items() if v == 1 ])
items['GENRE'] = items.apply(extract_genre, axis=1)
items = items[['TITLE', 'GENRE']]

インタラクションデータからユーザIDを取り出して、Personalizeのバッチレコメンドが対応しているJSONL形式に変換し、S3にアップロードします。

target_user_ids = list(df.USER_ID.unique())
user_ids = [json.dumps({'userId': str(user_id)}) for user_id in target_user_ids]
bucket.Object(user_ids_json_s3_path).put(Body='\n'.join(user_ids))

バッチレコメンド

バッチレコメンドジョブを実行します。s3DataDestinationで指定するpathはオブジェクトでないことを示すために、末尾に/をつける必要があります。バッチレコメンドによって作成されるレコメンドデータは<入力ファイル名>.outに保存されます。

user_ids_json_s3_uri = f's3://{bucket_name}/{user_ids_json_s3_path}'
response = personalize.create_batch_inference_job(
    jobName=f'{prefix}-{current_dt}',
    solutionVersionArn=solution_version_arn,
    numResults=100, # 作成するレコメンド数
    jobInput={
        's3DataSource': {
            'path': user_ids_json_s3_uri # 入力データの場所
        }
    },
    jobOutput={
        's3DataDestination': {
            'path': path.join(path.dirname(user_ids_json_s3_uri), '') # レコメンドデータの出力場所
        }
    },
    roleArn=role_arn
)
job_arn = response['batchInferenceJobArn']

ジョブが完了するまで待機します。

max_time = time.time() + 3600
while time.time() < max_time:
    response = personalize.describe_batch_inference_job(
        batchInferenceJobArn=job_arn
    )
    status = response["batchInferenceJob"]["status"]
    print("DatasetGroup: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

ジョブの実行時間はモデルや実行タイミング、対象ユーザ数などいろいろな要因によって変動するものだと思います。参考までに、今回の943ユーザに対してそれぞれ100個のレコメンドを作成する場合だと、30分弱程度かかりました。

レコメンド内容の確認

S3に保存されたレコメンドデータをダウンロードし、扱いやすいように変換します。

def transform_recommendation(dic):
    return (
        int(dic['input']['userId']), list(map(lambda x: int(x), dic['output']['recommendedItems']))
    )
    
user_base_recommendation = bucket.Object(user_ids_json_s3_path + '.out').get()['Body'].read()
user_base_recommendation = dict([transform_recommendation(json.loads(ss)) for ss in user_base_recommendation.splitlines()])

まずは対象ユーザのインタラクションデータを確認します。

def fetch_interaction(user_id):
    return df[df.USER_ID == user_id].join(items, on='ITEM_ID').sort_values('TIMESTAMP', ascending=False).set_index('ITEM_ID').loc[:, ['TITLE', 'GENRE', 'RATING', 'TIMESTAMP']]
fetch_interaction(1)[:20]

レコメンドされた映画を確認してみます。

def fetch_recommendation(user_id):
    return items[items.index.isin(user_base_recommendation[user_id])]
fetch_recommendation(1)[:20]

さいごに

Boto3を使ってPersonalizeのバッチレコメンドを行う流れについて紹介しました。用途によってはリアルタイムレコメンドに比べて、バッチレコメンドの方が楽な場合もあるかと思いますが、ジョブ実行時間のオーバーヘッドがそれなりにあります。それぞれの特徴を理解した上で活用していきたいですね。

参考