Amazon SageMakerのk-Nearest-Neighbor(kNN/k近傍法)をパフォーマンス検証してみた

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

概要

こんにちは、yoshimです。 今回はSage Makerでビルトインアルゴリズムとして実装されている「kNN(k近傍法)」について、チュートリアルを実際にやってみます。 今回は「パフォーマンス検証」の部分をやってみようと思います。

目次

1.最初に

本エントリーは「Sage Maker」の「kNN」のチュートリアルを実際にやってみたので、そのご紹介のエントリーです。
また、このチュートリアルは「Part1」、「Part2」と2段階に別れていて、今回ご紹介するのは「Part2」の「パフォーマンス検証」の部分になります。

具体的には、「推論エンドポイントをcpu、gpuのいずれにするか」、「kNNアルゴリズム内の検索アルゴリズム」、「サブサンプリングサイズ」等をそれぞれ変更してのパフォーマンス検証をしていました。 検証結果としては、「処理速度、精度、コストはトレードオフの関係なのでユースケースに合わせて検討する必要がある」というものなのですが、検証方法については一回目を通しておいても損はないと思います。

今回利用したコードがある場所は下記の通りです。

「sagemaker example」の

これです。

Gitだとこちらです。 これの「Part 2: Deep dive, Tuning kNN」を本エントリーではやってみます。

なお、「Part1」については別エントリーでご紹介しているので、よろしかったらそちらもご参照ください。

2.実際にやってみた

精度、レイテンシーのトレードオフ関係について具体的にどのように対応するべきか、といったことを検証するために、「kNNの分析手法」、「エンドポイントのインスタンスタイプ」等について検証していきます。

2-1.便利な関数の作成

モデル生成時に検証用データでの検証をすませ、ログに結果を出力してくれるような便利な関数を作成します。 (これで、エンドポイント作成&エンドポイントに検証用データを投入する手間が省ける)

 
import matplotlib.pyplot as plt

import sagemaker
from sagemaker import get_execution_role
from sagemaker.predictor import csv_serializer, json_deserializer

def trained_estimator_from_hyperparams(s3_train_data, hyperparams, output_path, s3_test_data=None):
    """
    Create an Estimator from the given hyperparams, fit to training data, 
    and return a deployed predictor
    
    """
    # specify algorithm containers. These contain the code for the training job
    containers = {
        'us-west-2': '174872318107.dkr.ecr.us-west-2.amazonaws.com/knn:1',
        'us-east-1': '382416733822.dkr.ecr.us-east-1.amazonaws.com/knn:1',
        'us-east-2': '404615174143.dkr.ecr.us-east-2.amazonaws.com/knn:1',
        'eu-west-1': '438346466558.dkr.ecr.eu-west-1.amazonaws.com/knn:1',
        'ap-northeast-1': '351501993468.dkr.ecr.ap-northeast-1.amazonaws.com/knn:1',
        'ap-northeast-2': '835164637446.dkr.ecr.ap-northeast-2.amazonaws.com/knn:1',
        'ap-southeast-2': '712309505854.dkr.ecr.ap-southeast-2.amazonaws.com/knn:1'
    }
    # set up the estimator
    knn = sagemaker.estimator.Estimator(containers[boto3.Session().region_name],
        get_execution_role(),
        train_instance_count=1,
        train_instance_type='ml.m5.2xlarge',
        output_path=output_path,
        sagemaker_session=sagemaker.Session())
    knn.set_hyperparameters(**hyperparams)
    
    # train a model. fit_input contains the locations of the train and test data
    fit_input = {'train': s3_train_data}
    if s3_test_data is not None:
        fit_input['test'] = s3_test_data
    knn.fit(fit_input)
    return knn

この関数が保持する2つのハイパーパラメータについては下記の通りです。

  • train_instance_count: 学習に利用されるインスタンス数です。データセットが大きい場合は複数のインスタンスを利用する必要があります。今回のケースではデータセットが小さいので、インスタンス数は1で実行します。

  • train_instance_type:学習に利用されるインスタンスタイプです。'ml.m5.2xlarge'を推奨しますが、データセットが大きい場合はGPU,もしくはコンピューティング最適化されたインスタンス(ml.c5.2xlarge等)を利用するといいでしょう。

2-2.推論用エンドポイント作成の関数

「2-1.便利な関数の作成」で作成した関数はエンドポイントを立てるのに必要な情報を含み、かつ学習が済んだモデルを返します。 下記の関数は学習済みのモデルを利用し、エンドポイントを立ち上げます。 下記の関数の「instance_type」引数に指定したインスタンスタイプでエンドポイントを立ち上げるので、学習で利用したエンドポイントとは異なるインスタンスタイプでエンドポイントを立ち上げることができます。

kNNでは、「与えたデータポイントと全てのデータポイントとの距離を計算」する必要があるために、推論に時間がかかります。 そのため、パフォーマンスを向上するためにGPUインスタンス、もしくはコンピューティング最適化されたCPU (c5.xlarge)インスタンスを利用したい場合があります。 ここからは、上記の2つのインスタンスタイプでのパフォーマンスを比較します。

 
def predictor_from_hyperparams(knn_estimator, estimator_name, instance_type, endpoint_name=None): 
    knn_predictor = knn_estimator.deploy(initial_instance_count=1, instance_type=instance_type,
                                        endpoint_name=endpoint_name)
    knn_predictor.content_type = 'text/csv'
    knn_predictor.serializer = csv_serializer
    knn_predictor.deserializer = json_deserializer
    return knn_predictor
2-3.学習の実行

複数のパターンで学習してパフォーマンスを比較してみます。

  • kNNの検索アルゴリズム 「brute force(総当たり)」と「IndexIVFPQインデックスを利用する」パターンの2パターンで比較します。 「brute force(総当たり)」は全てのデータポイントと距離を計算するので、処理は遅いのですが精度は高くなります。 一方、「IndexIVFPQインデックスを利用する」パターンでは計算対象のデータポイントを最初に絞り込むために処理が早くなる一方、「総当たり」と比較すると精度が落ちてしまいます。 また、いずれの検索アルゴリズムでも距離は「ユークリッド距離」を利用します。

  • サブサンプリングサイズ sagemakerでのkNNにおける「sample_size」パラメータはモデルを生成するのに利用されるデータポイント数を意味します。 精度のことを考えると全てのデータポイントを利用したいところですが、計算コストが高くなりすぎる、もしくはそもそも実行不可能になることがあります。 更に言うなら、200,000件のデータでの学習で2,000,000件のデータでの学習と同程度の精度を得ることができることもあります。 今回の検証ではこのパラメータに「200,000」、「500,000」を指定して比較検証します。

つまり、最終的には下記の4パターンで比較検証を実施します。

アルゴリズム サブサンプリングサイズ
brute force 200,000
brute force 500,000
IndexIVFPQ 200,000
IndexIVFPQ 500,000
 
'''
「総当たり」、「サンプルサイズ200,000件」で学習し、S3にモデルを出力します。
結果は「test accuracy <score>=0.812020240267」でした。
'''

hyperparams_flat_l2 = {
    'feature_dim': 54,
    'k': 100,
    'sample_size': 200000,
    'predictor_type': 'classifier' 
    # NOTE: The default distance is L2 and index is Flat, so we don't list them here
}
output_path_flat_l2 = 's3://' + bucket + '/' + prefix + '/flat_l2/output'
knn_estimator_flat_l2 = trained_estimator_from_hyperparams(s3_train_data, hyperparams_flat_l2, output_path_flat_l2, 
                                                           s3_test_data=s3_test_data)
 
'''
「総当たり」、「サンプルサイズ500,000件」で学習し、S3にモデルを出力します。
結果は「test accuracy <score>=0.871639530481」でした。
'''

hyperparams_flat_l2_large = {
    'feature_dim': 54,
    'k': 100,
    'sample_size': 500000,
    'predictor_type': 'classifier' 
}
output_path_flat_l2_large = 's3://' + bucket + '/' + prefix + '/flat_l2_large/output'
knn_estimator_flat_l2_large = trained_estimator_from_hyperparams(s3_train_data, hyperparams_flat_l2_large, 
                                                                 output_path_flat_l2_large, s3_test_data=s3_test_data)
 
'''
「IndexIVFPQ」インデックス 、「サンプルサイズ200,000件」で学習し、S3にモデルを出力します。
結果は「test accuracy <score>=0.804464562321」でした。
'''

hyperparams_ivfpq_l2 = {
    'feature_dim': 54,
    'k': 100,
    'sample_size': 200000,
    'predictor_type': 'classifier',
    'index_type': 'faiss.IVFPQ',
    'index_metric': 'L2',
    'faiss_index_ivf_nlists': 500, 
    'faiss_index_pq_m': '8'
    # number of distinct quantizers. In our case dim=54, so every quantizer will code up to 7 numbers.
    # Each block of (up to) 7 numbers will be coded into a single byte.
}
output_path_ivfpq_l2 = 's3://' + bucket + '/' + prefix + '/ivfpq_l2/output'
knn_estimator_ivfpq_l2 = trained_estimator_from_hyperparams(s3_train_data, hyperparams_ivfpq_l2, output_path_ivfpq_l2, 
                                                           s3_test_data=s3_test_data)
 
'''
「IndexIVFPQ」インデックス 、「サンプルサイズ500,000件」で学習し、S3にモデルを出力します。
結果は「test accuracy <score>=0.863188874738」でした。
'''

hyperparams_ivfpq_l2_large = {
    'feature_dim': 54,
    'k': 100,
    'sample_size': 500000,
    'predictor_type': 'classifier',
    'index_type': 'faiss.IVFPQ',
    'index_metric': 'L2',
    'faiss_index_ivf_nlists': 700, 
    'faiss_index_pq_m': '8'
}
output_path_ivfpq_l2_large = 's3://' + bucket + '/' + prefix + '/ivfpq_l2_large/output'
knn_estimator_ivfpq_l2_large = trained_estimator_from_hyperparams(s3_train_data, hyperparams_ivfpq_l2_large, 
                                                                  output_path_ivfpq_l2_large, s3_test_data=s3_test_data)
2-4.結果の評価

結果をまとめると下記の通りでした。 ある程度想定通りですが、サブサンプリングサイズは500,000件にした方が良さそうですね。 また、検索アルゴリズムを変えても精度は思ったよりは低下しませんでした。 レイテンシーの確認後、レイテンシーと精度のトレードオフの兼ね合いからどちらの検索アルゴリズムを利用するかを検討してもいいのではないでしょうか?

アルゴリズム サブサンプリングサイズ 検証用データセットでの精度
brute force 200,000 0.812020240267
brute force 500,000 0.871639530481
IndexIVFPQ 200,000 0.804464562321
IndexIVFPQ 500,000 0.863188874738
2-5.評価

sagemakerのkNNでは、「どのラベルか、といった単一の予測結果」のみならず「近くのデータのラベル、距離」を得ることができます。 この返り値を利用することで、下記のようなことができます。

  • kのチューニング:リストの前の方にある値を利用することで、kの値を調整可能
  • ランキング/複数のクラスづけ:一つのクラスに分類するのではなく、複数のクラスでランクづけをしたい場合がありますが、そのような場合に利用します。(ex.画像のタグ付け)

上記の返り値を取得するためには、エンドポイントにcurlする際に「verbose」を使用する必要があります。
推論時のinput,output仕様についてはこちらをご参照ください。

以降は、一旦「kのチューニング」に絞って検証を進めます。 下記に作成した「evaluate」関数を利用して精度とレイテンシーを検証します。

また、ここで利用している「精度の指標」4点の概要は下記の通りです。

精度の評価指標 意味
f1_weight 加重平均。データの件数に比例した、f1の平均値。
f1_macro マクロ平均。全クラスのf1の平均値。
f1_micro マイクロ平均。各クラスごとにTP,FP,FNの件数を計算し、その値からf1を求める。
acc (正解数)/(データの件数)
 
from scipy import stats
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score, accuracy_score
import time

# 複数の異なるkでのスコアを保存する。
def scores_for_ks(test_labels, knn_labels, ks):
    f1_weight = []
    f1_macro = []
    f1_micro = []
    acc = []
    for k in ks:
        pred_k = stats.mode(knn_labels[:,:k], axis=1)[0].reshape((-1,))
        f1_weight.append(f1_score(test_labels, pred_k, average='weighted'))
        f1_macro.append(f1_score(test_labels, pred_k, average='macro'))
        f1_micro.append(f1_score(test_labels, pred_k, average='micro'))
        acc.append(accuracy_score(test_labels, pred_k))
    return {'f1_weight': f1_weight, 'f1_macro': f1_macro, 'f1_micro': f1_micro, 'accuracy': acc, }

# 結果をプロットする
def plot_prediction_quality(scores, ks):
    colors = ['r-', 'b-', 'g-','y-'][:len(scores)]
    for (k,v), color in zip(scores.items(), colors):
        plt.plot(ks, v, color, label=k)
    plt.legend()
    plt.xlabel('k')
    plt.ylabel('prediction quality')
    plt.show()

# 精度(scores_for_ksに出てきた4つの指標)を評価する
def evaluate_quality(predictor, test_features, test_labels, model_name, verbose=True, num_batches=100):
    """
    Evaluate quality metrics of a model on a test set. 
    """
    # tune the predictor to provide the verbose response
    predictor.accept = 'application/json; verbose=true'
    
    # split the test data set into num_batches batches and evaluate using prediction endpoint. 
    print('running prediction (quality)...')
    batches = np.array_split(test_features, num_batches)
    knn_labels = []
    for batch in batches:
        pred_result = predictor.predict(batch)
        cur_knn_labels = np.array([pred_result['predictions'][i]['labels'] for i in range(len(pred_result['predictions']))])
        knn_labels.append(cur_knn_labels)
    knn_labels = np.concatenate(knn_labels)
    print('running prediction (quality)... done')

    # figure out different k values
    top_k = knn_labels.shape[1]
    ks = range(1, top_k+1)
    
    # compute scores for the quality of the model for each value of k
    print('computing scores for all values of k... ')
    quality_scores = scores_for_ks(test_labels, knn_labels, ks)
    print('computing scores for all values of k... done')
    if verbose:
        plot_prediction_quality(quality_scores, ks)
    
    return quality_scores

# レイテンシーを評価する
def evaluate_latency(predictor, test_features, test_labels, model_name, verbose=True, num_batches=100):
    """
    Evaluate the run-time of a model on a test set.
    """
    # tune the predictor to provide the non-verbose response
    predictor.accept = 'application/json' # 受け取るデータの仕様
    
    # latency for large batches:
    # split the test data set into num_batches batches and evaluate the latencies of the calls to endpoint. 
    print('running prediction (latency)...')
    batches = np.array_split(test_features, num_batches)
    test_preds = []
    latency_sum = 0
    for batch in batches:
        start = time.time()
        pred_batch = predictor.predict(batch)
        latency_sum += time.time() - start
    latency_mean = latency_sum / float(num_batches)
    avg_batch_size = test_features.shape[0] / num_batches
    
    # estimate the latency for a batch of size 1
    latencies = []
    attempts = 2000
    for i in range(attempts):
        start = time.time()
        pred_batch = predictor.predict(test_features[i].reshape((1,-1)))
        latencies.append(time.time() - start)

    latencies = sorted(latencies)
    latency1_mean = sum(latencies) / float(attempts)
    latency1_p90 = latencies[int(attempts*0.9)]
    latency1_p99 = latencies[int(attempts*0.99)]
    print('running prediction (latency)... done')
    
    if verbose:
        print("{:<11} {:.3f}".format('Latency (ms, batch size %d):' % avg_batch_size, latency_mean * 1000))
        print("{:<11} {:.3f}".format('Latency (ms) mean for single item:', latency1_mean * 1000))
        print("{:<11} {:.3f}".format('Latency (ms) p90 for single item:', latency1_p90 * 1000))
        print("{:<11} {:.3f}".format('Latency (ms) p99 for single item:', latency1_p99 * 1000))
        
    return {'Latency': latency_mean, 'Latency1_mean': latency1_mean, 'Latency1_p90': latency1_p90, 
            'Latency1_p99': latency1_p99}

# ここまでの関数をまとめた
def evaluate(predictor, test_features, test_labels, model_name, verbose=True, num_batches=100):
    eval_result_q = evaluate_quality(pred, test_features, test_labels, model_name=model_name, verbose=verbose, num_batches=num_batches)
    eval_result_l = evaluate_latency(pred, test_features, test_labels, model_name=model_name, verbose=verbose, num_batches=num_batches)
    return dict(list(eval_result_q.items()) + list(eval_result_l.items()))
    
2-6.検証

準備が完了したので、「2-4.結果の評価」で評価した4パターンに、さらに「推論用エンドポイントをcpu/gpuのいずれにするか」というケースを追加した、計8パターンで精度とレイテンシーを確認します。 なお、下記のコードでは検証が完了したらエンドポイントは削除しています。

 
import time

instance_types = ['ml.c5.xlarge', 'ml.p2.xlarge']
index2estimator = {'flat_l2': knn_estimator_flat_l2, 'ivfpq_l2': knn_estimator_ivfpq_l2,
                  'flat_l2_large': knn_estimator_flat_l2_large, 'ivfpq_l2_large': knn_estimator_ivfpq_l2_large}

eval_results = {}

for index in index2estimator:
    estimator = index2estimator[index]
    eval_results[index] = {}
    for instance_type in instance_types:
        model_name = 'knn_%s_%s'%(index, instance_type)
        endpoint_name = 'knn-latency-%s-%s-%s'%(index.replace('_','-'), instance_type.replace('.','-'),
                                               str(time.time()).replace('.','-'))
        print('\nsetting up endpoint for instance_type=%s, index_type=%s' %(instance_type, index))
        pred = predictor_from_hyperparams(estimator, index, instance_type, endpoint_name=endpoint_name) # 学習済みモデルを利用してエンドポイント立ち上げ
        print('')
        eval_result = evaluate(pred, test_features, test_labels, model_name=model_name, verbose=True) # 評価
        eval_result['instance'] = instance_type 
        eval_result['index'] = index 
        eval_results[index][instance_type] = eval_result # eval_resultsに結果を格納
        delete_endpoint(pred)

下記に結果をまとめます。

●レイテンシー
私が実行した時は下記の通りでした。

検索アルゴリズム インスタンスタイプ サブサンプリングサイズ バッチ単位でのレイテンシー(平均)(ms) レイテンシー(単発)の平均(ms) レイテンシー(単発)の低い方から90%の値(ms) レイテンシー(単発)の低い方から99%の値(ms)
総当たり CPU 200,000 180.045 15.668 18.212 24.064
総当たり GPU 200,000 60.022 13.635 14.338 16.783
IndexIVFPQ CPU 200,000 64.010 8.164 9.755 15.285
IndexIVFPQ CPU 200,000 62.799 11.134 13.048 19.478
総当たり CPU 500,000 376.948 24.943 28.879 32.635
総当たり GPU 500,000 86.838 14.495 15.836 18.323
IndexIVFPQ CPU 500,000 70.170 10.631 11.662 19.040
IndexIVFPQ GPU 500,000 61.245 11.351 12.692 18.445

「バッチ単位」では581個のデータポイントを処理しているのですが、単純に単発のレイテンシーにデータポイント数を掛け算したレイテンシーにはならないようです。

●精度
私が実施した際は下記の通りでした。
いずれの場合も似たような傾向を示していて、「f1_macro」が低いようでした。

・総当たり,CPU,サブサンプリングサイズ=200000

・総当たり,GPU,サブサンプリングサイズ=200000

・IndexIVFPQ,CPU,サブサンプリングサイズ=200000

・IndexIVFPQ,GPU,サブサンプリングサイズ=200000

・総当たり,CPU,サブサンプリングサイズ=500000

・総当たり,GPU,サブサンプリングサイズ=500000

・IndexIVFPQ,CPU,サブサンプリングサイズ=500000

・IndexIVFPQ,GPU,サブサンプリングサイズ=500000

2-7.「k」に色々な値を入れて精度を検証

「k」に色々な値を入れてもう少し検証を進めてみましょう。

 
import pandas as pd

k_range = range(1, 13)
df_index = []
data = []
columns_lat = ['latency1K', 'latency1_mean', 'latency1_p90', 'latency1_p99']
columns_acc = ['acc_%d' % k for k in k_range]
columns = columns_lat + columns_acc

for index, index_res in eval_results.items():
    for instance, res in index_res.items():
        # for sample size?
        df_index.append(index+'_'+instance)
        latencies = np.array([res['Latency'], res['Latency1_mean'], res['Latency1_p90'], res['Latency1_p99']])
        row = np.concatenate([latencies*10,
                             res['accuracy'][k_range[0] - 1:k_range[-1] ]])
        row *= 100
        data.append(row)

df = pd.DataFrame(index=df_index, data=data, columns=columns)
df_acc = df[columns_acc]
df_lat = df[columns_lat]

def highlight_apx_max(row):
    '''
    highlight the aproximate best (max or min) in a Series yellow.
    '''
    max_val = row.max()
    colors = ['background-color: yellow' if cur_val >= max_val * 0.9975 else '' for cur_val in row]
        
    return colors

df_acc.round(decimals=1).style.apply(highlight_apx_max, axis=1)

いずれかの精度指標値が他の「k」と比較した際に0.25%以上高い場合に目立つようにプロットしています。

2-8.レイテンシーの検証

続いて、レイテンシーの検証をします。

 
def highlight_far_from_min(row):
    '''
    highlight the aproximate best (max or min) in a Series yellow.
    '''
    med_val = row.median()
    colors = ['background-color: red' if cur_val >= med_val * 1.2 else '' for cur_val in row]
        
    return colors

df_lat.round(decimals=1).style.apply(highlight_far_from_min, axis=0)

レイテンシーが中央値より20%以上悪い場合は目立つようにプロットしています。 「総当たり」で「CPUインスタンス」を使っている場合はレイテンシーが高くなってしまっていました...。

2-9.ここまでの検証のまとめ
精度

サブサンプリングサイズが200,000件の際は、kは小さい方が精度が大きくなりそうです。しかし、「k」が小さすぎるとちょっとデータが変わっただけで結果もすぐに変わってしまうので少し不安です。 そこで、妥協点としてk=5くらいに設定するのが良さそうです。
また、「データ量を増やす」、「総当たりで計算する」といった対策をすることで精度はよくなります。
(今回の場合は、精度が97%にまで向上しました)

もし精度よりも処理スピードの方が重要な場合は、「データ量を増やしてIVFPQ_l2_large_XXを使う」、もしくは「サンプルサイズを小さくして総当たり」で計算するといった方法が良さそうです。

レイテンシー

ほとんどの場合で、20ms程度までに結果を取得することができました。
(ただ、「総当たり」で「CPU」を利用する場合は注意が必要でしょう)

3.まとめ

今まで自分はsagemakerでパフォーマンス検証と言ったら、ハイパーパラメータのチューニングしか頭になかったのですが、実際に利用する際はエンドポイントのレイテンシー等も考慮する必要があり、またそのためにはこのような検証をすると良い、というのが整理できたのでとても勉強になりました。
違うアルゴリズムを利用する際でも、同様の検証をすることはあると思うので、今回利用した方法は今後も参考にしようと思います。

4.引用

kNNについて
今回実行した分析内容について
推論時のinput,output仕様
sagemakerにおけるkNNの処理内容について
sagemakerにおけるkNNの処理内容について2
トレーニング時のinput仕様