[レポート] Amazon SageMaker RL: Solving business problems with RL and bandits #AIM404 #reinvent

最初に

こんにちは、データアナリティクス事業本部のyoshimです。
今日はre:Invent2019にて行われた「Amazon SageMaker RL: Solving business problems with RL and bandits」というワークショップの内容についてご紹介するエントリーを書こうと思います。

ワークショップ概要

本ワークショップの概要は下記の通りです。

In reinforcement learning (RL), an RL agent learns in an interactive environment by trial and error using feedback from its own actions, and can make sophisticated multi-step decisions. As a result, RL has broad applicability in robotics, industrial control, finance, HVAC, dialog systems, online advertising, and more. This workshop provides practitioners with hands-on experience building and deploying RL agents from scratch. We use examples from two scenarios: one where the environment can be simulated (computer games, resource allocation simulators, etc.) and one where it cannot be and the agent learns in a live environment (recommender systems, trading bots, etc.).

一応、機械翻訳したものも載せておきます。

強化学習(RL)では、RLエージェントは、対話型環境で、自身のアクションからのフィードバックを使用して試行錯誤によって学習し、洗練されたマルチステップの意思決定を行うことができます。その結果、RLは、ロボット工学、産業用制御、金融、HVAC、対話システム、オンライン広告などに幅広く適用できます。このワークショップは、実践者にRLエージェントをゼロから構築および展開する実践的な経験を提供します。環境をシミュレートできるシナリオ(コンピューターゲーム、リソース割り当てシミュレーターなど)と、実際の環境でエージェントが学習するシナリオ(推奨システム、取引ボットなど)の2つのシナリオの例を使用します。

今回のワークショップで実施した内容はこちらに格納してあります。
基本的に「AIM404-RL_for_your_business.ipynb」ファイルを上から順に実行していく形です。

目次

1.やることの概要

上記のワークショップでやったことは「レコメンドモデルを継続的に更新し続ける」というシステムで「多腕バンディット問題」を利用したものです。
もう少し具体的に言うと、下記のようなフローを実現します。

  • 1.既存データから学習したモデルを推論エンドポイントとしてデプロイする
  • 2.対象のアプリケーションからレコメンドを実行する際に、推論エンドポイントから推論結果を取得する
  • 3.推論エンドポイントが推論を実行したタイミングでS3に推論結果を出力
  • 4.アプリケーション側で、「推論結果を利用した後にユーザーがどのような行動をとったか」をS3に出力
    • 学習用データの正解ラベル(reward)として利用するため
  • 5.「3」、「4」の結果を利用して、モデルの更新をする

上記の「5」では、既存のモデルに差分データを使ってファインチューニングしています。
また、「多腕バンディット」を利用しているので、「既存データの活用のみならず、探索的にデータを取得」できる点も大きなメリットです。
(今回は「epsilon-greedy」アルゴリズムを採用します。このアルゴリズムでは「epsilon」というハイパーパラメータの割合に応じて探索を行うため、徐々にepsilonの数値を小さくしていく流れが一般的かと思います)

全体的なシステム構成図は下記の通りです。

2.各要素技術の紹介

今回のワークショップで出てくる技術要素としては、「1.やることの概要」でご紹介した通りです。
今回の構成で注意点となりそうな部分について、概要レベルですが記述しておきます。

Amazon SageMaker RL

「Amazon SageMaker RL」はSageMaker上で強化学習を実行する機能です。
強化学習とは、従来の教師なし学習や教師あり学習とは少し異なり、報酬や環境等を定義しアクションに応じた報酬から最終的に得られる長期報酬の最大化を学習することを目的とするものです。
AWSのドキュメントでは、下記のように紹介されています。

強化学習 (RL) とは、ある環境で機能するエージェントの目的を最適化するポリシーと呼ばれる、戦略の学習を試みる機械学習の手法です。たとえば、エージェントはロボットであり、環境は迷宮であり、目標は最小限の時間で正常に迷宮を進むことであると仮定できます。RL では、エージェントがアクションを実行し、環境の状態を監視して、環境の現状の値に基づいて報酬を得ます。長期報酬を最大化することを目的とし、エージェントのアクションの結果として受け取ります。RL は、エージェントが自律的な意思決定を行うことができる問題の解決に適しています。

参照:Amazon SageMaker RL

☆多腕バンディット問題

機械学習の手法の1つです。
このアルゴリズムについても更に色々と種類があリます。
今回はアルゴリズムの解説はしませんが、「探索」と「活用」をバランス良く実現するために利用されることが多いようです。
(ex.例えばレコメンドの例で言うと、「この広告が最適」と判断したものを提供する「活用」と、「この広告も試してみよう」という「探索」の両方を行い、長期的な目的変数の最大化を実現する)

☆ファインチューニング

既存のモデルの重みを初期値として重みを「調整」する学習処理。
転移学習(トランスファーラーニング)とは言葉の意味合いが異なる点に注意。

3.やってみた

今回のワークショップで実行したスクリプトはこちらの通りです。
かなり量が多いので、主要な部分だけ抽出して解説します。

3-1.管理用DynamoDBテーブルの作成

管理用DynamoDBテーブルを作成します。
(cfnテンプレートからスタック作成)
ここでは3つのテーブルを作成するのですが、それぞれの用途は下記の通りです。

  • BanditsExperimentTable
    • 試行を全般的に管理するテーブル
  • BanditsJoinTable
    • 学習用データを作成するAthenaの処理結果を管理するためのテーブル(input・outputデータのS3パス、ジョブのステータス等)
  • BanditsModelTable
    • モデルの管理用(学習に使ったデータ、評価結果、モデルのS3パス等)
experiment_name = "AIM404-sbs" #YOUR EXPERIMENT NAME HERE - can be AIM404-1
bandits_experiment = ExperimentManager(config, experiment_id=experiment_name)

3-2.モデルを学習

モデルを学習します。
最初のモデルは既存データから一部をサンプリングした上で学習しています。

bandits_experiment.initialize_first_model(input_data_s3_prefix=bandits_experiment.last_joined_job_train_data) 

なお、ここではSageMakerRLの「RLEstimator」を使ってSageMakerのトレーニングJOBとして実行できるようにして、学習時は「train-vw.py」を、評価時は「eval-cfa-vw.py」をエントリーポイントとして利用しています。また、コンテナイメージはAWS側で事前に用意されている「Vowpal Wabbit Images」を利用しています。
(AWS側で事前に用意されているSageMakerRLのコンテナイメージ一覧についてはこちらをご参照ください)

「Vowpal Wabbit Images」コンテナイメージでは「Vowpal Wabbitプロジェクト」をサポートしており、そのため今回利用するコンテキストバンディットのライブラリが使えます。

## エントリーポイント等の変数を指定した状態で、RLEstimatorクラスを利用して学習する
self.rl_estimator = RLEstimator(**rl_estimator_args)

参照:model_manager.py

## 学習時は「train-vw.py」、モデルの評価時は「eval-cfa-vw.py」をエントリーポイントとして利用する
entry_point = "eval-cfa-vw.py" if eval else "train-vw.py"

参照:model_manager.py

学習時は「VWModelクラス」を利用しています。
今回のスクリプトでは多クラス分類タスク(7クラス)で、「Epsilon-greedy」アルゴリズムを採用しているため、ハイパーパラメータで指定する「epsilon」の割合でランダムにレコメンド(探索)、「1-epsilon」の割合で既存データから最適だと判断した結果をレコメンドします。
(上記以外のアルゴリズムも指定できます。詳細はこちらをご参照ください)

## 学習
from vw_model import VWModel

vw_model = VWModel(cli_args=f"{vw_args} -f {MODEL_OUTPUT_PATH} --save_resume",
                           model_path=None, test_only=False, quiet_mode=False)
vw_model.learn(context_vector=json.loads(experience["observation"]),
                           action=experience["action"],
                           cost=1 - experience["reward"],
                           probability=experience["action_prob"])

参照:train-vw.py

3-3.評価

学習が完了したら、学習実行時にあらかじめ作成しておいた評価用データを利用して結果を評価します。

## 評価用のトレーニングJOBを実行&結果をDynamoDBのテーブルに格納
bandits_experiment.evaluate_model(
    input_data_s3_prefix=bandits_experiment.last_joined_job_eval_data,
    evaluate_model_id=bandits_experiment.last_trained_model_id)

上記の処理を実行すると、下記の部分を呼び出す形になります。

self.next_model_to_evaluate = ModelManager(
    model_db_client=self.model_db_client,
    experiment_id=self.experiment_id,
    model_id=evaluate_model_id,
    image=self.image,
    role=self.resource_manager.iam_role_arn,
    instance_config=self.resource_manager.evaluation_fleet_config,
    boto_session=self.boto_session,
    algor_config=self.algor_config
    )

self.next_model_to_evaluate.evaluate(
    input_data_s3_prefix=input_data_s3_prefix,
    manifest_file_path=manifest_file_path,
    evaluation_job_name=next_evaluation_job_id,
    local_mode = self.local_mode,
    wait=wait,
    logs=True
    )

参照:experiment_manager.py

これは下記の部分を呼び出している状態なので、「評価対象のモデルをLOADしてきて、指定した評価用データで学習」している形になります。

model_artifact_path = self.model_record.get_model_artifact_path()
rl_estimator_args = self._get_rl_estimator_args(eval=True)
rl_estimator_args['model_channel_name'] = 'pretrained_model'
rl_estimator_args['model_uri'] = model_artifact_path

self.rl_estimator = RLEstimator(**rl_estimator_args)

self.rl_estimator.fit(
                job_name=evaluation_job_name,
                inputs=eval_channel_inputs,
                wait=wait,
                logs=logs
            )

参照:model_maager.py

今回の評価指標は「cost = 1-mean(reward)」なので0〜1の間に収まり、かつ小さい方が好ましいです。
上記の評価結果とベースラインの値を比較して、改善しているかを確認します。

## ベースラインの確認
baseline_score = evaluate_historical_data(data_file='statlog_warm_start.data')

評価した結果、ベースラインよりも良い結果が出ているようならモデルを推論エンドポイントとしてデプロイします。

bandits_experiment.deploy_model(model_id=bandits_experiment.last_trained_model_id) 

3-4.推論結果をS3に出力する

推論エンドポイントができたら、実際に推論をします。

predictor = bandits_experiment.predictor
sim_app = StatlogSimApp(predictor=predictor)

user_id, user_context = sim_app.choose_random_user()
action, event_id, model_id, action_prob, sample_prob = predictor.get_action(obs=user_context)

print('Selected action: {}, event ID: {}, model ID: {}, probability: {}'.format(action, event_id, model_id, action_prob))
Selected action: 1, event ID: 287733769677081517909691498534974586882, model ID: yoshim-AIM404-sbs-model-id-1577533090, probability: 0.999142000857999

今回は「kinesis firehose」のストリームで、「SageMaker推論エンドポイント」で推論が発生したらその内容をS3にファイル出力するようにしているので、それと突合するためのカラム(eventID)と「実際に顧客がレコメンドの結果、反応したかどうかの正解ラベル」である「rewardカラム(0か1のいずれかが入る)」の2つのカラムを持つデータをS3に出力します。

そのあとに、推論結果とアプリから出力した結果データを「eventid」で突合して「特徴量」、「正解ラベル(reward)」を保持したデータをS3に出力します。ここはAthenaを使います。

local_mode = bandits_experiment.local_mode
batch_size = 500 # collect 500 data instances
print("Collecting batch of experience data...")

# Generate experiences and log them
for i in range(batch_size):
    user_id, user_context = sim_app.choose_random_user()
    action, event_id, model_id, action_prob, sample_prob = predictor.get_action(obs=user_context.tolist())
    reward = sim_app.get_reward(user_id, action, event_id, model_id, action_prob, sample_prob, local_mode)
    
    
if local_mode:
    bandits_experiment.ingest_joined_data(sim_app.joined_data_buffer)
else:
    print("Waiting for firehose to flush data to s3...")
    time.sleep(60) # Wait for firehose to flush data to S3 - actually the VW container image take as a venv the firehose stream name
    rewards_s3_prefix = bandits_experiment.ingest_rewards(sim_app.rewards_buffer)
    print(rewards_s3_prefix)
    bandits_experiment.join(rewards_s3_prefix)
    
sim_app.clear_buffer()

以下は、「bandits_experiment.join」で呼び出しているAthenaの処理部分です。
AthenaでJOIN処理を施した結果をS3にファイル出力し、DyanamoDBの「BanditsJoinTable」テーブルにその結果を書き込むことで次回の再学習時に参照できるようにします。

self.next_join_job = JoinManager(join_db_client=self.join_db_client,
    experiment_id=self.experiment_id,
    join_job_id=next_join_job_id,
    input_obs_data_s3_path=input_obs_data_s3_path,
    obs_start_time=obs_start_time,
    obs_end_time=obs_end_time,
    input_reward_data_s3_path=rewards_s3_path,
    boto_session=self.boto_session)

logger.info("Started joining job...")
self.next_join_job.start_join(ratio=ratio, wait=wait)

参照:experiment_manager.py

3-5.モデルの再学習

「3-4.推論結果をS3に出力する」でS3に出力したデータを使って、モデルをファインチューニングします。
フルに学習し直すのではなく、既存のモデルの重みを初期値として、差分データで学習することで微調整する形です。

bandits_experiment.train_next_model(input_data_s3_prefix=bandits_experiment.last_joined_job_train_data)

上記の処理は下記の部分を呼び出しています。

if input_model_id is None and self.experiment_record._last_trained_model_id is not None:
    logger.info(f"Use last trained model {self.experiment_record._last_trained_model_id} "
"as pre-trained model for training")

    ## 前回のモデルのメタ情報を取得
    input_model_id = self.experiment_record._last_trained_model_id


self.next_model_to_train = ModelManager(
    model_db_client=self.model_db_client,
    experiment_id=self.experiment_id,
    model_id=next_model_to_train_id,
    image=self.image,
    role=self.resource_manager.iam_role_arn,
    instance_config=self.resource_manager.training_fleet_config,
    boto_session=self.boto_session,
    algor_config=self.algor_config
    )

self.next_model_to_train.fit(wait=wait,
input_model_id=input_model_id,
input_data_s3_prefix=input_data_s3_prefix,
manifest_file_path=manifest_file_path,
logs=wait)

参照:experiment_manager.py

3-6.推論エンドポイントのモデルを更新する

あとは上記のモデルを推論エンドポイントに反映します。
SageMakerの推論エンドポイントの更新は、裏側では「blue-greenデプロイ」モデルが反映されているため、モデルの更新中も推論エンドポイントにダウンタイムは発生しません。

bandits_experiment.deploy_model(model_id=bandits_experiment.last_trained_model_id)

3-7.後処理

上記までで一通り終わりなのですが、各リソースが不要な場合は削除しましょう。
(cfnのスタックは手動で削除しましょう)

bandits_experiment.clean_resource(experiment_id=bandits_experiment.experiment_id)
bandits_experiment.clean_table_records(experiment_id=bandits_experiment.experiment_id)

4.まとめ

SageMakerRLの利用は初めてだったので解説内容に拙い部分が多々あるかと思います、すいません。
レコメンドモデルを作成するとなると、「活用と探索のバランスをとる」、「モデルを継続的に更新する」といった2点は重要なポイントなので、今回のようなアプローチが有効となる場面もあるのかと思います。

また、今回は推論結果をS3に出力するためにkinesisを利用していますが、Amazon SageMaker Model Monitorを使う方がベターなのかと思います。
「Amazon SageMaker Model Monitor」がどういったことをしてくれる機能なのか、についてはこちらをご参照ください。

以上、長くなりましたがご覧いただきありがとうございました。

5.参考