Amazon SageMakerでk-Nearest-Neighbor(k-NN/k近傍法)をやってみた

2018.08.08

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

概要

こんにちは、yoshimです。
今回はsagemakerでビルトインアルゴリズムとして実装されている「k-NN(k近傍法)」について、チュートリアルを実際にやってみます。

目次

1.k-NN(k近傍法)とは

そもそも「k-NN(k近傍法)」とはなんなのか、という点についてですが、こちらについては昨年書いたブログをご参照ください。 大まかにいうと、「似たようなデータの値を元に推測(分類、回帰)をする」といったアルゴリズムです。 上述のブログでは「レコメンドシステム」の例でご紹介しています。

2.チュートリアルの説明

コードがある場所は下記の通りです。

「Sagemaker Examples」の

これです。

こちらからもご確認いただけます。

また、分析内容は「地理的特徴(土壌の質、標高、海抜等)」を使って「森林被覆」の種類を推測するというものです。 説明についてはこちらもご参照ください。

コードを見てみるとわかるのですが、このチュートリアルは結構長く、大まかに2つに区分できます。

Part 1: Running kNN in 5 minutes

「データ取り込み→モデル生成→推論エンドポイント生成→実際に推論→エンドポイント削除」までをやるという、「とりあえず一通りやってみる」、といった形です。 大まかに「sagemakerでk-NNをやるならどんな感じか」といったところを把握するためならここまでやれば十分そうでした。

Part 2: Deep dive, Tuning kNN

「推論エンドポイントをcpu、gpuのいずれにするか」、「k-NNアルゴリズム内の検索アルゴリズム」、「サブサンプリングサイズ」等をそれぞれ変更してのパフォーマンス検証をしていました。 検証結果としては、「処理速度、精度、コストはトレードオフの関係なのでユースケースに合わせて検討する必要がある」というものなのですが、検証方法については一回目を通しておいても損はないと思います。 (こちらは分析者よりもエンジニア向けの検証かと思います)

本エントリーでは「Part 1: Running kNN in 5 minutes」についてやってみます。
また、「Part 2: Deep dive, Tuning kNN」についてはこちらをご参照ください。

3.実際にやってみた

では、実際にチュートリアルをやってみようと思います。 ここでは、sagemaekerインスタンスは立ち上げ済みで、もうコードが実行できる環境が用意されていることを前提とします。 また、一部私の方で処理内容に関するコメントを入れています。

3-1.データのダウンロード

今回利用するデータセットをsagemakerにダウンロードしています。

%%bash

wget 'https://archive.ics.uci.edu/ml/machine-learning-databases/covtype/covtype.data.gz'
mkdir -p /tmp/covtype/raw
mv covtype.data.gz /tmp/covtype/raw/covtype.data.gz

ダウンロードしたデータはこんな感じです。

一番右のカラムがラベルで、それ以外が全て特徴量のデータです。

3-2.データの前処理

「3-1.データのダウンロード」でsagemakerインスタンスにダウンロードしたデータを「トレーニング用」と「検証用」に分割します。 また、「特徴量」と「正解ラベル」を分割しています。

import numpy as np
import os

data_dir = "/tmp/covtype/"
processed_subdir = "standardized"
raw_data_file = os.path.join(data_dir, "raw", "covtype.data.gz")
train_features_file = os.path.join(data_dir, processed_subdir, "train/csv/features.csv")
train_labels_file = os.path.join(data_dir, processed_subdir, "train/csv/labels.csv")
test_features_file = os.path.join(data_dir, processed_subdir, "test/csv/features.csv")
test_labels_file = os.path.join(data_dir, processed_subdir, "test/csv/labels.csv")

# read raw data
print("Reading raw data from {}".format(raw_data_file))
raw = np.loadtxt(raw_data_file, delimiter=',') # csvファイルの読み込み。ファイルの中身を確認すれば、CSVであることが確認できます。

# split into train/test with a 90/10 split
np.random.seed(0) # seed値を設定。何回やっても同じ結果にするためです。(下記の処理はランダム性がある処理なのですが、結果を統一するため)
np.random.shuffle(raw)
train_size = int(0.9 * raw.shape[0]) # トレーニング用データセットサイズを、全体の9割に指定
train_features = raw[:train_size, :-1] # 一番右側のカラムは「ラベル(クラス)」なので、それ以外のカラムを取り込む
train_labels = raw[:train_size, -1] # 一番右側のカラムのみを取り込む
test_features = raw[train_size:, :-1]
test_labels = raw[train_size:, -1]
3-3.S3にアップロード

・トレーニングデータをrecordIO形式に変換

import io
import sagemaker.amazon.common as smac

print('train_features shape = ', train_features.shape)
print('train_labels shape = ', train_labels.shape)

# Convert data to binary stream.
buf = io.BytesIO()
smac.write_numpy_to_dense_tensor(buf, train_features, train_labels)
buf.seek(0)

・トレーニングデータをS3にアップロード

import boto3
import os
import sagemaker

# bucket = sagemaker.Session().default_bucket() # modify to your bucket name
bucket = 'cm-yoshim-sagemaker-tokyo' # ⇦ここをあなたが使うバケット名に変更する。
prefix = 'knn-blog-2018-08-08' # ⇦ここをあなたが使うs3パスのプレフィックス名にする
key = 'recordio-pb-data'  # S3に出力されるファイル名

boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train', key)).upload_fileobj(buf) # 上記で指定したパス、ファイル名でアップロード
s3_train_data = 's3://{}/{}/train/{}'.format(bucket, prefix, key)
print('uploaded training data location: {}'.format(s3_train_data))

こんな感じにファイルがアップロードできると思います。

・検証用データをS3にアップロード 検証用データも同様にS3にファイルをアップロードします。

print('test_features shape = ', test_features.shape)
print('test_labels shape = ', test_labels.shape)

buf = io.BytesIO()
smac.write_numpy_to_dense_tensor(buf, test_features, test_labels)
buf.seek(0)

boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'test', key)).upload_fileobj(buf)
s3_test_data = 's3://{}/{}/test/{}'.format(bucket, prefix, key)
print('uploaded test data location: {}'.format(s3_test_data))
3-4.トレーニング

トレーニングデータのパス、ハイパーパラメータ、モデルの出力先、(検証用データ)を引数として与えると「学習用インスタンスを立ち上げて、学習し、モデルをS3に出力する」関数を作成しています。

(k-NNは一般的なモデルとは異なり「学習」といったことはしません。では、ここでしているトレーニングとは何か、という点については4-1.トレーニングフェーズについてをご参照ください)

import matplotlib.pyplot as plt

import sagemaker
from sagemaker import get_execution_role
from sagemaker.predictor import csv_serializer, json_deserializer


def trained_estimator_from_hyperparams(s3_train_data, hyperparams, output_path, s3_test_data=None):
    """
    Create an Estimator from the given hyperparams, fit to training data, 
    and return a deployed predictor
    
    """
    # specify algorithm containers. These contain the code for the training job
    containers = {
        'us-west-2': '174872318107.dkr.ecr.us-west-2.amazonaws.com/knn:1',
        'us-east-1': '382416733822.dkr.ecr.us-east-1.amazonaws.com/knn:1',
        'us-east-2': '404615174143.dkr.ecr.us-east-2.amazonaws.com/knn:1',
        'eu-west-1': '438346466558.dkr.ecr.eu-west-1.amazonaws.com/knn:1',
        'ap-northeast-1': '351501993468.dkr.ecr.ap-northeast-1.amazonaws.com/knn:1',
        'ap-northeast-2': '835164637446.dkr.ecr.ap-northeast-2.amazonaws.com/knn:1',
        'ap-southeast-2': '712309505854.dkr.ecr.ap-southeast-2.amazonaws.com/knn:1'
    }
    # set up the estimator
    knn = sagemaker.estimator.Estimator(containers[boto3.Session().region_name],
        get_execution_role(),
        train_instance_count=1,  # 場合によっては変更する必要あり(インスタンスが増えても自動で分散してくれるのでコードの変更は不要)
        train_instance_type='ml.m5.2xlarge',  # 場合によっては変更する必要あり
        output_path=output_path,
        sagemaker_session=sagemaker.Session())
    knn.set_hyperparameters(**hyperparams)
    
    # train a model. fit_input contains the locations of the train and test data
    fit_input = {'train': s3_train_data}
    if s3_test_data is not None:
        fit_input['test'] = s3_test_data
    knn.fit(fit_input)
    return knn
3-5.トレーニング

ハイパーパラメータを指定し、上記で生成した関数を利用して学習を実行します。 コンソールのログ画面から「検証用データでの精度」が確認できますので、これ以降の作業に進む前に一回確認しておくといいと思います。

'''
ハイパーパラメータを指定し、学習を実行します。
https://docs.aws.amazon.com/ja_jp/sagemaker/latest/dg/kNN-tuning.html

「k」は近くの何個のデータと比較するか、ということを表すパラメータなのですが、イメージがつかない場合は手前味噌ですが下記のブログをご参照ください。
<blockquote class="wp-embedded-content" data-secret="F2TauRqy6f"><a href="https://dev.classmethod.jp/machine-learning/2017ad_20171218_knn/">機械学習_k近傍法_理論編</a></blockquote><iframe class="wp-embedded-content" sandbox="allow-scripts" security="restricted" style="position: absolute; clip: rect(1px, 1px, 1px, 1px);" src="https://dev.classmethod.jp/machine-learning/2017ad_20171218_knn/embed/#?secret=F2TauRqy6f" data-secret="F2TauRqy6f" width="600" height="338" title="“機械学習_k近傍法_理論編” — Developers.IO" frameborder="0" marginwidth="0" marginheight="0" scrolling="no"></iframe>

学習が終わったら、コンソールのsagemakerの「ログ」から「検証用データセットでの精度」が確認できます。
(NNベースだとエポックごとの評価指標推移が見れて面白いのですが、k-NNだとそういうのはないです)
私が実施した際は「test accuracy <score>=0.922136931603」とのことでした。

S3バケットとsagemakerインスタンスのリージョンが異なるとエラーになるので注意!

'''

hyperparams = {
    'feature_dim': 54, # 特徴量の次元数
    'k': 10, # knnのパラメータ。近くの何個のデータポイントと比較して、分類をするか。
    'sample_size': 200000, # サブサンプリングのサイズ(全データで計算する必要がないことも多いので)
    'predictor_type': 'classifier'  # knnは回帰もできるけど、今回は分類に設定。回帰にする場合は'regressor'にする。回帰の場合は直近kこのデータポイントの「平均」が取得される。
}
output_path = 's3://' + bucket + '/' + prefix + '/output' # モデルのoutput先のS3パス。適宜修正すること。
knn_estimator = trained_estimator_from_hyperparams(s3_train_data, hyperparams, output_path, 
                                                   s3_test_data=s3_test_data)

学習の進捗はコンソール画面上から確認できます。 この画像は学習中なので「ステータス」が「InProgress」になっていますね。 これは「Completed」になったら学習完了です。

「検証用データセットでの精度」を確認するためのログ画面は

この「ログの表示」をクリックして

対象のログをクリックして

最後の方に書いてありました!

3-6.推論用エンドポイント作成

作成したモデルをホストする推論用エンドポイントを生成します。 インスタンスタイプは「推論に要する時間」や「コスト」との兼ね合いで検討する必要があります。

・推論用エンドポイントを作成する関数を用意

'''
推論用エンドポイントを作成する関数を生成。
インスタンスタイプ、エンドポイント名を指定できるように引数に追加されている。
'''


def predictor_from_estimator(knn_estimator, estimator_name, instance_type, endpoint_name=None): 
    knn_predictor = knn_estimator.deploy(initial_instance_count=1, instance_type=instance_type,
                                        endpoint_name=endpoint_name)
    knn_predictor.content_type = 'text/csv' # エンドポイントが受け取るデータの仕様。
    knn_predictor.serializer = csv_serializer
    knn_predictor.deserializer = json_deserializer
    return knn_predictor

この推論用エンドポイントが受け取れるデータ仕様については、推論時のinput,output仕様をご確認いただきたいのですが、一応「4.気になったところ」に纏めておきました。 実際に利用する際は必ず確認する必要がある点です。

・推論用エンドポイントを作成

'''
上記で生成した関数に入れる引数を指定し、エンドポイントを生成
コンソールの「エンドポイント」を見るとエンドポイントを生成していることを確認できる。

エンドポイントは不要になったら削除する、ということを意識しないとコストがかかってしまうので注意!
'''

import time

instance_type = 'ml.m4.xlarge'
model_name = 'knn_%s'% instance_type
endpoint_name = 'knn-ml-m4-xlarge-%s'% (str(time.time()).replace('.','-'))
print('setting up the endpoint..')
predictor = predictor_from_estimator(knn_estimator, model_name, instance_type, endpoint_name=endpoint_name)

エンドポイントの作成過程もコンソール上から確認できます。 この画像だとステータスが「Creating」になっているので作成途中ですね。 エンドポイントの作成が完了したら、ここが「InService」になります。

3-7.推論

作成した推論用エンドポイントに検証用データセットを投入し、精度を検証します。 先ほどログで確認したものと同じ数値が出るはずです。

batches = np.array_split(test_features, 100) # 検証用データセットを100個に分割する。
print('data split into 100 batches, of size %d.' % batches[0].shape[0])

# obtain an np array with the predictions for the entire test set
start_time = time.time()
predictions = []
for batch in batches: # バッチ単位で処理
    result = predictor.predict(batch)
    cur_predictions = np.array([result['predictions'][i]['predicted_label'] for i in range(len(result['predictions']))]) # レコード単位で処理
    predictions.append(cur_predictions)
predictions = np.concatenate(predictions)
run_time = time.time() - start_time

test_size = test_labels.shape[0]
num_correct = sum(predictions == test_labels)
accuracy = num_correct / float(test_size)
print('time required for predicting %d data point: %.2f seconds' % (test_size, run_time))
print('accuracy of model: %.1f%%' % (accuracy * 100) ) # ログで確認した精度と同じ数字が出ますよ-。

こんな結果が出ました。 確かに先ほどログ画面で確認したものと同じような精度が確認できます。 (小数点以下を切り捨てているのですが)

これはサンプルコードに載ってないのですが、試しに1レコードだけ適当にエンドポイントに食わせて結果を確認してみることもできます。

'''
実際にデータを突っ込んでみて、どんな形になるかを確認してみましょう
'''
start_time = time.time()


test_features_work = test_features[:1,]
test_label_work = test_labels[:1,]

print('testデータの特徴量:{}'.format(test_features_work))
print('testデータのラベル:{}'.format(test_label_work))
print('推論の結果:{}'.format(predictor.predict(test_features_work)))

print('推論に要した時間は{}秒でした'.format(time.time() - start_time))
3-8.推論用エンドポイントの削除

最後に、推論用エンドポイントを削除します。 使わないエンドポイントは忘れずに削除しましょう。 (S3にモデルがあればまた立ち上げられるので)

'''
API用のエンドポイントを削除します。
'''

def delete_endpoint(predictor):
    try:
        boto3.client('sagemaker').delete_endpoint(EndpointName=predictor.endpoint)
        print('Deleted {}'.format(predictor.endpoint))
    except:
        print('Already deleted: {}'.format(predictor.endpoint))

delete_endpoint(predictor)
3-9.まとめ

k-NNの学習と推論エンドポイントの作成方法について学びました。 今回は特にチューニングしなかったのですが、検証データの精度が92.2%という結果になりました。 更に良い結果(処理時間、レイテンシー等)を得るために、ここから更に深掘りする場合は本チュートリアルの「Part2」をやってみると良さそうです。

4.気になったところ

このブログを書いていて、気になった部分についてまとめておきます。

4-1.トレーニングフェーズについて

k-NNは「トレーニングが不要なアルゴリズム(毎回全データセットと距離計算する)」だと私は今まで認識していたのに、今回トレーニングフェーズがあって困惑しました。 ですが、awsのドキュメント1awsのドキュメント2を読んで、トレーニングフェーズで何をしているかを理解することができました。 時間がある方は上記のドキュメントを読んでいただきたいのですが、要約するとトレーニングフェーズでは下記の3ステップの処理をしているようです。 (次元削減はデフォルトだとしないので、サブサンプリング、インデックス作成が大きなところだと思います)

a.サブサンプリング

投入される全データを利用するのではなく、ハイパーパラメータの「sample_size」で指定されたデータサイズだけを実際には利用します。 これは、全データを利用すると処理の負担が大きく、またデータサイズが大きすぎる場合は「全データを利用しなくても十分な精度がでる」ためです。

b.次元削減

メモリの負荷、推論時のレイテンシーを削減するために次元削減を実施します。 データの次元が大きすぎると、「次元の呪い」にかかってしまうため、次元数が1000を超えているようなら次元削減を実施することが一般的です。

c.インデックス作成

推論時に利用するインデックスを作成します。 推論時には、投入されたデータ-インデックス間での計算をすることで、距離計算をするデータポイントを絞り込みます。 このインデックスはクラスラベルごとに計算されています。

4-2.input,outputデータ仕様

トレーニング、推論時のデータ仕様についてです。
トレーニング時のinput仕様
推論時のinput,output仕様
詳細は上記をご確認いただきたいのですが、下記に各仕様について記述します。

a.トレーニング時のinput

「text/csv」,「application/x-recordio-protobuf」のいずれかを選択可能です。
今回の検証は「text/csv」で実施しています。
(下記は各仕様の例です)

・text/csv

4,1.2,1.3,9.6,20.3

・application/x-recordio-protobuf

[
    Record = {
        features = {
            'values': {
                values: [1.2, 1.3, 9.6, 20.3]  # float32
            }
        },
        label = {
            'values': {
                values: [4]  # float32
            }
        }
    }
] 


}
b.推論時のinput

「text/csv」、「application/json」、「application/jsonlines」、「application/x-recordio-protobuf」のいずれかを選択可能です。 今回の検証は「application/json」で実施しています。

・text/csv

1.2,1.3,9.6,20.3

・application/json

{
  "instances": [
    {"data": {"features": {"values": [-3, -1, -4, 2]}}},
    {"features": [3.0, 0.1, 0.04, 0.002]}]
}

・application/jsonlines

{"features": [1.5, 16.0, 14.0, 23.0]}
{"data": {"features": {"values": [1.5, 16.0, 14.0, 23.0]}}

・application/x-recordio-protobuf

[
    Record = {
        features = {
            'values': {
                values: [-3, -1, -4, 2]  # float32
            }
        },
        label = {}
    },
    Record = {
        features = {
            'values': {
                values: [3.0, 0.1, 0.04, 0.002]  # float32
            }
        },
        label = {}
    },
]
c.推論時のoutput

「application/json」、「application/jsonlines」、「application/x-recordio-protobuf」のいずれかを選択可能です。 また、「application/json」、「application/x-recordio-protobuf」の際は「verbose」オプションを指定して冗長な返り値を受け取ることができます。 (「verbose」オプションを追加すると、「直近3個のデータポイントの距離とラベル」を返してくれます)

今回の検証は「application/json」で実施しています。

・application/json

{
  "predictions": [
    {"predicted_label": 0.0},
    {"predicted_label": 2.0}
  ]
}

・application/jsonlines

{"predicted_label": 0.0}
{"predicted_label": 2.0}

・application/json; verbose=true

{
  "predictions": [
    {
        "predicted_label": 0.0,
        "distances": [3.11792408, 3.89746071, 6.32548437],
        "labels": [0.0, 1.0, 0.0]
    },
    {
        "predicted_label": 2.0,
        "distances": [1.08470316, 3.04917915, 5.25393973],
        "labels": [2.0, 2.0, 0.0]
    }
  ]
}

・application/x-recordio-protobuf

[
    Record = {
        features = {},
        label = {
            'predicted_label': {
                values: [0.0]  # float32
            }
        }
    },
    Record = {
        features = {},
        label = {
            'predicted_label': {
                values: [2.0]  # float32
            }
        }
    }
]

・application/x-recordio-protobuf; verbose=true

[
    Record = {
        features = {},
        label = {
            'predicted_label': {
                values: [0.0]  # float32
            },
            'distances': {
                values: [3.11792408, 3.89746071, 6.32548437]  # float32
            },
            'labels': {
                values: [0.0, 1.0, 0.0]  # float32
            }
        }
    },
    Record = {
        features = {},
        label = {
            'predicted_label': {
                values: [0.0]  # float32
            },
            'distances': {
                values: [1.08470316, 3.04917915, 5.25393973]  # float32
            },
            'labels': {
                values: [2.0, 2.0, 0.0]  # float32
            }
        }
    }
]
4-3.ハイパーパラメータ

こちらもAWSのドキュメントの通りですが、一応表にしておきます。

ハイパーパラメータ 必須 意味 デフォルト 備考
feature_dim INPUTデータの特徴量 - 正の整数
mini_batch_size - ミニバッチのサイズ 5,000 正の整数
k 何個の近くのデータポイントから推論するか - 正の整数
predictor_type 回帰にするか、分類にするか - 「classifier」か「regressor」
sample_size サブサンプリングサイズ - 正の整数にする
dimension_reduction_type - 次元削減アルゴリズム 次元削減をしない 「sign」、「fjlt」のいずれか
dimension_reduction_target 注1 次元削減対象のカラム - 0~feature_dimの間の整数
index_type - インデックスの種類 faiss.Flat 「faiss.Flat」, 「faiss.IVFFlat」, 「faiss.IVFPQ.」のいずれか
index_metric - 距離計算の方法 L2 「L2」, 「INNER_PRODUCT」, 「COSINE」のいずれか
faiss_index_ivf_nlists 注2 セントロイドの数 「auto」(sample_sizeの平方根) 正の整数
faiss_index_pq_m -注3 インデックスに保持されるベクトルの数 - 1, 2, 3, 4, 8, 12, 16, 20, 24, 28, 32, 40, 48, 56, 64, 96のいずれか

注1:「dimension_reduction_type」を指定した場合のみ
注2:「index_type」が「aiss.IVFFlat」、「faiss.IVFPQ.」のいずれかの場合のみ
注3:「index_type」が「faiss.IVFPQ」の場合のみ指定可能

5.引用

k-NNについて
今回実行した分析内容について
推論時のinput,output仕様
sagemakerにおけるk-NNの処理内容について
sagemakerにおけるk-NNの処理内容について2
トレーニング時のinput仕様