Amazon Personalize のHRNN-Metadata レシピ触ってみた – 機械学習 on AWS Advent Calendar 2019

こんにちは、Mr.Moです。

当エントリは『機械学習 on AWS Advent Calendar 2019』の3日目のエントリです。

下記の記事を見てAmazon Personalizeに非常に興味が湧いたので、早速触ってみました。

HIGOBASHI.AWS 第12回 活用編で「Amazon Personalizeではじめるレコメンドサービス」について話しました #higobashiaws

今回は記事内にも紹介がありました「Amazon Personalize Samples」の内、下記のHRNN-Metadata レシピを使用しているノートブックを扱っていきます。

HRNN-Metadata レシピとは

HRNN- メタデータレシピは、ユーザーがやり取りするアイテムを予測します。HRNN レシピに類似していますが、コンテキスト、ユーザー、アイテムのメタデータ (それぞれソースは、インタラクション、ユーザー、アイテムのデータセット) から派生した追加の特徴を含みます。高品質なメタデータを使用できる場合、HRNN-Metadata は非メタデータモデルよりも結果が正確になります。このレシピを使用するには、より長いトレーニング時間が必要になる場合があります。

https://docs.aws.amazon.com/ja_jp/personalize/latest/dg/native-recipe-hrnn-metadata.html

さっそくノートブックを進めていく

こちらはAmazon Personalize APIを詳しく知るために用意されているものです。

https://docs.aws.amazon.com/ja_jp/personalize/latest/dg/getting-started-python.html#gs-jupyter-notebook

前提としてAmazon SageMaker上で作業をしています。

ライブラリのインポート

必要なライブラリをインポートしていきます。

import tempfile, subprocess, urllib.request, zipfile
import pandas as pd, numpy as np

import io
import scipy.sparse as ss
import json
import time
import os

import boto3

データセットの準備

MovieLensデータセットを利用します。 こちらは131,263件ほどレコードがあるようです。

Amazon Personalize では3 種類の履歴データセット型を認識しますが、今回はその中でもインタラクションデータセットとアイテムデータセット(メタデータ)を使用します。

with tempfile.TemporaryDirectory() as tmpdir:
    urllib.request.urlretrieve(
        'http://files.grouplens.org/datasets/movielens/ml-20m.zip',
        tmpdir + '/ml-20m.zip')
    zipfile.ZipFile(tmpdir + '/ml-20m.zip').extractall(tmpdir)
    df = pd.read_csv(tmpdir + '/ml-20m/ratings.csv')
    movies = pd.read_csv(tmpdir + '/ml-20m/movies.csv', index_col='movieId')
    vocab_size = df.movieId.max() + 1

print(vocab_size)

データセットをPersonalizeのフォーマットに変換

データセットをPersonalize のフォーマットに変換し、csvファイルに出力します。

下記あたりの情報を参考に。

test_time_ratio = 0.01

dfo = df.copy()
df = df[df.timestamp < df.timestamp.max() * (1-test_time_ratio) + df.timestamp.min() * test_time_ratio]

df.columns = ['USER_ID','ITEM_ID','EVENT_VALUE','TIMESTAMP']
df['EVENT_TYPE']='RATING'

df.head()

df.to_csv('interactions.csv',index=False)

image.png

メタデータの方も同じくPersonalize のフォーマットに変換します。

movies = movies.reset_index()

del movies['title']

movies.columns=['ITEM_ID','GENRE']

movies.head()

movies.to_csv('item_metadata.csv',index=False)

image.png

s3にデータをアップロード

先ほど出力したcsvファイルをs3にアップロードします。 Amazon Personalize では、データをs3から取得するためです。

os.environ['AWS_DEFAULT_REGION']="us-east-1"
suffix = str(np.random.uniform())[4:9]
bucket = "demo-temporal-holdout-metadata-"+suffix     # replace with the name of your S3 bucket
!aws s3 mb s3://{bucket}

personalize = boto3.client(service_name='personalize', endpoint_url='https://personalize.us-east-1.amazonaws.com')
personalize_runtime = boto3.client(service_name='personalize-runtime', endpoint_url='https://personalize-runtime.us-east-1.amazonaws.com')

interactions_filename = 'interactions.csv'
boto3.Session().resource('s3').Bucket(bucket).Object(interactions_filename).upload_file(interactions_filename)

item_metadata_file = 'item_metadata.csv'
boto3.Session().resource('s3').Bucket(bucket).Object(item_metadata_file).upload_file(item_metadata_file)

スキーマの作成

csvファイルに出力したデータのフォーマットと合うようにスキーマを定義して行きます。 Amazon Personalize にインポートするファイルはこのスキーマの定義に一致している必要があります。

下記の情報も参考にしてください。

schema_name="DEMO-temporal-metadata-schema-"+suffix

schema = {
    "type": "record",
    "name": "Interactions",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
        {
            "name": "USER_ID",
            "type": "string"
        },
        {
            "name": "ITEM_ID",
            "type": "string"
        },
        {
            "name": "EVENT_VALUE",
            "type": "float"
        },
        {
            "name": "TIMESTAMP",
            "type": "long"
        },
        { 
            "name": "EVENT_TYPE",
            "type": "string"
        },
    ],
    "version": "1.0"
}

create_schema_response = personalize.create_schema(
    name = schema_name,
    schema = json.dumps(schema)
)

schema_arn = create_schema_response['schemaArn']
print(json.dumps(create_schema_response, indent=2))

メタデータの方も同様です。

metadata_schema_name="DEMO-temporal-metadata-metadataschema-"+suffix

metadata_schema = {
 "type": "record",
 "name": "Items",
 "namespace": "com.amazonaws.personalize.schema",
 "fields": [
 {
 "name": "ITEM_ID",
 "type": "string"
 },
 {
 "name": "GENRE",
 "type": "string",
 "categorical": True
 }
 ],
 "version": "1.0"
}

create_metadata_schema_response = personalize.create_schema(
    name = metadata_schema_name,
    schema = json.dumps(metadata_schema)
)

metadata_schema_arn = create_metadata_schema_response['schemaArn']
print(json.dumps(create_metadata_schema_response, indent=2))

データセットグループの作成

データセットをグルーピングするためのデータセットグループを作成します。

dataset_group_name = "DEMO-temporal-metadata-dataset-group-" + suffix

create_dataset_group_response = personalize.create_dataset_group(
    name = dataset_group_name
)

dataset_group_arn = create_dataset_group_response['datasetGroupArn']
print(json.dumps(create_dataset_group_response, indent=2))

status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_group_response = personalize.describe_dataset_group(
        datasetGroupArn = dataset_group_arn
    )
    status = describe_dataset_group_response["datasetGroup"]["status"]
    print("DatasetGroup: {}".format(status))

    if status == "ACTIVE" or status == "CREATE FAILED":
        break

    time.sleep(20)

データセットをデータセットグループに追加

インタラクションデータセットをデータセットグループに追加しています。

dataset_type = "INTERACTIONS"
create_dataset_response = personalize.create_dataset(
    datasetType = dataset_type,
    datasetGroupArn = dataset_group_arn,
    schemaArn = schema_arn,
    name = "DEMO-temporal-metadata-dataset-interactions-" + suffix
)

interactions_dataset_arn = create_dataset_response['datasetArn']
print(json.dumps(create_dataset_response, indent=2))

アイテムデータセットをデータセットグループに追加しています。

dataset_type = "ITEMS"
create_metadata_dataset_response = personalize.create_dataset(
    datasetType = dataset_type,
    datasetGroupArn = dataset_group_arn,
    schemaArn = metadata_schema_arn,
    name = "DEMO-temporal-metadata-dataset-items-" + suffix
)

metadata_dataset_arn = create_metadata_dataset_response['datasetArn']
print(json.dumps(create_metadata_dataset_response, indent=2))

s3バケットの権限設定

s3のバケットにAmazon Personalizeがアクセスできるよう権限の設定を行います。

<br />s3 = boto3.client("s3")

policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy",
    "Statement": [
        {
            "Sid": "PersonalizeS3BucketAccessPolicy",
            "Effect": "Allow",
            "Principal": {
                "Service": "personalize.amazonaws.com"
            },
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::{}".format(bucket),
                "arn:aws:s3:::{}/*".format(bucket)
            ]
        }
    ]
}

s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy));

IAMロールの作成

こちらも同様にs3にアクセスする際に使用するIAMロールを設定・作成します。

from botocore.exceptions import ClientError

iam = boto3.client("iam")

role_name = "PersonalizeS3Role-"+suffix
assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "personalize.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
    ]
}
try:
    create_role_response = iam.create_role(
        RoleName = role_name,
        AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)
    );

    iam.attach_role_policy(
        RoleName = role_name,
        PolicyArn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
    );

    role_arn = create_role_response["Role"]["Arn"]
except ClientError as e:
    if e.response['Error']['Code'] == 'EntityAlreadyExists':
        role_arn = iam.get_role(RoleName=role_name)['Role']['Arn']
    else:
        raise

print(role_arn)

# sometimes need to wait a bit for the role to be created
time.sleep(60)

データセットグループにデータをインポート

s3にアップロードしたインタラクションデータとアイテムデータをデータセットに追加するためのインポートジョブを作成します。これによりAmazon Personalize に学習用のデータをインポートできるようになります。

create_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName = "DEMO-temporal-dataset-import-job-"+suffix,
    datasetArn = interactions_dataset_arn,
    dataSource = {
        "dataLocation": "s3://{}/{}".format(bucket, 'interactions.csv')
    },
    roleArn = role_arn
)

dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_dataset_import_job_response, indent=2))

create_metadata_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName = "DEMO-temporal-metadata-dataset-import-job-"+suffix,
    datasetArn = metadata_dataset_arn,
    dataSource = {
        "dataLocation": "s3://{}/{}".format(bucket, 'item_metadata.csv')
    },
    roleArn = role_arn
)

metadata_dataset_import_job_arn = create_metadata_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_metadata_dataset_import_job_response, indent=2))
status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = dataset_import_job_arn
    )

    dataset_import_job = describe_dataset_import_job_response["datasetImportJob"]
    if "latestDatasetImportJobRun" not in dataset_import_job:
        status = dataset_import_job["status"]
        print("DatasetImportJob: {}".format(status))
    else:
        status = dataset_import_job["latestDatasetImportJobRun"]["status"]
        print("LatestDatasetImportJobRun: {}".format(status))

    if status == "ACTIVE" or status == "CREATE FAILED":
        break

    time.sleep(60)
status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = metadata_dataset_import_job_arn
    )

    dataset_import_job = describe_dataset_import_job_response["datasetImportJob"]
    if "latestDatasetImportJobRun" not in dataset_import_job:
        status = dataset_import_job["status"]
        print("DatasetImportJob: {}".format(status))
    else:
        status = dataset_import_job["latestDatasetImportJobRun"]["status"]
        print("LatestDatasetImportJobRun: {}".format(status))

    if status == "ACTIVE" or status == "CREATE FAILED":
        break

    time.sleep(60)

ソリューションバージョンの作成

ソリューションバージョンとはAmazon Personalizeにおいて レコメンデーションを行うためにトレーニングされた機械学習のモデルを指すようです。

ここではレシピの設定も行いますが、今回は「HRNN-Metadata」アルゴリズムを利用できる「recipeArn」を指定します。

recipe_list = personalize.list_recipes()
for recipe in recipe_list['recipes']:
    print(recipe['recipeArn'])

recipe_arn = "arn:aws:personalize:::recipe/aws-hrnn-metadata"

create_solution_response = personalize.create_solution(
    name = "DEMO-temporal-metadata-solution-"+suffix,
    datasetGroupArn = dataset_group_arn,
    recipeArn = recipe_arn
)

solution_arn = create_solution_response['solutionArn']
print(json.dumps(create_solution_response, indent=2))

create_solution_version_response = personalize.create_solution_version(
    solutionArn = solution_arn
)

solution_version_arn = create_solution_version_response['solutionVersionArn']
print(json.dumps(create_solution_version_response, indent=2))

下記はちょっと時間かかるかもです。自分がやった時は2時間ぐらいかかりました。

status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_solution_version_response = personalize.describe_solution_version(
        solutionVersionArn = solution_version_arn
    )
    status = describe_solution_version_response["solutionVersion"]["status"]
    print("SolutionVersion: {}".format(status))

    if status == "ACTIVE" or status == "CREATE FAILED":
        break

    time.sleep(60)

メトリクスの取得

get_solution_metrics_response = personalize.get_solution_metrics(
    solutionVersionArn = solution_version_arn
)

print(json.dumps(get_solution_metrics_response, indent=2))

image.png

キャンペーン作成

レコメンデーションの取得はキャンペーンを経由して行います。

create_campaign_response = personalize.create_campaign(
    name = "DEMO-metadata-campaign-"+suffix,
    solutionVersionArn = solution_version_arn,
    minProvisionedTPS = 2,    
)

campaign_arn = create_campaign_response['campaignArn']
print(json.dumps(create_campaign_response, indent=2))
status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_campaign_response = personalize.describe_campaign(
        campaignArn = campaign_arn
    )
    status = describe_campaign_response["campaign"]["status"]
    print("Campaign: {}".format(status))

    if status == "ACTIVE" or status == "CREATE FAILED":
        break

    time.sleep(60)

最後にテスト用のデータで検証

df = dfo.copy()
df = df[df.timestamp >= df.timestamp.max() * (1-test_time_ratio) + df.timestamp.min() * test_time_ratio]
df.columns = ['USER_ID','ITEM_ID','EVENT_VALUE','TIMESTAMP']
df['EVENT_TYPE']='RATING'
test_users = df['USER_ID'].unique()
df.head()

image.png

特定のユーザを指定してレコメンド結果を見てみましょう。

元々このユーザはアクションやアニメ系の映画を見られていた感じですね。

user_id_2 = 31
movies.loc[df[df['userId'] == user_id_2]['movieId']].head()

image.png

レコメンド結果もアクション、アニメ系が入っているようですね。

user_id_2 = 31
get_recommendations_response = personalize_runtime.get_recommendations(
    campaignArn = campaign_arn,
    userId = str(user_id_2),
)

print("Recommendations for user: ", user_id_2)

item_list = get_recommendations_response['itemList']

movies.loc[rec_items].head()

image.png

メトリクスを計算します。

必要があればtqdmのモジュールをインストールします。

!pip install --upgrade pip
!pip install tqdm

下記でインポートしているmetricsはgithub上に置いてあるpythonファイルです。

from tqdm.notebook import tqdm_notebook
import numpy as np
from metrics import mean_reciprocal_rank, ndcg_at_k, precision_at_k
relevance = []
for user_id in tqdm_notebook(test_users):
    true_items = set(df[df['USER_ID']==user_id]['ITEM_ID'].values)
    rec_response = personalize_runtime.get_recommendations(
        campaignArn = campaign_arn,
        userId = str(user_id)
    )
    rec_items = [int(x['itemId']) for x in rec_response['itemList']]
    relevance.append([int(x in true_items) for x in rec_items])
print('mean_reciprocal_rank', np.mean([mean_reciprocal_rank(r) for r in relevance]))
print('precision_at_5', np.mean([precision_at_k(r, 5) for r in relevance]))
print('precision_at_10', np.mean([precision_at_k(r, 10) for r in relevance]))
print('precision_at_25', np.mean([precision_at_k(r, 25) for r in relevance]))
print('normalized_discounted_cumulative_gain_at_5', np.mean([ndcg_at_k(r, 5) for r in relevance]))
print('normalized_discounted_cumulative_gain_at_10', np.mean([ndcg_at_k(r, 10) for r in relevance]))
print('normalized_discounted_cumulative_gain_at_25', np.mean([ndcg_at_k(r, 25) for r in relevance]))

image.png

クリーンアップ

今回実施した内容を数時間放置していたらしっかり時間分課金が発生していたので、必要なければ下記を参考に削除しておきましょう。

まとめ

今回のようにメタデータを用いることにより、さらに精度の高いレコメンド結果が期待できそうです。 非常に簡単に実施ができるので、色んなデータで試してみたくなりますね!