Amazon SageMaker Searchをboto3のSageMakerクライアントから使ってみる – Amazon SageMaker Advent Calendar 2018

こんにちは、大阪DI部の大澤です。

この記事は「クラスメソッド Amazon SageMaker Advent Calendar」の11日目の記事です。

今回はAmazon SageMaker Searchを触ってみます。 Amazon SageMaker SearchはAmazon SageMakerでの学習ジョブを様々な条件で検索し、各ジョブの内容を比較することができるという機能です。 以下のエントリではマネジメントコンソールにおけるSageMaker Searchの紹介を行いました。

今回はboto3のSageMakerクライアントからSageMaker Searchの機能を使った学習ジョブの抽出を試してみます。

やってみる

概要

  • MNISTのデータセットを使って、手書き数字が0かどうかを分類する線形分類器モデルを学習させます。
    • 3パターンのパラメータでモデルを学習
  • boto3のSageMakerクライアントで3つの学習結果を取ってきて、可視化します。
  • エンドポイントを作成し、もっともよかった(lossの少ない)モデルをデプロイし、推論を試してみます。
  • エンドポイントのモデルから学習ジョブ情報を取得します。

セットアップ

データを保存するS3の情報や学習ジョブにつけるタグ、モデル学習後にデプロイするエンドポイント名、使用するIAMロールを定義します。

bucket = 'bucke_name'
prefix = 'sagemaker/DEMO-linear-mnist'

#replace with a tag key and value of your choice
tagKey = 'Project'
tagValue = 'Project_Binary_Classifier'

#name for endpoint where the winning model will be depoloyed
endpointName = 'linear-learner-2018-12-10-03-11-20-651'


# Define IAM role
import boto3
from sagemaker import get_execution_role

role = get_execution_role()

データ準備

ダウンロード

まずは今回使うデータセット(MNIST)をダウンロードしてきます。

%%time
import pickle, gzip, numpy, urllib.request, json

# Load the dataset
urllib.request.urlretrieve("http://deeplearning.net/data/mnist/mnist.pkl.gz", "mnist.pkl.gz")
with gzip.open('mnist.pkl.gz', 'rb') as f:
    train_set, valid_set, test_set = pickle.load(f, encoding='latin1')

ダウンロードしてきたデータの一部をプロットして確認します。

%matplotlib inline
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (2,10)

def show_digit(img, caption='', subplot=None):
    if subplot==None:
        _,(subplot)=plt.subplots(1,1)
    imgr=img.reshape((28,28))
    subplot.axis('off')
    subplot.imshow(imgr, cmap='gray')
    plt.title(caption)

show_digit(train_set[0][40], 'This is a {}'.format(train_set[1][40]))

train_setの構成は次の通りです。

train_set = [
    [
        [手書き数字を表す1次元配列1],
        [手書き数字を表す1次元配列2],
        ...
    ],
    [
        手書き数字を表す1次元配列1がどの数字かを表すラベル(0~9),
        手書き数字を表す1次元配列2がどの数字かを表すラベル(0~9),
        ...
    ]
]

データ変換

データを線形学習器の教師データとして使える形式(RecordIO-ProtoBuf)に変換します。 線形学習器の詳細についてはドキュメントをご覧ください。

import io
import numpy as np
import sagemaker.amazon.common as smac
# 各行が手書き数字を表す二次元配列
vectors = np.array([t.tolist() for t in train_set[0]]).astype('float32')

# ラベルデータの作成。ラベルが0の時は1、0ではない時は0とする。(今回は手書き数字が0かどうかの二値分類のため)
labels = np.where(np.array([t.tolist() for t in train_set[1]]) == 0, 1, 0).astype('float32')

# SageMakerSDKのユーティリティを使って、RecordIO-ProtoBufに変換する
buf = io.BytesIO()
smac.write_numpy_to_dense_tensor(buf, vectors, labels)
buf.seek(0)

データのアップロード

先ほど作成した教師データをS3にアップロードします。

import boto3
import os

key = 'recordio-pb-data'
boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train', key)).upload_fileobj(buf)
s3_train_data = 's3://{}/{}/train/{}'.format(bucket, prefix, key)
print('uploaded training data location: {}'.format(s3_train_data))

学習

線形学習器モデルの学習を3パターン行います。それぞれの学習ジョブではmini_batch_sizeを変更します。mini_batch_sizeはモデルの学習時にどれだけのデータをまとめて学習させるかを表す変数です。

まずは共通の項目を設定します。 学習と推論に使用するコンテナイメージを取得します。

from sagemaker.amazon.amazon_estimator import get_image_uri

container = get_image_uri(boto3.Session().region_name, 'linear-learner')
print("model will be trained using the container: " + str(container))

boto3のSageMakerクライアントとセッションを作成しておきます。


import boto3
import sagemaker

smclient = boto3.client(service_name='sagemaker')
sess = sagemaker.session.Session(sagemaker_client=smclient)

1つ目の学習ジョブの実行

まずは、mini_batch_sizeを100として1つ目の学習ジョブを回します。

# モデルアーティファクトの出力先
output_location_1 = 's3://{}/{}/output-1'.format(bucket, prefix)

# 学習をハンドルするEstimatorの設定
linear_1 = sagemaker.estimator.Estimator(container,
                                        role, 
                                        train_instance_count=1, 
                                        train_instance_type='ml.c4.xlarge',
                                        output_path=output_location_1,
                                        tags=[{"Key":tagKey, "Value":tagValue}],
                                        sagemaker_session=sess)

# ハイパーパラメータの設定
linear_1.set_hyperparameters(feature_dim=784, # データの次元数
                           predictor_type='binary_classifier', # 二値
                           mini_batch_size=100)

# 学習開始
linear_1.fit({'train': s3_train_data})

2つ目の学習ジョブの実行

mini_batch_sizeを200として2つ目の学習ジョブを回します。

output_location_2 = 's3://{}/{}/output-2'.format(bucket, prefix)

linear_2 = sagemaker.estimator.Estimator(container,
                                       role, 
                                       train_instance_count=1, 
                                       train_instance_type='ml.c4.xlarge',
                                       output_path=output_location_2,
                                       tags=[{"Key":tagKey, "Value":tagValue}],
                                       sagemaker_session=sess)

linear_2.set_hyperparameters(feature_dim=784,
                           predictor_type='binary_classifier',
                           mini_batch_size=200)

linear_2.fit({'train': s3_train_data})

3つ目の学習ジョブの実行

mini_batch_sizeを300として3つ目学習ジョブを回します。

output_location_3 = 's3://{}/{}/output-3'.format(bucket, prefix)

linear_3 = sagemaker.estimator.Estimator(container,
                                       role, 
                                       train_instance_count=1, 
                                       train_instance_type='ml.c4.xlarge',
                                       output_path=output_location_3,
                                       tags=[{"Key":tagKey, "Value":tagValue}],
                                       sagemaker_session=sess)

linear_3.set_hyperparameters(feature_dim=784,
                           predictor_type='binary_classifier',
                           mini_batch_size=300)

linear_3.fit({'train': s3_train_data})

ジョブの確認

3つの学習ジョブが終了したらboto3のSageMakerクライアントでSageMaker Searchの機能を使って、ジョブの結果を取得します。 検索の条件は以下のsearch_paramsのような辞書で定義します。今回は指定したタグの学習ジョブかつステータスがCompletedで絞り込んだジョブを取得します。取得するジョブの並び順は、最終的なtrainobjective_lossの値を昇順という指定です。 そのほかの検索クエリなどの詳細についてはドキュメントをご覧ください。

import boto3
import pandas
import numpy as np

search_params={
   "MaxResults": 10,
   "Resource": "TrainingJob",
   "SearchExpression": { 
      "Filters": [ 
         { 
            "Name": "Tags." + str(tagKey),
            "Operator": "Equals",
            "Value": tagValue
         },
         {
            "Name": "TrainingJobStatus",
            "Operator": "Equals",
            "Value": "Completed"
         }
      ]
   },
  "SortBy": "Metrics.train:objective_loss:final",
  "SortOrder": "Ascending"
}

smclient = boto3.client(service_name='sagemaker')
results = smclient.search(**search_params)
print("The search query returned " + str(len(results['Results'])) + " training jobs.\n")

3つのジョブの結果を取得。

取得したジョブを最終的なloss順のDataFrameとそれぞれのlossの値を描画します。

rows=[]
coord=[]
for result in results['Results']: 
    trainingJob = result['TrainingJob']
    metrics = trainingJob['FinalMetricDataList']
    accuracy = metrics[[x['MetricName'] for x in metrics].index('train:objective_loss:final')]['Value']
    rows.append([trainingJob['TrainingJobName'],
                 trainingJob['TrainingJobStatus'],
                 trainingJob['HyperParameters']['mini_batch_size'],
                 accuracy])
    coord.append([float(trainingJob['HyperParameters']['mini_batch_size']), accuracy])

headers=["Training Job Name", "Training Job Status", "Mini Batch Size", "Objective Loss"]
df = pandas.DataFrame(data=rows,columns=headers)
from IPython.display import HTML
display(HTML(df.to_html()))

axes=['mini_batch_size', 'train:objective_loss:final'] 
df = pandas.DataFrame(data=coord, columns=axes)
ax1 = df.plot.scatter(x='mini_batch_size',
                      y='train:objective_loss:final',
                      c='DarkBlue')
ax1

今回はmini_batch_sizeが100のジョブのlossが低いため、一番良いモデルとしてこの後使います。

※今回はサンプルノートブックに合わせて、教師データだけでモデルを評価してますが、本来は検証データとテストデータも合わせて評価した方が良いです。

モデルのデプロイ

エンドポイントを作成し、lossが低かった1つ目のモデルをデプロイします。

linear_predictor = linear_1.deploy(initial_instance_count=1,
                                 instance_type='ml.m4.xlarge', endpoint_name=endpointName)

推論

推論周りをハンドルするpredictorにコンテンツタイプとシリアライザーとでシリアライザーを設定します。

from sagemaker.predictor import csv_serializer, json_deserializer

# csv形式のデータを入力する
linear_predictor.content_type = 'text/csv'
linear_predictor.serializer = csv_serializer

# 推論結果はjsonなので、jsonデータをデシリアライズするように設定
linear_predictor.deserializer = json_deserializer

# 推論するデータを描画
plt.imshow(np.reshape(train_set[0][30], (28, 28)))

# 推論
result = linear_predictor.predict(train_set[0][30])
print(result)

無事推論出来ました

エンドポイントの情報から学習ジョブの情報へ

今デプロイしているモデルの情報を起点に学習ジョブの情報を取得してみます。

# まずはエンドポイントコンフィグを取得
endpoint_config = smclient.describe_endpoint_config(EndpointConfigName=endpointName)

# エンドポイントコンフィグに含まれるモデル名を取得
model_name = endpoint_config['ProductionVariants'][0]['ModelName']

# モデル名からモデル情報を取得し、モデルアーティファクトの場所を取り出す
model = smclient.describe_model(ModelName=model_name)
modelURI = model['PrimaryContainer']['ModelDataUrl']
print(str(modelURI))


#モデルアーティファクトの情報から学習ジョブを検索
search_params={
   "MaxResults": 1,
   "Resource": "TrainingJob",
   "SearchExpression": { 
      "Filters": [ 
         { 
            "Name": "ModelArtifacts.S3ModelArtifacts",
            "Operator": "Equals",
            "Value": modelURI
         }
      ]
   },
}

results = smclient.search(**search_params)
print("The search query returned " + str(len(results['Results'])) + " training jobs.\n")

学習ジョブが見つかりました。

取得したジョブデータから学習時のメトリクスやハイパーパラメータといったジョブの詳細データを取り出して見てみます。

trainingJob = results['Results'][0]['TrainingJob']
metrics = trainingJob['FinalMetricDataList']
metricnames = [x['MetricName'] for x in metrics]
metricvalues = [x['Value'] for x in metrics]
hyperparams = trainingJob['HyperParameters']

# データフレームの作成
headers=["Training Job Name", "Training Datasource URI", "Training Algorithm"] + list(hyperparams.keys()) + metricnames
rows=[]
rows.append([trainingJob['TrainingJobName'],
             trainingJob['InputDataConfig'][0]['DataSource']['S3DataSource']['S3Uri'],
             trainingJob['AlgorithmSpecification']['TrainingImage']] + list(hyperparams.values()) + metricvalues
            )
df = pandas.DataFrame(data=rows,columns=headers)

# データフレームのヘッダ(th要素)に設定するCSS
th_props = [
  ('font-size', '11px'),
  ('text-align', 'center'),
  ('font-weight', 'bold'),
  ('color', '#6d6d6d'),
  ('background-color', '#f7f7f9')
  ]

# データフレームのtd要素用のCSS
td_props = [
  ('font-size', '11px'),
  ('text-align', 'center')
  ]

# CSSを設定
styles = [
  dict(selector="th", props=th_props),
  dict(selector="td", props=td_props)
  ]
html = (df.style.set_table_styles(styles))
from IPython.display import display, HTML

# 表示
html

エンドポイントの削除

最後に無駄な課金が発生しないようにエンドポイントを削除します。

import sagemaker

sagemaker.Session().delete_endpoint(linear_predictor.endpoint)

さいごに

今回は 「クラスメソッド Amazon SageMaker Advent Calendar」 の11日目として、のAmazon SageMaker Searchの機能をboto3のSageMakerクライアントを使って試しました。
機械学習はいくつもの実験を繰り返す必要があるので、大量の学習ジョブが作られる事が多いです。今回試したようにSageMaker SearchのAPIを利用する事で、そういった大量の学習ジョブを独自に扱うシステムを作る事も出来そうです。とはいえ、SageMaker Searchは今はまだベータ機能です。今後の大幅な変更がある可能性もあるため注意が必要です。今後の改善・拡張に期待です!

お読みくださりありがとうございました〜!明日もお楽しみに〜!