Personalizeのユーザーセグメンテーションを使ってみた

購入してほしい商品に合うユーザーを見つけるAmazon PersonalizeのUSER_SEGMENTATIONを紹介
2022.03.01

データアナリティクス事業本部機械学習チームの中村です。

今回は、Amazon Personalizeで使用可能となったUSER_SEGMENTATION機能をご紹介し、実際に使ってみます。

冒頭まとめ

  • USER_SEGMENTATIONは、Itemに関する情報を入力とし、ユーザーのリストを出力とする機能を持つ。
  • これにより、例えば、購入してほしい商品に合うユーザーを見つけることができる。
  • USER_SEGMENTATIONは現在バッチ推論にのみ対応しており、入出力はS3上で実施する必要がある。

★本記事の注意点★

本記事のソースコードは、AWS公式リリースで言及されているnotebook環境通りに行っていますが、 このまま実行すると料金が$1,000以上かかってしまいます。料金についての注意事項や下げるための方法を赤字で示しておりますので、参考にされて負担のならない範囲で実行ください。

リリースについて

Amazon Personalizeは、開発者がパーソナライズされた体験をユーザーに提供することを容易にする、フルマネージドな機械学習サービスであり、Amazonがパーソナライゼーションシステム構築で培った知識と経験が反映された技術を利用可能です。

以下のアップデートにより、USER_SEGMENTATION機能が使用可能となりました。

この記事ではUSER_SEGMENTATION機能を整理し、実際にUSER_SEGMENTATIONのレシピでバッチ推論までを実施する環境を構築してみます。
構築は、PythonのSDK(boto3)を用いて実施しますが、マネジメントコンソールからでもほぼ同等のことが実施可能です。

USER_SEGMENTATIONとは

USER_SEGMENTATIONという言葉だけではイメージしづらい部分があるのですが、以下にあるような入出力を持ちます。

ですので実際には以下の図のように、Itemに関する情報を入力とし、ユーザーのリストを出力とするような機能を持ちます。

使い方としては、売りたい商品があって、これをどのユーザーにレコメンドすれば効果的なのかを調べることができます。 従来の通常のレコメンド(Item recommendation)は、userIdを元にレコメンドする商品が得られますが、これとは逆の入力・出力となるイメージでとらえれば良いと思います。

また、Item-Attribute-Affinityレシピを使えば、特定のItemのみに合うユーザーを得るだけでなく、Itemの属性(metadataなど)を入力として、それに合うユーザーを見つけることも可能です。

ワークフロー

ワークフローは通常のレコメンデーションを構築するCustom Domain Dataset Groupと大きく違いはありません。
USER_SEGMENTATIONに特有な点としては、リアルタイム推論(campaign)は作成できず、変わりにバッチ推論(bach_segmentation_job)を作成するという部分となります。

必要なスキーマ

スキーマは、学習するデータなどのフォーマットを定義したものです。
S3に配置するデータはこのスキーマに沿ったカラムを有している必要があります。

Dataset type Required fields Reserved keywords
Interactions USER_ID (string)
ITEM_ID (string)
TIMESTAMP (long)
EVENT_TYPE (string)
EVENT_VALUE (float, null)
IMPRESSION (string)
RECOMMENDATION_ID (string, null)
Users USER_ID (string)
1 metadata field
Items ITEM_ID

1 metadata field

CREATION_TIMESTAMP (long)

レシピについて

  • 以下がUSER_SEGMENTATIONとして追加されたレシピです。
Recipe Recipe Types Required datasets 説明
Item-Affinity USER_SEGMENTATION Interactions 特定のITEM_IDからユーザーのセグメンテーション結果を得ます。
主にInteractionsを用いて学習しますが、Items情報を使うことも可能です。
Item-Attribute-Affinity USER_SEGMENTATION Interactions
Items
InteractionsデータとItemの属性からユーザーをセグメンテーションします。
バッチジョブにのみ対応しているため、詳細は省略いたします。
  • 上記に紐づく、recipeのARNは以下となります。
    • arn:aws:personalize:::recipe/aws-item-affinity
    • arn:aws:personalize:::recipe/aws-item-attribute-affinity

料金体系について

ここでは現時点での算出の方法について説明しますので、最新の情報や具体的な金額については以下を参照ください。

USER_SEGMENTATIONの料金体系は以下となります。
(バッチ推論のみ対応しているため、それに応じた料金体系となっています。)

公式リリースのnotebookでは、ユーザー数が20万人ほどのデータセットを使っており、バッチ推論の料金が結構大きめに発生します。

項目 説明
データ取り込み S3からPersonalizeにアップロードされるデータに1GB単位で料金が発生します。
トレーニング SolutionVersionを作成するトレーニング時間数に応じて1時間毎に料金が発生します。
バッチ推論 データセットに含まれるユニークなUSER_ID数に応じて料金が変わります。
また、バッチ推論時の入力データの行数に応じて料金が変わります。
最終的な料金は、この2点の掛け算となりますので、バッチ推論を全く実施しなかった場合は、この部分についての料金は発生しません。

メトリクスについて

解決する問題が変わるのでメトリクスも変わります。
USER_SEGMENTATIONのメトリクスは以下の3つとなります。

Metrics 特性 詳細
coverage セグメンテーション結果として多様なユーザーを得られているかどうか

セグメンテーション結果に含まれるユニークなUSER_ID数を、実在するUSER_IDの総数で割った値。
hits_at_1_percent 取得したユーザーに何個正解ユーザーがあるか、正確性を表す指標

与えられたITEM_IDに対するinteractionする可能性のあるユーザーを、上位からユーザー数全体の1%取得した場合に、何個実際にinteractionをした正解があるか。
recall_at_1_percent どれだけ漏れなく関連の高いユーザーを推定できたかの指標

与えられたITEM_IDとinteractionする可能性のあるユーザーを、上位からユーザー数全体の1%取得した場合に、どの程度実際にinteractionしたユーザーを推定できているか。

参考までに、通常のレコメンデーションのメトリクスは以下の通りです。

Metrics 特性 詳細
coverage 多様なアイテムをレコメンドしているかどうか

レコメンド結果に含まれるユニークなITEM_ID数を、ItemsにあるITEM_IDの総数で割った値。
mean_reciprocal_rank_at_25
(MRR)
レコメンド結果の上位に正解があることを重視した指標。 K個(25固定)のレコメンド結果を取得した際に、正解が上位にある程スコアが高く、末尾にある、もしくはリストに無いほどスコアが低くなる指標。
リストを上位からたどり、初めて正解のある場所に依存したスコアとなる。
(それより下位の正解はスコアに寄与しない)
リストのN番目にある場合、reciprocal rankは 1/N となる。
ユーザー毎にレコメンデーションのリスト(25個)を取得して、reciprocal rankを計算し、その平均を算出したものが、mean reciprocal rankとなる。
normalized discounted cumulative gain at K
(NDCG)
レコメンド結果の全体(K個)の順序に依存した指標。 K個のレコメンド結果を取得した際に、その順序と適合度合い(relevance)を加味して、理想的な順列をどの程度再現できたか表す指標。
理想的な順列は、適合度合いが高い順に並んでいる順列となる。
Personalizeにおけるレコメンドの場合、適合度合いは存在しないせず(予約語としてもない)、その場合は単純に正解を1、不正解を0として適合度合いを扱うため、K個のレコメンド結果のどの場所に正解のデータがあるかどうかが指標の高さと関連する。
precision at K レコメンドの正確性を表す指標。リストの順番には依存しない。 K個のレコメンド結果のうち、実際にユーザーがアクションした割合を示す指標。
大きいほど間違ったレコメンドが少ない。
指標の性質上、レコメンドリストの順序には依存しない評価指標。

NDCGの算出式など具体的な式については下記を参考としてください。

またレコメンドシステムの評価指標全般についてはこちらにもまとまっています。

実際にUSER_SEGMENTATIONを構築してみた

Modules

import boto3
from boto3.session import Session
import json
import pathlib
import pandas as pd
import datetime as dt

データ作成

  • データは公式で公開されているnotebook環境を参考にして処理します。
    • 元データは、Amazon review data (2018)内の、Grocery and Gourmet Foodのreviewsです。
    • 公式notebookは、reviewsとmetadataを使っていますが、この記事ではreviewsのみを使います。
    • 元データのユーザ数が20万人以上あるため、料金を下げたい場合は以降の処理をユーザー数を削った状態で実行してください。
events_df = pd.read_json('Grocery_and_Gourmet_Food.json.gz', lines=True, compression='infer')
events_df = events_df[['reviewerID', 'asin', 'unixReviewTime']]\
  .rename(columns = {'reviewerID':'USER_ID', 'asin':'ITEM_ID', 'unixReviewTime':'TIMESTAMP'})
events_df.drop_duplicates(inplace=True)

# interactionが少ないデータを除外
user_len=events_df.groupby(by='USER_ID')['TIMESTAMP'].count()
valid_users=set(user_len[user_len>3].index)
events_df=events_df[events_df.USER_ID.isin(valid_users)]

# 並べ替え
events_df = events_df.sort_values(['USER_ID', 'TIMESTAMP'], kind='mergesort')

print("number of total interactions:{} num users:{} num items:{} ".format(len(events_df),len(events_df['USER_ID'].unique()),len(events_df['ITEM_ID'].unique())))
# OUT ==> number of total interactions:1702661 num users:230682 num items:187565
  • 検証用に、trainとtestに分割します。trainを過去のデータ、testを未来のデータとなるようにします。
test_start_date = [2018,2,1]
test_start = dt.datetime(*test_start_date).timestamp()
train_df = events_df[events_df['TIMESTAMP']<test_start]
test_df = events_df[events_df['TIMESTAMP']>=test_start]
  • あくまで過去のinteractionsがUSER_SEGMENTATIONするための情報となるので、trainに存在しないuserIdは推定することができません。
  • そのため、trainに存在するuserIdをwarm_userとし、warm_userのみをtestデータに絞り込みます。
test_df = test_df[test_df['USER_ID'].isin(warm_user)]
  • trainデータとtestデータを出力
    • trainは後述のスキーマ定義通りのcsvファイルで出力します。
    • testは500件のみをjsonデータで出力します。
    • この500件もバッチジョブのクエリ件数として料金に影響しますので、料金を節約したい方は低く設定してください。
train_df.to_csv("interactions.csv", index=False)

groundtruth = test_df.groupby('ITEM_ID')['USER_ID'].unique()
#sample 500 items from test data as test queries
test_labels = groundtruth.sample(n=500,random_state=10)
test_labels\
  .reset_index()[['ITEM_ID']]\
  .rename(columns={'ITEM_ID': 'itemId'})\
  .to_json('batch-input-20220228.json',orient='records',lines=True)

変数の事前定義

bucket_name = 'Your S3 bucket name'

import_s3_uri_interaction = f's3://{bucket_name}/interactions.csv'
import_s3_uri_batch_input = f's3://{bucket_name}/batch-input-20220228.json'
import_s3_uri_batch_output = f's3://{bucket_name}/output/'

region_name = 'ap-northeast-1'
prefix = 'trial-20220228'
dataset_group_name          = f'{prefix}-dataset-group'
schema_interaction_name     = f'{prefix}-schema-interaction'
dataset_interaction_name    = f'{prefix}-dataset-interaction'
import_job_interaction_name = f'{prefix}-import-job-interaction'
solution_name               = f'{prefix}-solution-item-affinity'
iam_role_name               = f'{prefix}-personalize-exection-role'
iam_custom_policy_name      = f'{prefix}-personalize-execution-policy'

client_s3                  = boto3.client('s3', region_name=region_name)
client_personalize         = boto3.client('personalize', region_name=region_name)
client_personalize_runtime = boto3.client('personalize-runtime', region_name=region_name)
client_iam                 = boto3.client('iam', region_name=region_name)

S3 bucketの作成と設定

  • batch jobの出力のため、bucket_policyのActionには"s3:PutObject"が必要となります。
# create_bucket
location = {'LocationConstraint': region_name}
client_s3.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location)

# put_public_access_block
client_s3.put_public_access_block(
    Bucket=bucket_name,
    PublicAccessBlockConfiguration={
        'BlockPublicAcls': True,
        'IgnorePublicAcls': True,
        'BlockPublicPolicy': True,
        'RestrictPublicBuckets': True,
    },
)

# create bucket_policy
bucket_policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy",
    "Statement": [
        {
            "Sid": "PersonalizeS3BucketAccessPolicy",
            "Effect": "Allow",
            "Principal": {
                "Service": "personalize.amazonaws.com"
            },
            "Action": [
                "s3:GetObject",
                "s3:ListBucket",
                "s3:PutObject"
            ],
            "Resource": [
                f"arn:aws:s3:::{bucket_name}",
                f"arn:aws:s3:::{bucket_name}/*"
            ]
        }
    ]
}
bucket_policy = json.dumps(bucket_policy)

# put_bucket_policy
client_s3.put_bucket_policy(
    Bucket=bucket_name,
    Policy=bucket_policy,
)

S3 bucketへのアップロード

# 学習用データ
interactions_csv = pathlib.Path('interactions.csv')
response = client_s3.upload_file(
    str(interactions_csv),
    bucket_name, 
    str(interactions_csv),
)

# batch job用入力
batch_input_json = pathlib.Path('batch-input-20220228.json')
response = client_s3.upload_file(
    str(batch_input_json),
    bucket_name, 
    str(batch_input_json),
)

Domain dataset group作成

response_create_dataset_group = client_personalize.create_dataset_group(
    name=dataset_group_name,
)

Schema作成

  • Interactionsのみをデータとして使います。
schema_interaction = {
  "type": "record",
  "name": "Interactions",
  "namespace": "com.amazonaws.personalize.schema",
  "fields": [
    {
      "name": "USER_ID",
      "type": "string"
    },
    {
      "name": "ITEM_ID",
      "type": "string"
    },
    {
      "name": "TIMESTAMP",
      "type": "long"
    }
  ],
  "version": "1.0"
}

schema_interaction = json.dumps(schema_interaction)

response_create_schema = client_personalize.create_schema(
    name=schema_interaction_name,
    schema=schema_interaction,
)

Dataset作成

response_create_dataset = client_personalize.create_dataset(
    name=dataset_interaction_name,
    schemaArn=response_create_schema['schemaArn'],
    datasetGroupArn=response_create_dataset_group['datasetGroupArn'],
    datasetType='Interactions'
)

IAMロール作成

  • まずはcustom policyの作成します。
iam_custom_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:ListBucket"
            ],
            "Effect": "Allow",
            "Resource": [
                f"arn:aws:s3:::{bucket_name}"
            ]
        },
        {
            "Action": [
                "s3:GetObject",
                "s3:PutObject"
            ],
            "Effect": "Allow",
            "Resource": [
                f"arn:aws:s3:::{bucket_name}/*"
            ]
        }
    ]
}

response_create_policy = client_iam.create_policy(
    PolicyName=iam_custom_policy_name,
    PolicyDocument=json.dumps(iam_custom_policy_document),
)
  • 次にroleを作成します。
    • S3のポリシーは、batch jobのため書き込みも必要となるため、今回はFullAccessとします。
assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "personalize.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
    ]
}


create_role_response = client_iam.create_role(
    RoleName = iam_role_name,
    AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)
)

client_iam.attach_role_policy(
    RoleName = iam_role_name,
    PolicyArn = "arn:aws:iam::aws:policy/AmazonS3FullAccess"
)

client_iam.attach_role_policy(
    RoleName = iam_role_name,
    PolicyArn = response_create_policy['Policy']['Arn']
)

response_get_role = client_iam.get_role(RoleName=iam_role_name)

Import job作成

  • 作成した、IAM Roleを与える必要があります。
response_create_dataset_import_job = client_personalize.create_dataset_import_job(
    jobName=import_job_interaction_name,
    datasetArn=response_create_dataset['datasetArn'],
    dataSource={
        'dataLocation': import_s3_uri_interaction
    },
    roleArn=response_get_role['Role']['Arn']
)

Solution作成

  • 今回レシピは、aws-item-affinityを使います。
response_create_solution = client_personalize.create_solution(
    name=solution_name,
    datasetGroupArn=response_create_dataset_group['datasetGroupArn'],
    recipeArn='arn:aws:personalize:::recipe/aws-item-affinity',
)

SolutionVersion作成(training)

  • 学習には1時間程度かかりました。
create_solution_version_response = client_personalize.create_solution_version(
    solutionArn = response_create_solution['solutionArn']
)

metrics取得

response_get_solution_metrics = client_personalize.get_solution_metrics(
    solutionVersionArn = create_solution_version_response['solutionVersionArn']
)
  • 得られるmetricsは以下となりました。
"metrics": {
  "coverage": 0.4914,
  "hits_at_1_percent": 0.177,
  "recall_at_1_percent": 0.0331
}

batch segment job作成

  • バッチ推論結果出力先のimport_s3_uri_batch_outputは、/で終わらせる必要があるので注意が必要です。
topk = len(train_df['USER_ID'].unique())//100 # 全体の1%を取得する

response_create_batch_segment_job = client_personalize.create_batch_segment_job(
    jobName = batch_job_name,
    solutionVersionArn = create_solution_version_response['solutionVersionArn'],
    numResults = topk,
    jobInput =  {
        "s3DataSource": {
            "path": import_s3_uri_batch_input
        }
    },
    jobOutput = {
        "s3DataDestination": {
            "path": import_s3_uri_batch_output
        }
    },
    roleArn = response_get_role['Role']['Arn']
)

train時とtestデータのmetricsの比較

  • 結果をS3からダウンロード
    • 結果ファイルは入力ファイルの末尾に.outが付与されたファイル名となります。
response = client_s3.download_file(
    bucket_name,
    "output/" + pathlib.Path(import_s3_uri_batch_input).name + ".out",
    pathlib.Path(import_s3_uri_batch_input).name + ".out"
)
  • 指標の導出
    • 算出処理は、公式のnotebookから流用します。
def get_hit_recall_k(k, topk, grndtruth):
    """ compare topk and gnd; 
    """
    df = grndtruth.join(topk.reindex(grndtruth.index, fill_value=[]).to_frame('pred'))
    hit = df.apply(lambda x: len(set(x['pred']).intersection(x['target_users'])),axis=1)
    recall = hit / grndtruth['target_users'].apply(lambda x:min(len(x),k))
    hit_recall = pd.DataFrame({'hit': hit, 'recall': recall}).rename(columns={'hit':'hits @ top-{}'.format(k),'recall':'recall @ top-{}'.format(k)}) # item by metric_name
    return hit_recall

output=pd.read_json(pathlib.Path(import_s3_uri_batch_input).name + ".out", lines = True)
prediction=output.apply(lambda x:pd.Series({'ITEM_ID':x['input']['itemId'],'USER_ID':x['output']['usersList']}),axis=1).set_index('ITEM_ID')['USER_ID']

print('Test metrics for Personalized-item-affinity')
get_hit_recall_k(topk, prediction, test_labels).mean()
  • 以下の結果が得られました。学習時のmetricsとあまり変わらない値が得られています。
Test metrics for Personalized-item-affinity
hits @ top-2263      0.166000
recall @ top-2263    0.063681
dtype: float64

まとめ

この記事ではUSER_SEGMENTATIONの構築をしてみました。
USER_SEGMENTATIONは、購入してほしい商品に合うユーザーを見つけることができますので、 従来のユーザーに対してお勧めするアイテムを取得するレコメンデーションと組み合わせて、 より柔軟で最適なレコメンデーションを構築できるようになりました。 これによりPersonalizeの活用の幅が更に広がりそうです。