Amazon Personalize でコールドアイテムに対応したレシピでレコメンドしてみた – 機械学習 on AWS Advent Calendar 2019
どうも、DA事業本部の大澤です。
当エントリは『機械学習 on AWS Advent Calendar 2019』の7日目のエントリです。
- クラスメソッド 機械学習 on AWS Advent Calendar 2019 - Qiita
- クラスメソッド 機械学習 on AWS Advent Calendar 2019 | シリーズ | Developers.IO
今回は「Amazon Personalize でコールドアイテムに対応したレシピでレコメンドしてみた」についてご紹介します。
概要
Amazon Personalize では定義済みのレシピがいくつか用意されています。
- USER_PERSONALIZATION
- HRNN
- HRNN-Metadata
- HRNN-Coldstart
- Popularity-Count
- PERSONALIZED_RANKING
- Personalized-Ranking
- RELATED_ITEMS
- SIMS
USER_PERSONALIZATION タイプの中にHRNN-Coldstart
というレシピがあります。このレシピはメタデータも使って学習するところはHRNN-Metadata
と同じですが、ユーザとのインタラクションデータが少ないもしくはないアイテム(コールドアイテム)に対応したレコメンドが可能です。
このレシピには次のようなメリットがあります。
- ユーザとのインタラクションデータがないもしくは少ない新規アイテムのレコメンドが可能
- ユーザとのインタラクションデータが多くなく学習の際にノイズになり得るデータをコールドアイテムとして扱った学習が可能
コールドアイテム
ここではユーザとのインタラクションデータが少ないもしくはないアイテムをコールドアイテムもしくはコールドスタートアイテムとしています。HRNN-Coldstart
レシピでは次の条件を満たしたアイテムをコールドアイテムとして扱います。
- インタラクションデータ数が
cold_start_max_interactions
よりも少ないアイテム - インタラクションデータに含まれる期間が
cold_start_max_duration
よりも短いアイテム
HRNN-Coldstartが使用するデータセットタイプ
- (必須)User-item interaction
- (必須)Item
- User
制限
HRNN-Coldstart
レシピを使う場合、ソリューションの作成に用いるデータセットに含まれるコールドアイテムの数は次の条件を満たす必要があります。コールドアイテムの数はハイパーパラメータのcold_start_max_interactions
とcold_start_max_duration
で調整可能です。
- 最大コールドアイテム数: 80,000
- 最小コールドアイテム数: 100
やってみる
Amazon Personalize Samples に含まれる次のノートブックを進めていきます。
このノートブックは次のような内容で構成されています。
- データセットのダウンロード、前処理、アップロード
- スキーマ作成、データセットグループ作成
- データセット: MovieLens 1M Dataset
- データセットのインポートと加工
- ソリューションの作成
- ソリューションメトリクスの確認
- キャンペーンの作成
- 検証
- リソースの削除
ライブラリの読み込み
まずは使用するライブラリ/モジュールを読み込みます。
import tempfile, subprocess, urllib.request, zipfile import pandas as pd, numpy as np import io import scipy.sparse as ss import json import time import os import boto3
データセットのダウンロード
MovieLens 1M Datasetのダウンロードを行い、pandasで読み込みます。
with tempfile.TemporaryDirectory() as tmpdir: urllib.request.urlretrieve( 'http://files.grouplens.org/datasets/movielens/ml-1m.zip', tmpdir + '/ml-1m.zip') zipfile.ZipFile(tmpdir + '/ml-1m.zip').extractall(tmpdir) print(subprocess.check_output(['ls', tmpdir+'/ml-1m']).decode('utf-8')) df_all = pd.read_csv( tmpdir + '/ml-1m/ratings.dat', sep='::', names=['USER_ID','ITEM_ID','EVENT_VALUE', 'TIMESTAMP']) df_all['EVENT_TYPE']='RATING' items_all = pd.read_csv( tmpdir + '/ml-1m/movies.dat', sep='::', encoding='latin1', names=['ITEM_ID', '_TITLE', 'GENRE'], ) del items_all['_TITLE']
加工用にアイテムのデータをコピーし中身を確認します。
pd.set_option('display.max_rows', 5) items = items_all.copy() items
インタラクションデータも同様にコピー&中身を確認します。
df = df_all.copy() df
データセットの加工
アイテムIDをシャッフルし、コールドアイテムとそうでないアイテムに分けます。 コールドアイテムとなったアイテムをインタラクションデータから削除します。
unique_items = df['ITEM_ID'].unique() unique_items = np.random.permutation(unique_items) len(unique_items) warm_items = unique_items[len(unique_items)//2:] cold_items = unique_items[:len(unique_items)//2] df['to_keep'] = df['ITEM_ID'].apply(lambda x:x in warm_items) df=df[df['to_keep']] del df['to_keep'] df
インタラクションデータの行数が約半分程度となりました。
CSV形式で書き出します。
df.to_csv('interactions.csv',index=False)
アイテムデータにはもともとインタラクションデータに含まれていないデータも含まれています。なので、そういったデータを取り除いておきます。(コールドアイテムを取り除くわけではないので注意。)
items['to_keep'] = items['ITEM_ID'].apply(lambda x:x in unique_items) items=items[items['to_keep']] del items['to_keep'] items
アイテムデータもCSV形式で書き出します。
items.to_csv('item_metadata.csv',index=False)
データアップロード
今回作業に使用するS3バケットを作成します。
# os.environ['AWS_DEFAULT_REGION']="us-east-1" # 必要に応じてリージョンを設定 suffix = str(np.random.uniform())[4:9] bucket = "demo-temporal-holdout-metadata-"+suffix # 必要に応じてバケット名を指定 !aws s3 mb s3://{bucket}
Personalizeのクライアントを作成しておきます。
personalize = boto3.client('personalize') personalize_runtime = boto3.client('personalize-runtime')
先ほど作成したCSVをS3へアップロードします。
interactions_filename = 'interactions.csv' boto3.Session().resource('s3').Bucket(bucket).Object(interactions_filename).upload_file(interactions_filename) item_metadata_file = 'item_metadata.csv' boto3.Session().resource('s3').Bucket(bucket).Object(item_metadata_file).upload_file(item_metadata_file)
スキーマの作成
インタラクションデータがどういう構成かを示すスキーマを作成します。インタラクションデータのdfと同じカラム構成を記述します。
schema_name="DEMO-temporal-metadata-schema-"+suffix schema = { "type": "record", "name": "Interactions", "namespace": "com.amazonaws.personalize.schema", "fields": [ { "name": "USER_ID", "type": "string" }, { "name": "ITEM_ID", "type": "string" }, { "name": "EVENT_VALUE", "type": "float" }, { "name": "TIMESTAMP", "type": "long" }, { "name": "EVENT_TYPE", "type": "string" }, ], "version": "1.0" } create_schema_response = personalize.create_schema( name = schema_name, schema = json.dumps(schema) ) schema_arn = create_schema_response['schemaArn']
アイテムデータのスキーマも同様に作成します。
metadata_schema_name="DEMO-temporal-metadata-metadataschema-"+suffix metadata_schema = { "type": "record", "name": "Items", "namespace": "com.amazonaws.personalize.schema", "fields": [ { "name": "ITEM_ID", "type": "string" }, { "name": "GENRE", "type": "string", "categorical": True } ], "version": "1.0" } create_metadata_schema_response = personalize.create_schema( name = metadata_schema_name, schema = json.dumps(metadata_schema) ) metadata_schema_arn = create_metadata_schema_response['schemaArn']
データセットグループとデータセットの作成
データセットグループの作成を行います。作成には数十秒ほどかかります。
dataset_group_name = "DEMO-temporal-metadata-dataset-group-" + suffix create_dataset_group_response = personalize.create_dataset_group( name = dataset_group_name ) dataset_group_arn = create_dataset_group_response['datasetGroupArn'] status = None max_time = time.time() + 3*60*60 # 3 hours while time.time() < max_time: describe_dataset_group_response = personalize.describe_dataset_group( datasetGroupArn = dataset_group_arn ) status = describe_dataset_group_response["datasetGroup"]["status"] print("DatasetGroup: {}".format(status)) if status == "ACTIVE" or status == "CREATE FAILED": break time.sleep(20)
データセットグループの作成が完了したら、インタラクション用のデータセットとアイテム用のデータセットを作成します。
dataset_type = "INTERACTIONS" create_dataset_response = personalize.create_dataset( datasetType = dataset_type, datasetGroupArn = dataset_group_arn, schemaArn = schema_arn, name = "DEMO-temporal-metadata-dataset-interactions-" + suffix ) interactions_dataset_arn = create_dataset_response['datasetArn'] dataset_type = "ITEMS" create_metadata_dataset_response = personalize.create_dataset( datasetType = dataset_type, datasetGroupArn = dataset_group_arn, schemaArn = metadata_schema_arn, name = "DEMO-temporal-metadata-dataset-items-" + suffix ) metadata_dataset_arn = create_metadata_dataset_response['datasetArn']
バケットポリシーの設定
Personalizeからバケットにアクセスができるようにバケットポリシーを設定します。
s3 = boto3.client("s3") policy = { "Version": "2012-10-17", "Id": "PersonalizeS3BucketAccessPolicy", "Statement": [ { "Sid": "PersonalizeS3BucketAccessPolicy", "Effect": "Allow", "Principal": { "Service": "personalize.amazonaws.com" }, "Action": [ "s3:GetObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::{}".format(bucket), "arn:aws:s3:::{}/*".format(bucket) ] } ] } s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))
IAMロールの作成
Personalizeがデータの読み込み時に使用するIAMロールを作成します。
from botocore.exceptions import ClientError iam = boto3.client("iam") role_name = "PersonalizeS3Role-"+suffix assume_role_policy_document = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "personalize.amazonaws.com" }, "Action": "sts:AssumeRole" } ] } try: create_role_response = iam.create_role( RoleName = role_name, AssumeRolePolicyDocument = json.dumps(assume_role_policy_document) ); iam.attach_role_policy( RoleName = role_name, PolicyArn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess" ); role_arn = create_role_response["Role"]["Arn"] except ClientError as e: if e.response['Error']['Code'] == 'EntityAlreadyExists': role_arn = iam.get_role(RoleName=role_name)['Role']['Arn'] else: raise # IAMロールが使えるようになるまで時間が必要になることがあるので、少し待つ time.sleep(20)
データセットにデータをインポートする
インタラクションデータセットとアイテムデータセットにそれぞれのデータをS3から読み込ませるために、インポートジョブを作成します。
create_dataset_import_job_response = personalize.create_dataset_import_job( jobName = "DEMO-temporal-dataset-import-job-"+suffix, datasetArn = interactions_dataset_arn, dataSource = { "dataLocation": "s3://{}/{}".format(bucket, 'interactions.csv') }, roleArn = role_arn ) dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn'] create_dataset_import_job_response = personalize.create_dataset_import_job( jobName = "DEMO-temporal-dataset-import-job-"+suffix, datasetArn = interactions_dataset_arn, dataSource = { "dataLocation": "s3://{}/{}".format(bucket, 'interactions.csv') }, roleArn = role_arn ) dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']
データセットの読み込みには時間がかかるので、完了するまで待ちます。試した際には10分ほどでジョブが完了しました。
status = None max_time = time.time() + 3*60*60 # 3 hours while time.time() < max_time: describe_dataset_import_job_response = personalize.describe_dataset_import_job( datasetImportJobArn = dataset_import_job_arn ) dataset_import_job = describe_dataset_import_job_response["datasetImportJob"] if "latestDatasetImportJobRun" not in dataset_import_job: status = dataset_import_job["status"] print("DatasetImportJob: {}".format(status)) else: status = dataset_import_job["latestDatasetImportJobRun"]["status"] print("LatestDatasetImportJobRun: {}".format(status)) if status == "ACTIVE" or status == "CREATE FAILED": break time.sleep(60) status = None max_time = time.time() + 3*60*60 # 3 hours while time.time() < max_time: describe_dataset_import_job_response = personalize.describe_dataset_import_job( datasetImportJobArn = metadata_dataset_import_job_arn ) dataset_import_job = describe_dataset_import_job_response["datasetImportJob"] if "latestDatasetImportJobRun" not in dataset_import_job: status = dataset_import_job["status"] print("DatasetImportJob: {}".format(status)) else: status = dataset_import_job["latestDatasetImportJobRun"]["status"] print("LatestDatasetImportJobRun: {}".format(status)) if status == "ACTIVE" or status == "CREATE FAILED": break time.sleep(60)
ソリューション作成
レシピのARNを確認します。
recipe_list = personalize.list_recipes() for recipe in recipe_list['recipes']: print(recipe['recipeArn'])
コールドスタートに対応したレシピを指定して、ソリューションを作成します。
recipe_arn = "arn:aws:personalize:::recipe/aws-hrnn-coldstart" create_solution_response = personalize.create_solution( name = "DEMO-temporal-metadata-solution-"+suffix, datasetGroupArn = dataset_group_arn, recipeArn = recipe_arn, solutionConfig = { "featureTransformationParameters" : { 'cold_start_max_duration' : '5', 'cold_start_relative_from' : 'latestItem', 'cold_start_max_interactions':'15' } } ) solution_arn = create_solution_response['solutionArn']
ソリューションバージョンを作成して、ジョブが完了するまで待ちます。
create_solution_version_response = personalize.create_solution_version( solutionArn = solution_arn ) solution_version_arn = create_solution_version_response['solutionVersionArn'] status = None max_time = time.time() + 3*60*60 # 3 hours while time.time() < max_time: describe_solution_version_response = personalize.describe_solution_version( solutionVersionArn = solution_version_arn ) status = describe_solution_version_response["solutionVersion"]["status"] print("SolutionVersion: {}".format(status)) if status == "ACTIVE" or status == "CREATE FAILED": break time.sleep(60)
ソリューションバージョンの作成では、レコメンドに使用する機械学習モデルの学習を行うため、時間がかかります。今回試した際には40分ほどかかりました。この時間はデータの量やレシピなどに依存するため、どれくらいの時間がかかるかの見積もりは実際に試してみるのが早いかと思います。
ソリューションメトリクスの確認
作成したソリューションバージョンのメトリクスを取得します。
get_solution_metrics_response = personalize.get_solution_metrics( solutionVersionArn = solution_version_arn ) print(json.dumps(get_solution_metrics_response, indent=2))
半分のアイテムに関するインタラクションデータを抜いたため、メトリクスはどれもかなり小さい値です。
ノートブックでも次のように記載されている通り、コールドスタートに対応したソリューションのメトリクスはオフラインでの評価が難しいということが分かります。
What happened here? Since we deleted all cold start items from the training set, the metrics are close to zero. An important lesson here is that metrics for cold start are hard to evaluate offline.
キャンペーンの作成
作成したソリューションバージョンをホスティングするキャンペーンを作成します。
create_campaign_response = personalize.create_campaign( name = "DEMO-coldstart-campaign-"+suffix, solutionVersionArn = solution_version_arn, minProvisionedTPS = 2, ) campaign_arn = create_campaign_response['campaignArn'] status = None max_time = time.time() + 3*60*60 # 3 hours while time.time() < max_time: describe_campaign_response = personalize.describe_campaign( campaignArn = campaign_arn ) status = describe_campaign_response["campaign"]["status"] print("Campaign: {}".format(status)) if status == "ACTIVE" or status == "CREATE FAILED": break time.sleep(60)
試した際には10分ほどで作成が完了しました。
検証
作成したキャンペーンを使っていくつか検証を行います。 検証のためにコールドアイテムのインタラクションデータを取り出します。
# we had saved all the data before deleting the cold items df = df_all.copy() df['to_keep'] = df['ITEM_ID'].apply(lambda x:x in cold_items) df=df[df['to_keep']] users = df['USER_ID'].unique() del df['to_keep'] df
検証に使用するライブラリ/モジュールを読み込みます。metricsは今回使用しているノートブックがあるリポジトリに入っているものです。レコメンドの各種メトリクスを計算するための関数が含まれています。
from tqdm import tqdm_notebook import numpy as np from metrics import mean_reciprocal_rank, ndcg_at_k, precision_at_k
コールドアイテムに関するインタラクションデータで改めてメトリクスを計算してみます。
relevance = [] for user_id in tqdm_notebook(users[:1000]): true_items = set(df[df['USER_ID']==user_id]['ITEM_ID'].values) rec_response = personalize_runtime.get_recommendations( campaignArn = campaign_arn, userId = str(user_id) ) rec_items = [int(x['itemId']) for x in rec_response['itemList']] relevance.append([int(x in true_items) for x in rec_items]) print('mean_reciprocal_rank', np.mean([mean_reciprocal_rank(r) for r in relevance])) print('precision_at_5', np.mean([precision_at_k(r, 5) for r in relevance])) print('precision_at_10', np.mean([precision_at_k(r, 10) for r in relevance])) print('precision_at_25', np.mean([precision_at_k(r, 25) for r in relevance])) print('normalized_discounted_cumulative_gain_at_5', np.mean([ndcg_at_k(r, 5) for r in relevance])) print('normalized_discounted_cumulative_gain_at_10', np.mean([ndcg_at_k(r, 10) for r in relevance])) print('normalized_discounted_cumulative_gain_at_25', np.mean([ndcg_at_k(r, 25) for r in relevance]))
今回はちゃんと対象アイテムのインタラクションデータを元に計算しているため、ソリューションメトリクスとして取得したものに比べてよい結果が出ました。
ランダムにレコメンドした場合との比較
ランダムにレコメンドした場合のメトリクスを計算してみて、今回作成したソリューションが良いものかどうか比較してみます。
relevance = [] for user_id in tqdm_notebook(users[:1000]): true_items = set(df[df['USER_ID']==user_id]['ITEM_ID'].values) rec_items = np.random.permutation(cold_items)[:25] relevance.append([int(x in true_items) for x in rec_items]) print('mean_reciprocal_rank', np.mean([mean_reciprocal_rank(r) for r in relevance])) print('precision_at_5', np.mean([precision_at_k(r, 5) for r in relevance])) print('precision_at_10', np.mean([precision_at_k(r, 10) for r in relevance])) print('precision_at_25', np.mean([precision_at_k(r, 25) for r in relevance])) print('normalized_discounted_cumulative_gain_at_5', np.mean([ndcg_at_k(r, 5) for r in relevance])) print('normalized_discounted_cumulative_gain_at_10', np.mean([ndcg_at_k(r, 10) for r in relevance])) print('normalized_discounted_cumulative_gain_at_25', np.mean([ndcg_at_k(r, 25) for r in relevance]))
ランダムなので先ほど計算したメトリクスに比べてよくない結果となりました。
レコメンド内容を見てみる
次にユーザを抽出して、レコメンド内容を見てみます。 対象ユーザのインタラクションデータを確認するために、ソリューション作成時に使用したインタラクションデータをタイムスタンプ順に並び替えます。
# we had saved all the data before deleting the cold items df = df_all.copy() df['to_keep'] = df['ITEM_ID'].apply(lambda x:x in warm_items) df=df[df['to_keep']] del df['to_keep'] df = df.sort_values('TIMESTAMP', kind='mergesort').copy() df
ユーザの視聴したアイテムをみてみます。
user_id = users[1] hist_items = df[df['USER_ID']==user_id]['ITEM_ID'].tail(5).values items_all.set_index('ITEM_ID').loc[hist_items]
Action
とThriller
のジャンルが目立ちます。
対象ユーザにレコメンドし、内容を確認します。
rec_response = personalize_runtime.get_recommendations( campaignArn = campaign_arn, userId = str(user_id) ) rec_items = [int(x['itemId']) for x in rec_response['itemList']] items_all.set_index('ITEM_ID').loc[rec_items[:5]]
Action
系のアイテムが主にレコメンドされているようです。
同様に別のユーザでも確認してみます。 まずは視聴履歴からみてみます。
user_id = users[2] hist_items = df[df['USER_ID']==user_id]['ITEM_ID'].tail(5).values items_all.set_index('ITEM_ID').loc[hist_items]
Comedy
系のアイテムが目立ちます。
レコメンド内容もみてみます。
rec_response = personalize_runtime.get_recommendations( campaignArn = campaign_arn, userId = str(user_id) ) rec_items = [int(x['itemId']) for x in rec_response['itemList']] items_all.set_index('ITEM_ID').loc[rec_items[:5]]
レコメンドされたアイテムもコメディ系が多いようです。
後片付け
検証用に作成した各種リソースは不要になったら削除しましょう。特にキャンペーンは時間に応じて課金されるため、注意が必要です。
personalize.delete_campaign(campaignArn=campaign_arn) while len(personalize.list_campaigns(solutionArn=solution_arn)['campaigns']): time.sleep(5) personalize.delete_solution(solutionArn=solution_arn) while len(personalize.list_solutions(datasetGroupArn=dataset_group_arn)['solutions']): time.sleep(5) for dataset in personalize.list_datasets(datasetGroupArn=dataset_group_arn)['datasets']: personalize.delete_dataset(datasetArn=dataset['datasetArn']) while len(personalize.list_datasets(datasetGroupArn=dataset_group_arn)['datasets']): time.sleep(5) personalize.delete_dataset_group(datasetGroupArn=dataset_group_arn)
検証用に作成したS3のバケットも削除します。必要なバケットを削除しないよう、削除対象のバケットが正しいか注意が必要です。
!#aws s3 rm s3://{bucket} --recursive
さいごに
『機械学習 on AWS Advent Calendar 2019』 7日目、「Amazon Personalize でコールドアイテムに対応したレシピでレコメンドしてみた」についてお伝えしました。
頻繁にアイテムが追加されたり、ユーザに選択されることが少ないアイテムがあるケースは多いかと思います。そういったケースではHRNN-Coldstart
を使うことでレコメンドの質を高めることができる可能性があります。一度試してみてはいかがでしょうか。