Amazon Personalize のHRNN-Metadata レシピ触ってみた – 機械学習 on AWS Advent Calendar 2019
こんにちは、Mr.Moです。
当エントリは『機械学習 on AWS Advent Calendar 2019』の3日目のエントリです。
- クラスメソッド 機械学習 on AWS Advent Calendar 2019 - Qiita
- クラスメソッド 機械学習 on AWS Advent Calendar 2019 | シリーズ | Developers.IO
下記の記事を見てAmazon Personalizeに非常に興味が湧いたので、早速触ってみました。
HIGOBASHI.AWS 第12回 活用編で「Amazon Personalizeではじめるレコメンドサービス」について話しました #higobashiaws
今回は記事内にも紹介がありました「Amazon Personalize Samples」の内、下記のHRNN-Metadata レシピを使用しているノートブックを扱っていきます。
HRNN-Metadata レシピとは
HRNN- メタデータレシピは、ユーザーがやり取りするアイテムを予測します。HRNN レシピに類似していますが、コンテキスト、ユーザー、アイテムのメタデータ (それぞれソースは、インタラクション、ユーザー、アイテムのデータセット) から派生した追加の特徴を含みます。高品質なメタデータを使用できる場合、HRNN-Metadata は非メタデータモデルよりも結果が正確になります。このレシピを使用するには、より長いトレーニング時間が必要になる場合があります。
https://docs.aws.amazon.com/ja_jp/personalize/latest/dg/native-recipe-hrnn-metadata.html
さっそくノートブックを進めていく
こちらはAmazon Personalize APIを詳しく知るために用意されているものです。
前提としてAmazon SageMaker上で作業をしています。
ライブラリのインポート
必要なライブラリをインポートしていきます。
import tempfile, subprocess, urllib.request, zipfile import pandas as pd, numpy as np import io import scipy.sparse as ss import json import time import os import boto3
データセットの準備
MovieLensデータセットを利用します。 こちらは131,263件ほどレコードがあるようです。
Amazon Personalize では3 種類の履歴データセット型を認識しますが、今回はその中でもインタラクションデータセットとアイテムデータセット(メタデータ)を使用します。
with tempfile.TemporaryDirectory() as tmpdir: urllib.request.urlretrieve( 'http://files.grouplens.org/datasets/movielens/ml-20m.zip', tmpdir + '/ml-20m.zip') zipfile.ZipFile(tmpdir + '/ml-20m.zip').extractall(tmpdir) df = pd.read_csv(tmpdir + '/ml-20m/ratings.csv') movies = pd.read_csv(tmpdir + '/ml-20m/movies.csv', index_col='movieId') vocab_size = df.movieId.max() + 1 print(vocab_size)
データセットをPersonalizeのフォーマットに変換
データセットをPersonalize のフォーマットに変換し、csvファイルに出力します。
下記あたりの情報を参考に。
- https://docs.aws.amazon.com/ja_jp/personalize/latest/dg/how-it-works-dataset-schema.html
- https://docs.aws.amazon.com/ja_jp/personalize/latest/dg/data-prep-formatting.html
test_time_ratio = 0.01 dfo = df.copy() df = df[df.timestamp < df.timestamp.max() * (1-test_time_ratio) + df.timestamp.min() * test_time_ratio] df.columns = ['USER_ID','ITEM_ID','EVENT_VALUE','TIMESTAMP'] df['EVENT_TYPE']='RATING' df.head() df.to_csv('interactions.csv',index=False)
メタデータの方も同じくPersonalize のフォーマットに変換します。
movies = movies.reset_index() del movies['title'] movies.columns=['ITEM_ID','GENRE'] movies.head() movies.to_csv('item_metadata.csv',index=False)
s3にデータをアップロード
先ほど出力したcsvファイルをs3にアップロードします。 Amazon Personalize では、データをs3から取得するためです。
os.environ['AWS_DEFAULT_REGION']="us-east-1" suffix = str(np.random.uniform())[4:9] bucket = "demo-temporal-holdout-metadata-"+suffix # replace with the name of your S3 bucket !aws s3 mb s3://{bucket} personalize = boto3.client(service_name='personalize', endpoint_url='https://personalize.us-east-1.amazonaws.com') personalize_runtime = boto3.client(service_name='personalize-runtime', endpoint_url='https://personalize-runtime.us-east-1.amazonaws.com') interactions_filename = 'interactions.csv' boto3.Session().resource('s3').Bucket(bucket).Object(interactions_filename).upload_file(interactions_filename) item_metadata_file = 'item_metadata.csv' boto3.Session().resource('s3').Bucket(bucket).Object(item_metadata_file).upload_file(item_metadata_file)
スキーマの作成
csvファイルに出力したデータのフォーマットと合うようにスキーマを定義して行きます。 Amazon Personalize にインポートするファイルはこのスキーマの定義に一致している必要があります。
下記の情報も参考にしてください。
schema_name="DEMO-temporal-metadata-schema-"+suffix schema = { "type": "record", "name": "Interactions", "namespace": "com.amazonaws.personalize.schema", "fields": [ { "name": "USER_ID", "type": "string" }, { "name": "ITEM_ID", "type": "string" }, { "name": "EVENT_VALUE", "type": "float" }, { "name": "TIMESTAMP", "type": "long" }, { "name": "EVENT_TYPE", "type": "string" }, ], "version": "1.0" } create_schema_response = personalize.create_schema( name = schema_name, schema = json.dumps(schema) ) schema_arn = create_schema_response['schemaArn'] print(json.dumps(create_schema_response, indent=2))
メタデータの方も同様です。
metadata_schema_name="DEMO-temporal-metadata-metadataschema-"+suffix metadata_schema = { "type": "record", "name": "Items", "namespace": "com.amazonaws.personalize.schema", "fields": [ { "name": "ITEM_ID", "type": "string" }, { "name": "GENRE", "type": "string", "categorical": True } ], "version": "1.0" } create_metadata_schema_response = personalize.create_schema( name = metadata_schema_name, schema = json.dumps(metadata_schema) ) metadata_schema_arn = create_metadata_schema_response['schemaArn'] print(json.dumps(create_metadata_schema_response, indent=2))
データセットグループの作成
データセットをグルーピングするためのデータセットグループを作成します。
dataset_group_name = "DEMO-temporal-metadata-dataset-group-" + suffix create_dataset_group_response = personalize.create_dataset_group( name = dataset_group_name ) dataset_group_arn = create_dataset_group_response['datasetGroupArn'] print(json.dumps(create_dataset_group_response, indent=2)) status = None max_time = time.time() + 3*60*60 # 3 hours while time.time() < max_time: describe_dataset_group_response = personalize.describe_dataset_group( datasetGroupArn = dataset_group_arn ) status = describe_dataset_group_response["datasetGroup"]["status"] print("DatasetGroup: {}".format(status)) if status == "ACTIVE" or status == "CREATE FAILED": break time.sleep(20)
データセットをデータセットグループに追加
インタラクションデータセットをデータセットグループに追加しています。
dataset_type = "INTERACTIONS" create_dataset_response = personalize.create_dataset( datasetType = dataset_type, datasetGroupArn = dataset_group_arn, schemaArn = schema_arn, name = "DEMO-temporal-metadata-dataset-interactions-" + suffix ) interactions_dataset_arn = create_dataset_response['datasetArn'] print(json.dumps(create_dataset_response, indent=2))
アイテムデータセットをデータセットグループに追加しています。
dataset_type = "ITEMS" create_metadata_dataset_response = personalize.create_dataset( datasetType = dataset_type, datasetGroupArn = dataset_group_arn, schemaArn = metadata_schema_arn, name = "DEMO-temporal-metadata-dataset-items-" + suffix ) metadata_dataset_arn = create_metadata_dataset_response['datasetArn'] print(json.dumps(create_metadata_dataset_response, indent=2))
s3バケットの権限設定
s3のバケットにAmazon Personalizeがアクセスできるよう権限の設定を行います。
<br />s3 = boto3.client("s3") policy = { "Version": "2012-10-17", "Id": "PersonalizeS3BucketAccessPolicy", "Statement": [ { "Sid": "PersonalizeS3BucketAccessPolicy", "Effect": "Allow", "Principal": { "Service": "personalize.amazonaws.com" }, "Action": [ "s3:GetObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::{}".format(bucket), "arn:aws:s3:::{}/*".format(bucket) ] } ] } s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy));
IAMロールの作成
こちらも同様にs3にアクセスする際に使用するIAMロールを設定・作成します。
from botocore.exceptions import ClientError iam = boto3.client("iam") role_name = "PersonalizeS3Role-"+suffix assume_role_policy_document = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "personalize.amazonaws.com" }, "Action": "sts:AssumeRole" } ] } try: create_role_response = iam.create_role( RoleName = role_name, AssumeRolePolicyDocument = json.dumps(assume_role_policy_document) ); iam.attach_role_policy( RoleName = role_name, PolicyArn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess" ); role_arn = create_role_response["Role"]["Arn"] except ClientError as e: if e.response['Error']['Code'] == 'EntityAlreadyExists': role_arn = iam.get_role(RoleName=role_name)['Role']['Arn'] else: raise print(role_arn) # sometimes need to wait a bit for the role to be created time.sleep(60)
データセットグループにデータをインポート
s3にアップロードしたインタラクションデータとアイテムデータをデータセットに追加するためのインポートジョブを作成します。これによりAmazon Personalize に学習用のデータをインポートできるようになります。
create_dataset_import_job_response = personalize.create_dataset_import_job( jobName = "DEMO-temporal-dataset-import-job-"+suffix, datasetArn = interactions_dataset_arn, dataSource = { "dataLocation": "s3://{}/{}".format(bucket, 'interactions.csv') }, roleArn = role_arn ) dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn'] print(json.dumps(create_dataset_import_job_response, indent=2)) create_metadata_dataset_import_job_response = personalize.create_dataset_import_job( jobName = "DEMO-temporal-metadata-dataset-import-job-"+suffix, datasetArn = metadata_dataset_arn, dataSource = { "dataLocation": "s3://{}/{}".format(bucket, 'item_metadata.csv') }, roleArn = role_arn ) metadata_dataset_import_job_arn = create_metadata_dataset_import_job_response['datasetImportJobArn'] print(json.dumps(create_metadata_dataset_import_job_response, indent=2))
status = None max_time = time.time() + 3*60*60 # 3 hours while time.time() < max_time: describe_dataset_import_job_response = personalize.describe_dataset_import_job( datasetImportJobArn = dataset_import_job_arn ) dataset_import_job = describe_dataset_import_job_response["datasetImportJob"] if "latestDatasetImportJobRun" not in dataset_import_job: status = dataset_import_job["status"] print("DatasetImportJob: {}".format(status)) else: status = dataset_import_job["latestDatasetImportJobRun"]["status"] print("LatestDatasetImportJobRun: {}".format(status)) if status == "ACTIVE" or status == "CREATE FAILED": break time.sleep(60)
status = None max_time = time.time() + 3*60*60 # 3 hours while time.time() < max_time: describe_dataset_import_job_response = personalize.describe_dataset_import_job( datasetImportJobArn = metadata_dataset_import_job_arn ) dataset_import_job = describe_dataset_import_job_response["datasetImportJob"] if "latestDatasetImportJobRun" not in dataset_import_job: status = dataset_import_job["status"] print("DatasetImportJob: {}".format(status)) else: status = dataset_import_job["latestDatasetImportJobRun"]["status"] print("LatestDatasetImportJobRun: {}".format(status)) if status == "ACTIVE" or status == "CREATE FAILED": break time.sleep(60)
ソリューションバージョンの作成
ソリューションバージョンとはAmazon Personalizeにおいて レコメンデーションを行うためにトレーニングされた機械学習のモデルを指すようです。
ここではレシピの設定も行いますが、今回は「HRNN-Metadata」アルゴリズムを利用できる「recipeArn」を指定します。
recipe_list = personalize.list_recipes() for recipe in recipe_list['recipes']: print(recipe['recipeArn']) recipe_arn = "arn:aws:personalize:::recipe/aws-hrnn-metadata" create_solution_response = personalize.create_solution( name = "DEMO-temporal-metadata-solution-"+suffix, datasetGroupArn = dataset_group_arn, recipeArn = recipe_arn ) solution_arn = create_solution_response['solutionArn'] print(json.dumps(create_solution_response, indent=2)) create_solution_version_response = personalize.create_solution_version( solutionArn = solution_arn ) solution_version_arn = create_solution_version_response['solutionVersionArn'] print(json.dumps(create_solution_version_response, indent=2))
下記はちょっと時間かかるかもです。自分がやった時は2時間ぐらいかかりました。
status = None max_time = time.time() + 3*60*60 # 3 hours while time.time() < max_time: describe_solution_version_response = personalize.describe_solution_version( solutionVersionArn = solution_version_arn ) status = describe_solution_version_response["solutionVersion"]["status"] print("SolutionVersion: {}".format(status)) if status == "ACTIVE" or status == "CREATE FAILED": break time.sleep(60)
メトリクスの取得
get_solution_metrics_response = personalize.get_solution_metrics( solutionVersionArn = solution_version_arn ) print(json.dumps(get_solution_metrics_response, indent=2))
キャンペーン作成
レコメンデーションの取得はキャンペーンを経由して行います。
- https://docs.aws.amazon.com/ja_jp/personalize/latest/dg/campaigns.html
- https://docs.aws.amazon.com/ja_jp/personalize/latest/dg/getting-recommendations.html
create_campaign_response = personalize.create_campaign( name = "DEMO-metadata-campaign-"+suffix, solutionVersionArn = solution_version_arn, minProvisionedTPS = 2, ) campaign_arn = create_campaign_response['campaignArn'] print(json.dumps(create_campaign_response, indent=2))
status = None max_time = time.time() + 3*60*60 # 3 hours while time.time() < max_time: describe_campaign_response = personalize.describe_campaign( campaignArn = campaign_arn ) status = describe_campaign_response["campaign"]["status"] print("Campaign: {}".format(status)) if status == "ACTIVE" or status == "CREATE FAILED": break time.sleep(60)
最後にテスト用のデータで検証
df = dfo.copy() df = df[df.timestamp >= df.timestamp.max() * (1-test_time_ratio) + df.timestamp.min() * test_time_ratio] df.columns = ['USER_ID','ITEM_ID','EVENT_VALUE','TIMESTAMP'] df['EVENT_TYPE']='RATING' test_users = df['USER_ID'].unique() df.head()
特定のユーザを指定してレコメンド結果を見てみましょう。
元々このユーザはアクションやアニメ系の映画を見られていた感じですね。
user_id_2 = 31 movies.loc[df[df['userId'] == user_id_2]['movieId']].head()
レコメンド結果もアクション、アニメ系が入っているようですね。
user_id_2 = 31 get_recommendations_response = personalize_runtime.get_recommendations( campaignArn = campaign_arn, userId = str(user_id_2), ) print("Recommendations for user: ", user_id_2) item_list = get_recommendations_response['itemList'] movies.loc[rec_items].head()
メトリクスを計算します。
必要があればtqdmのモジュールをインストールします。
!pip install --upgrade pip !pip install tqdm
下記でインポートしているmetricsはgithub上に置いてあるpythonファイルです。
from tqdm.notebook import tqdm_notebook import numpy as np from metrics import mean_reciprocal_rank, ndcg_at_k, precision_at_k
relevance = [] for user_id in tqdm_notebook(test_users): true_items = set(df[df['USER_ID']==user_id]['ITEM_ID'].values) rec_response = personalize_runtime.get_recommendations( campaignArn = campaign_arn, userId = str(user_id) ) rec_items = [int(x['itemId']) for x in rec_response['itemList']] relevance.append([int(x in true_items) for x in rec_items])
print('mean_reciprocal_rank', np.mean([mean_reciprocal_rank(r) for r in relevance])) print('precision_at_5', np.mean([precision_at_k(r, 5) for r in relevance])) print('precision_at_10', np.mean([precision_at_k(r, 10) for r in relevance])) print('precision_at_25', np.mean([precision_at_k(r, 25) for r in relevance])) print('normalized_discounted_cumulative_gain_at_5', np.mean([ndcg_at_k(r, 5) for r in relevance])) print('normalized_discounted_cumulative_gain_at_10', np.mean([ndcg_at_k(r, 10) for r in relevance])) print('normalized_discounted_cumulative_gain_at_25', np.mean([ndcg_at_k(r, 25) for r in relevance]))
クリーンアップ
今回実施した内容を数時間放置していたらしっかり時間分課金が発生していたので、必要なければ下記を参考に削除しておきましょう。
まとめ
今回のようにメタデータを用いることにより、さらに精度の高いレコメンド結果が期待できそうです。 非常に簡単に実施ができるので、色んなデータで試してみたくなりますね!