Amazon Personalize でコールドアイテムに対応したレシピでレコメンドしてみた – 機械学習 on AWS Advent Calendar 2019

『機械学習 on AWS Advent Calendar 2019』の7日目のエントリです。

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

どうも、DA事業本部の大澤です。

当エントリは『機械学習 on AWS Advent Calendar 2019』の7日目のエントリです。

今回は「Amazon Personalize でコールドアイテムに対応したレシピでレコメンドしてみた」についてご紹介します。

概要

Amazon Personalize では定義済みのレシピがいくつか用意されています。

  • USER_PERSONALIZATION
    • HRNN
    • HRNN-Metadata
    • HRNN-Coldstart
    • Popularity-Count
  • PERSONALIZED_RANKING
    • Personalized-Ranking
  • RELATED_ITEMS
    • SIMS

USER_PERSONALIZATION タイプの中にHRNN-Coldstartというレシピがあります。このレシピはメタデータも使って学習するところはHRNN-Metadataと同じですが、ユーザとのインタラクションデータが少ないもしくはないアイテム(コールドアイテム)に対応したレコメンドが可能です。 このレシピには次のようなメリットがあります。

  • ユーザとのインタラクションデータがないもしくは少ない新規アイテムのレコメンドが可能
  • ユーザとのインタラクションデータが多くなく学習の際にノイズになり得るデータをコールドアイテムとして扱った学習が可能

コールドアイテム

ここではユーザとのインタラクションデータが少ないもしくはないアイテムをコールドアイテムもしくはコールドスタートアイテムとしています。HRNN-Coldstartレシピでは次の条件を満たしたアイテムをコールドアイテムとして扱います。

  • インタラクションデータ数がcold_start_max_interactionsよりも少ないアイテム
  • インタラクションデータに含まれる期間がcold_start_max_durationよりも短いアイテム

HRNN-Coldstartが使用するデータセットタイプ

  • (必須)User-item interaction
  • (必須)Item
  • User

制限

HRNN-Coldstartレシピを使う場合、ソリューションの作成に用いるデータセットに含まれるコールドアイテムの数は次の条件を満たす必要があります。コールドアイテムの数はハイパーパラメータのcold_start_max_interactionscold_start_max_durationで調整可能です。

  • 最大コールドアイテム数: 80,000
  • 最小コールドアイテム数: 100

やってみる

Amazon Personalize Samples に含まれる次のノートブックを進めていきます。

このノートブックは次のような内容で構成されています。

  • データセットのダウンロード、前処理、アップロード
  • スキーマ作成、データセットグループ作成
  • データセットのインポートと加工
  • ソリューションの作成
  • ソリューションメトリクスの確認
  • キャンペーンの作成
  • 検証
  • リソースの削除

ライブラリの読み込み

まずは使用するライブラリ/モジュールを読み込みます。

import tempfile, subprocess, urllib.request, zipfile
import pandas as pd, numpy as np
import io
import scipy.sparse as ss
import json
import time
import os
import boto3

データセットのダウンロード

MovieLens 1M Datasetのダウンロードを行い、pandasで読み込みます。

with tempfile.TemporaryDirectory() as tmpdir:
    urllib.request.urlretrieve(
        'http://files.grouplens.org/datasets/movielens/ml-1m.zip',
        tmpdir + '/ml-1m.zip')
    zipfile.ZipFile(tmpdir + '/ml-1m.zip').extractall(tmpdir)
    print(subprocess.check_output(['ls', tmpdir+'/ml-1m']).decode('utf-8'))
    
    df_all = pd.read_csv(
        tmpdir + '/ml-1m/ratings.dat',
        sep='::',
        names=['USER_ID','ITEM_ID','EVENT_VALUE', 'TIMESTAMP'])
    df_all['EVENT_TYPE']='RATING'

    items_all = pd.read_csv(
        tmpdir + '/ml-1m/movies.dat',
        sep='::', encoding='latin1',
        names=['ITEM_ID', '_TITLE', 'GENRE'],
    )
    del items_all['_TITLE']

加工用にアイテムのデータをコピーし中身を確認します。

pd.set_option('display.max_rows', 5)
items = items_all.copy()
items

インタラクションデータも同様にコピー&中身を確認します。

df = df_all.copy()
df

データセットの加工

アイテムIDをシャッフルし、コールドアイテムとそうでないアイテムに分けます。 コールドアイテムとなったアイテムをインタラクションデータから削除します。

unique_items = df['ITEM_ID'].unique()
unique_items = np.random.permutation(unique_items)
len(unique_items)

warm_items = unique_items[len(unique_items)//2:]
cold_items = unique_items[:len(unique_items)//2]

df['to_keep'] = df['ITEM_ID'].apply(lambda x:x in warm_items)
df=df[df['to_keep']]
del df['to_keep']
df

インタラクションデータの行数が約半分程度となりました。

CSV形式で書き出します。

df.to_csv('interactions.csv',index=False)

アイテムデータにはもともとインタラクションデータに含まれていないデータも含まれています。なので、そういったデータを取り除いておきます。(コールドアイテムを取り除くわけではないので注意。)

items['to_keep'] = items['ITEM_ID'].apply(lambda x:x in unique_items)
items=items[items['to_keep']]
del items['to_keep']
items

アイテムデータもCSV形式で書き出します。

items.to_csv('item_metadata.csv',index=False)

データアップロード

今回作業に使用するS3バケットを作成します。

# os.environ['AWS_DEFAULT_REGION']="us-east-1" # 必要に応じてリージョンを設定
suffix = str(np.random.uniform())[4:9]
bucket = "demo-temporal-holdout-metadata-"+suffix     # 必要に応じてバケット名を指定
!aws s3 mb s3://{bucket}

Personalizeのクライアントを作成しておきます。

personalize = boto3.client('personalize')
personalize_runtime = boto3.client('personalize-runtime')

先ほど作成したCSVをS3へアップロードします。

interactions_filename = 'interactions.csv'
boto3.Session().resource('s3').Bucket(bucket).Object(interactions_filename).upload_file(interactions_filename)
item_metadata_file = 'item_metadata.csv'
boto3.Session().resource('s3').Bucket(bucket).Object(item_metadata_file).upload_file(item_metadata_file)

スキーマの作成

インタラクションデータがどういう構成かを示すスキーマを作成します。インタラクションデータのdfと同じカラム構成を記述します。

schema_name="DEMO-temporal-metadata-schema-"+suffix

schema = {
    "type": "record",
    "name": "Interactions",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
        {
            "name": "USER_ID",
            "type": "string"
        },
        {
            "name": "ITEM_ID",
            "type": "string"
        },
        {
            "name": "EVENT_VALUE",
            "type": "float"
        },
        {
            "name": "TIMESTAMP",
            "type": "long"
        },
        { 
            "name": "EVENT_TYPE",
            "type": "string"
        },
    ],
    "version": "1.0"
}

create_schema_response = personalize.create_schema(
    name = schema_name,
    schema = json.dumps(schema)
)

schema_arn = create_schema_response['schemaArn']

アイテムデータのスキーマも同様に作成します。

metadata_schema_name="DEMO-temporal-metadata-metadataschema-"+suffix
metadata_schema = {
    "type": "record",
    "name": "Items",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
    {
        "name": "ITEM_ID",
        "type": "string"
    },
    {
        "name": "GENRE",
        "type": "string",
        "categorical": True
    }
    ],
    "version": "1.0"
}

create_metadata_schema_response = personalize.create_schema(
    name = metadata_schema_name,
    schema = json.dumps(metadata_schema)
)

metadata_schema_arn = create_metadata_schema_response['schemaArn']

データセットグループとデータセットの作成

データセットグループの作成を行います。作成には数十秒ほどかかります。

dataset_group_name = "DEMO-temporal-metadata-dataset-group-" + suffix

create_dataset_group_response = personalize.create_dataset_group(
    name = dataset_group_name
)

dataset_group_arn = create_dataset_group_response['datasetGroupArn']

status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_group_response = personalize.describe_dataset_group(
        datasetGroupArn = dataset_group_arn
    )
    status = describe_dataset_group_response["datasetGroup"]["status"]
    print("DatasetGroup: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(20)

データセットグループの作成が完了したら、インタラクション用のデータセットとアイテム用のデータセットを作成します。

dataset_type = "INTERACTIONS"
create_dataset_response = personalize.create_dataset(
    datasetType = dataset_type,
    datasetGroupArn = dataset_group_arn,
    schemaArn = schema_arn,
    name = "DEMO-temporal-metadata-dataset-interactions-" + suffix
)

interactions_dataset_arn = create_dataset_response['datasetArn']

dataset_type = "ITEMS"
create_metadata_dataset_response = personalize.create_dataset(
    datasetType = dataset_type,
    datasetGroupArn = dataset_group_arn,
    schemaArn = metadata_schema_arn,
    name = "DEMO-temporal-metadata-dataset-items-" + suffix
)

metadata_dataset_arn = create_metadata_dataset_response['datasetArn']

バケットポリシーの設定

Personalizeからバケットにアクセスができるようにバケットポリシーを設定します。

s3 = boto3.client("s3")

policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy",
    "Statement": [
        {
            "Sid": "PersonalizeS3BucketAccessPolicy",
            "Effect": "Allow",
            "Principal": {
                "Service": "personalize.amazonaws.com"
            },
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::{}".format(bucket),
                "arn:aws:s3:::{}/*".format(bucket)
            ]
        }
    ]
}

s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))

IAMロールの作成

Personalizeがデータの読み込み時に使用するIAMロールを作成します。

from botocore.exceptions import ClientError
iam = boto3.client("iam")

role_name = "PersonalizeS3Role-"+suffix
assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "personalize.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
    ]
}
try:
    create_role_response = iam.create_role(
        RoleName = role_name,
        AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)
    );

    iam.attach_role_policy(
        RoleName = role_name,
        PolicyArn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
    );

    role_arn = create_role_response["Role"]["Arn"]
except ClientError as e:
    if e.response['Error']['Code'] == 'EntityAlreadyExists':
        role_arn = iam.get_role(RoleName=role_name)['Role']['Arn']
    else:
        raise

# IAMロールが使えるようになるまで時間が必要になることがあるので、少し待つ
time.sleep(20)

データセットにデータをインポートする

インタラクションデータセットとアイテムデータセットにそれぞれのデータをS3から読み込ませるために、インポートジョブを作成します。

create_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName = "DEMO-temporal-dataset-import-job-"+suffix,
    datasetArn = interactions_dataset_arn,
    dataSource = {
        "dataLocation": "s3://{}/{}".format(bucket, 'interactions.csv')
    },
    roleArn = role_arn
)

dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']

create_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName = "DEMO-temporal-dataset-import-job-"+suffix,
    datasetArn = interactions_dataset_arn,
    dataSource = {
        "dataLocation": "s3://{}/{}".format(bucket, 'interactions.csv')
    },
    roleArn = role_arn
)

dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']

データセットの読み込みには時間がかかるので、完了するまで待ちます。試した際には10分ほどでジョブが完了しました。

status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = dataset_import_job_arn
    )
    
    dataset_import_job = describe_dataset_import_job_response["datasetImportJob"]
    if "latestDatasetImportJobRun" not in dataset_import_job:
        status = dataset_import_job["status"]
        print("DatasetImportJob: {}".format(status))
    else:
        status = dataset_import_job["latestDatasetImportJobRun"]["status"]
        print("LatestDatasetImportJobRun: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = metadata_dataset_import_job_arn
    )
    
    dataset_import_job = describe_dataset_import_job_response["datasetImportJob"]
    if "latestDatasetImportJobRun" not in dataset_import_job:
        status = dataset_import_job["status"]
        print("DatasetImportJob: {}".format(status))
    else:
        status = dataset_import_job["latestDatasetImportJobRun"]["status"]
        print("LatestDatasetImportJobRun: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

ソリューション作成

レシピのARNを確認します。

recipe_list = personalize.list_recipes()
for recipe in recipe_list['recipes']:
    print(recipe['recipeArn'])

コールドスタートに対応したレシピを指定して、ソリューションを作成します。

recipe_arn = "arn:aws:personalize:::recipe/aws-hrnn-coldstart"
create_solution_response = personalize.create_solution(
    name = "DEMO-temporal-metadata-solution-"+suffix,
    datasetGroupArn = dataset_group_arn,
    recipeArn = recipe_arn,
    solutionConfig = {
        "featureTransformationParameters" : {
            'cold_start_max_duration' : '5', 
            'cold_start_relative_from' : 'latestItem', 
            'cold_start_max_interactions':'15'
        }
    }
)

solution_arn = create_solution_response['solutionArn']

ソリューションバージョンを作成して、ジョブが完了するまで待ちます。

create_solution_version_response = personalize.create_solution_version(
    solutionArn = solution_arn
)

solution_version_arn = create_solution_version_response['solutionVersionArn']

status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_solution_version_response = personalize.describe_solution_version(
        solutionVersionArn = solution_version_arn
    )
    status = describe_solution_version_response["solutionVersion"]["status"]
    print("SolutionVersion: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

ソリューションバージョンの作成では、レコメンドに使用する機械学習モデルの学習を行うため、時間がかかります。今回試した際には40分ほどかかりました。この時間はデータの量やレシピなどに依存するため、どれくらいの時間がかかるかの見積もりは実際に試してみるのが早いかと思います。

ソリューションメトリクスの確認

作成したソリューションバージョンのメトリクスを取得します。

get_solution_metrics_response = personalize.get_solution_metrics(
    solutionVersionArn = solution_version_arn
)

print(json.dumps(get_solution_metrics_response, indent=2))

半分のアイテムに関するインタラクションデータを抜いたため、メトリクスはどれもかなり小さい値です。

ノートブックでも次のように記載されている通り、コールドスタートに対応したソリューションのメトリクスはオフラインでの評価が難しいということが分かります。

What happened here? Since we deleted all cold start items from the training set, the metrics are close to zero. An important lesson here is that metrics for cold start are hard to evaluate offline.

キャンペーンの作成

作成したソリューションバージョンをホスティングするキャンペーンを作成します。

create_campaign_response = personalize.create_campaign(
    name = "DEMO-coldstart-campaign-"+suffix,
    solutionVersionArn = solution_version_arn,
    minProvisionedTPS = 2,    
)

campaign_arn = create_campaign_response['campaignArn']

status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_campaign_response = personalize.describe_campaign(
        campaignArn = campaign_arn
    )
    status = describe_campaign_response["campaign"]["status"]
    print("Campaign: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

試した際には10分ほどで作成が完了しました。

検証

作成したキャンペーンを使っていくつか検証を行います。 検証のためにコールドアイテムのインタラクションデータを取り出します。

# we had saved all the data before deleting the cold items
df = df_all.copy()
df['to_keep'] = df['ITEM_ID'].apply(lambda x:x in cold_items)
df=df[df['to_keep']]
users = df['USER_ID'].unique()
del df['to_keep']
df

検証に使用するライブラリ/モジュールを読み込みます。metricsは今回使用しているノートブックがあるリポジトリに入っているものです。レコメンドの各種メトリクスを計算するための関数が含まれています。

from tqdm import tqdm_notebook
import numpy as np
from metrics import mean_reciprocal_rank, ndcg_at_k, precision_at_k

コールドアイテムに関するインタラクションデータで改めてメトリクスを計算してみます。

relevance = []
for user_id in  tqdm_notebook(users[:1000]):

    true_items = set(df[df['USER_ID']==user_id]['ITEM_ID'].values)

    rec_response = personalize_runtime.get_recommendations(
            campaignArn = campaign_arn,
            userId = str(user_id)
        )
    rec_items = [int(x['itemId']) for x in rec_response['itemList']]
    relevance.append([int(x in true_items) for x in rec_items])

print('mean_reciprocal_rank', np.mean([mean_reciprocal_rank(r) for r in relevance]))
print('precision_at_5', np.mean([precision_at_k(r, 5) for r in relevance]))
print('precision_at_10', np.mean([precision_at_k(r, 10) for r in relevance]))
print('precision_at_25', np.mean([precision_at_k(r, 25) for r in relevance]))
print('normalized_discounted_cumulative_gain_at_5', np.mean([ndcg_at_k(r, 5) for r in relevance]))
print('normalized_discounted_cumulative_gain_at_10', np.mean([ndcg_at_k(r, 10) for r in relevance]))
print('normalized_discounted_cumulative_gain_at_25', np.mean([ndcg_at_k(r, 25) for r in relevance]))

今回はちゃんと対象アイテムのインタラクションデータを元に計算しているため、ソリューションメトリクスとして取得したものに比べてよい結果が出ました。

ランダムにレコメンドした場合との比較

ランダムにレコメンドした場合のメトリクスを計算してみて、今回作成したソリューションが良いものかどうか比較してみます。

relevance = []
for user_id in  tqdm_notebook(users[:1000]):

    true_items = set(df[df['USER_ID']==user_id]['ITEM_ID'].values)
    rec_items = np.random.permutation(cold_items)[:25]
    relevance.append([int(x in true_items) for x in rec_items])

print('mean_reciprocal_rank', np.mean([mean_reciprocal_rank(r) for r in relevance]))
print('precision_at_5', np.mean([precision_at_k(r, 5) for r in relevance]))
print('precision_at_10', np.mean([precision_at_k(r, 10) for r in relevance]))
print('precision_at_25', np.mean([precision_at_k(r, 25) for r in relevance]))
print('normalized_discounted_cumulative_gain_at_5', np.mean([ndcg_at_k(r, 5) for r in relevance]))
print('normalized_discounted_cumulative_gain_at_10', np.mean([ndcg_at_k(r, 10) for r in relevance]))
print('normalized_discounted_cumulative_gain_at_25', np.mean([ndcg_at_k(r, 25) for r in relevance]))

ランダムなので先ほど計算したメトリクスに比べてよくない結果となりました。

レコメンド内容を見てみる

次にユーザを抽出して、レコメンド内容を見てみます。 対象ユーザのインタラクションデータを確認するために、ソリューション作成時に使用したインタラクションデータをタイムスタンプ順に並び替えます。

# we had saved all the data before deleting the cold items
df = df_all.copy()
df['to_keep'] = df['ITEM_ID'].apply(lambda x:x in warm_items)
df=df[df['to_keep']]
del df['to_keep']
df = df.sort_values('TIMESTAMP', kind='mergesort').copy()
df

ユーザの視聴したアイテムをみてみます。

user_id = users[1]
hist_items = df[df['USER_ID']==user_id]['ITEM_ID'].tail(5).values
items_all.set_index('ITEM_ID').loc[hist_items]

ActionThrillerのジャンルが目立ちます。

対象ユーザにレコメンドし、内容を確認します。

rec_response = personalize_runtime.get_recommendations(
            campaignArn = campaign_arn,
            userId = str(user_id)
        )
rec_items = [int(x['itemId']) for x in rec_response['itemList']]
items_all.set_index('ITEM_ID').loc[rec_items[:5]]

Action系のアイテムが主にレコメンドされているようです。

同様に別のユーザでも確認してみます。 まずは視聴履歴からみてみます。

user_id = users[2]
hist_items = df[df['USER_ID']==user_id]['ITEM_ID'].tail(5).values
items_all.set_index('ITEM_ID').loc[hist_items]

Comedy系のアイテムが目立ちます。

レコメンド内容もみてみます。

rec_response = personalize_runtime.get_recommendations(
            campaignArn = campaign_arn,
            userId = str(user_id)
        )
rec_items = [int(x['itemId']) for x in rec_response['itemList']]
items_all.set_index('ITEM_ID').loc[rec_items[:5]]

レコメンドされたアイテムもコメディ系が多いようです。

後片付け

検証用に作成した各種リソースは不要になったら削除しましょう。特にキャンペーンは時間に応じて課金されるため、注意が必要です。

personalize.delete_campaign(campaignArn=campaign_arn)
while len(personalize.list_campaigns(solutionArn=solution_arn)['campaigns']):
    time.sleep(5)

personalize.delete_solution(solutionArn=solution_arn)
while len(personalize.list_solutions(datasetGroupArn=dataset_group_arn)['solutions']):
    time.sleep(5)

for dataset in personalize.list_datasets(datasetGroupArn=dataset_group_arn)['datasets']:
    personalize.delete_dataset(datasetArn=dataset['datasetArn'])
while len(personalize.list_datasets(datasetGroupArn=dataset_group_arn)['datasets']):
    time.sleep(5)

personalize.delete_dataset_group(datasetGroupArn=dataset_group_arn)

検証用に作成したS3のバケットも削除します。必要なバケットを削除しないよう、削除対象のバケットが正しいか注意が必要です。

!#aws s3 rm s3://{bucket} --recursive

さいごに

『機械学習 on AWS Advent Calendar 2019』 7日目、「Amazon Personalize でコールドアイテムに対応したレシピでレコメンドしてみた」についてお伝えしました。 頻繁にアイテムが追加されたり、ユーザに選択されることが少ないアイテムがあるケースは多いかと思います。そういったケースではHRNN-Coldstartを使うことでレコメンドの質を高めることができる可能性があります。一度試してみてはいかがでしょうか。