Amazon SageMakerでデータの偏りを考慮した線形学習 – Amazon SageMaker Advent Calendar 2018
こんにちは、大阪DI部の大澤です。 この記事は「クラスメソッド Amazon SageMaker Advent Calendar」の15日目の記事です。
今回は「データの偏りを考慮した線形学習」についてご紹介します。 SageMakerの組み込みアルゴリズムである線形学習者(Linear Learner)を用いて、偏りがあるデータセットをどう補正して学習するかという内容のノートブックをやってみます。
やってみる
公平性の定義
特徴量の中に2つの値を持つセンシティブなもの(例:女と男)を含むような、二値分類の問題を考えます。 Equal Opportunityは次のように定義されます。
[latex] \mathbb{P}_{(x,y)} \big[ f(x)>0 \, \big| \, x \text{ in group } A, y = 1 \big] = \mathbb{P}_{(x,y)} \big[ f(x)>0 \, \big| \, x \text{ in group } B, y = 1 \big] [/latex]
- f(x) = 学習済みモデル
- A : 女性
- B : 男性
- P(x, y) : 条件にあった全ての(x, y)の組み合わせによる確率
Equal Opportunityは2つのグループ(AとB)における真陽性率(True Positive Rate, TPR)が同じということです。
Equal OpportunityからDifference in Equal Opportunity(DEO)は次のようになります。
[latex] DEO(f) = \Big| \mathbb{P}_{(x,y)}\big[ f(x)>0 \, \big| \, x \text{ in group } A, y = 1 \big] - \mathbb{P}_{(x,y)} \big[ f(x)>0 \, \big| \, x \text{ in group } B, y = 1 \big] \Big| [/latex]
DEOが低い値であるほど2つのグループ(AとB)を持つセンシティブな特徴量においてfは公平なモデルだと言えます。
DEOを算出する関数を定義しておきます。
from sklearn.metrics import confusion_matrix def deo_from_list(dataset, predictions, groupA_idxs, groupB_idxs): tnA, fpA, fnA, tpA = confusion_matrix(np.where(dataset[groupA_idxs][:,-1] == 1, 1, 0), predictions[groupA_idxs]).ravel() tnB, fpB, fnB, tpB = confusion_matrix(np.where(dataset[groupB_idxs][:,-1] == 1, 1, 0), predictions[groupB_idxs]).ravel() print('Examples in group A: %d' % len(groupA_idxs)) print('Examples in group B: %d' % len(groupB_idxs)) print('TPR group A: %f' % (float(tpA) / (tpA + fnA))) print('TPR group B: %f' % (float(tpB) / (tpB + fnB))) print('TPR all dataset: %f' % (float(tpA + tpB) / (tpA + fnA + tpB + fnB))) print('Accuracy all dataset: %f' % (float(tpA + tnA + tpB + tnB) / (tpA + fpA + fnA + tnA + tpB + fpB + fnB + tnB))) return(np.abs(float(tpA) / (tpA + fnA) - float(tpB) / (tpB + fnB)))
モデルから評価指標であるDEOを求めることが出来ると便利なので、その関数も定義しておきます。
def deo_from_model(model, dataset, sensitive_feature, groupA_value=None): if groupA_value == None: groupA_value = np.max(dataset[:, sensitive_feature]) groupA_idxs = [idx for idx, val in enumerate(dataset) if val[sensitive_feature] == groupA_value] groupB_idxs = [idx for idx, val in enumerate(dataset) if val[sensitive_feature] != groupA_value] predictions = [] for array in np.array_split(dataset[:,:-1], 100): result = model.predict(array) predictions += [r['predicted_label'] for r in result['predictions']] predictions = np.array(predictions) return deo_from_list(dataset, predictions, groupA_idxs, groupB_idxs)
データの保存場所とIAMロールの定義
学習データやモデルなどを保存する場所の指定と学習やエンドポイントの展開を行うIAMロールを指定します。
from sagemaker import Session # S3のバケット名と保存時のプレフィックスを指定する bucket = Session().default_bucket() #'fairness-test2' prefix = 'sagemaker/DEMO-linear-adult' # Define IAM role from sagemaker import get_execution_role import pandas as pd import numpy as np import urllib import os import sklearn.preprocessing as preprocessing import seaborn as sns # IAMロールを指定する role = get_execution_role()
データセットの取得
UCI Machine Learning Repositoryの Adult というデータセットを使うため、リポジトリからダウンロードしてきます。
if not os.path.isfile('adult.data'): urllib.request.urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data', 'adult.data') print('adult.data saved!') else: print('adult.data already here.') if not os.path.isfile('adult.test'): urllib.request.urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test', 'adult.test') print('adult.test saved!') else: print('adult.test already here.')
データセットが持つ属性と取りうる値は以下の通りです。
Age | 連続値 |
---|---|
Workclass | Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked. |
Fnlwgt | 連続値 |
Education | Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc, 9th, 7th-8th, 12th, Masters, 1st-4th, 10th, Doctorate, 5th-6th, Preschool. |
Education-num | 連続値 |
Marital-status | Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent, Married-AF-spouse. |
Occupation | Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners, Machine-op-inspct, Adm-clerical, Farming-fishing, Transport-moving, Priv-house-serv, Protective-serv, Armed-Forces. |
Relationship | Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried. |
Ethnic group | White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black. |
Gender | Female, Male. |
Capital-gain | 連続値 |
Capital-loss | 連続値 |
Hours-per-week | 連続値 |
Native-country | United-States, Cambodia, England, Puerto-Rico, Canada, Germany, Outlying-US(Guam-USVI-etc), India, Japan, Greece, South, China, Cuba, Iran, Honduras, Philippines, Italy, Poland, Jamaica, Vietnam, Mexico, Portugal, Ireland, France, Dominican-Republic, Laos, Ecuador, Taiwan, Haiti, Columbia, Hungary, Guatemala, Nicaragua, Scotland, Thailand, Yugoslavia, El-Salvador, Trinadad&Tobago, Peru, Hong, Holand-Netherlands. |
head adult.data
目的変数は次の通りです。年間で$50000より多く稼いでるか否かです。
Target | <=50, >50 |
---|
データの確認
各属性のヒストグラムを描画し、どういったデータなのかを確認します。
import matplotlib.pyplot as plt from math import ceil positive_idxs = [idx for idx, val in enumerate(original_data['Target']) if val == ">50K"] fig = plt.figure(figsize=(20,100)) cols = 2 rows = ceil(float(original_data.shape[1]) / cols) * 2 for i, column in enumerate(original_data.columns): ax = fig.add_subplot(rows, cols, 2 * i + 1) ax.set_title(column) if original_data.dtypes[column] == np.object: original_data[column][:].value_counts(sort=True).plot(kind="bar", axes=ax) else: original_data[column][:].hist(axes=ax) plt.xticks(rotation="vertical") ax = fig.add_subplot(rows, cols, 2 * i + 2) ax.set_title(column + " (only positive examples)") if original_data.dtypes[column] == np.object: original_data[column][positive_idxs].value_counts(sort=True).plot(kind="bar", axes=ax) else: original_data[column][positive_idxs].hist(axes=ax) plt.xticks(rotation="vertical") plt.subplots_adjust(hspace=0.7, wspace=0.2)
こんな感じで描画されます。 (多いので Age と Workclassのみ表示してますが、実際には各属性について描画されるはずです。)
データの前処理
学習や推論に使えるデータになるように各特徴量を変換します。
# カテゴリ変数を数値に変換する def number_encode_features(df): result = df.copy() encoders = {} for column in result.columns: if result.dtypes[column] == np.object: encoders[column] = preprocessing.LabelEncoder() # print('Column:', column, result[column]) result[column] = encoders[column].fit_transform(result[column].fillna('None')) return result, encoders encoded_data, _ = number_encode_features(original_data) training_data_matrix = np.array(encoded_data.values, dtype=float) encoded_data, _ = number_encode_features(original_test) test_data_matrix = np.array(encoded_data.fillna(0).values, dtype=float) # 各値を標準化します scaler = preprocessing.MinMaxScaler(feature_range=(0.0, 1.0)) training_data_matrix = scaler.fit_transform(training_data_matrix) test_data_matrix = scaler.transform(test_data_matrix)
データの変換とアップロード
学習用のフォーマットにデータを変換し、S3へアップロードします。
import io import numpy as np import sagemaker.amazon.common as smac import boto3 import os vectors = np.array([t.tolist() for t in training_data_matrix[:,:-1]]).astype('float32') labels = np.where(np.array([t.tolist() for t in training_data_matrix[:,-1]]) == 1, 1, 0).astype('float32') buf = io.BytesIO() smac.write_numpy_to_dense_tensor(buf, vectors, labels) buf.seek(0) key = 'recordio-pb-data' boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train', key)).upload_fileobj(buf) s3_train_data = 's3://{}/{}/train/{}'.format(bucket, prefix, key) print('uploaded training data location: {}'.format(s3_train_data))
通常の線形モデル
学習
通常の線形モデルをml.c4.xlargeのインスタンス1台で学習させます。
from sagemaker.amazon.amazon_estimator import get_image_uri import sagemaker # モデルの保存場所 output_location = 's3://{}/{}/output'.format(bucket, prefix) print('training artifacts will be uploaded to: {}'.format(output_location)) # コンテナイメージの取得 container = get_image_uri(boto3.Session().region_name, 'linear-learner', "latest") # estimatorの設定 sess = sagemaker.Session() linear = sagemaker.estimator.Estimator(container, role, train_instance_count=1, train_instance_type='ml.c4.xlarge', output_path=output_location, sagemaker_session=sess) linear.set_hyperparameters(feature_dim=14, # 特徴量の次元数 predictor_type='binary_classifier', # 今回は二値分類 mini_batch_size=200) # バッチサイズ(一度の学習時のデータ数) # 学習開始 linear.fit({'train': s3_train_data})
モデルのデプロイ
モデルをホストするエンドポイントを作成して、モデルをデプロイします。
from sagemaker.predictor import csv_serializer, json_deserializer linear_predictor = linear.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge') linear_predictor.content_type = 'text/csv' linear_predictor.serializer = csv_serializer linear_predictor.deserializer = json_deserializer
精度の検証
テストデータで予測した場合にどれだけの精度なのかを調べます。
import numpy as np predictions = [] distance_from_hyperplane = [] for array in np.array_split(test_data_matrix[:,:-1], 100): result = linear_predictor.predict(array) predictions += [r['predicted_label'] for r in result['predictions']] distance_from_hyperplane += [r['score'] for r in result['predictions']] predictions_test = np.array(predictions) distance_from_hyperplane_test = np.array(distance_from_hyperplane) import pandas as pd pd.crosstab(np.where(test_data_matrix[:,-1] == 1, 1, 0), predictions_test, rownames=['actuals'], colnames=['predictions'])
sensitive_feature_gender = 9 # Gender groupA_value = 0.0 # Female deo = deo_from_model(linear_predictor, test_data_matrix, sensitive_feature_gender, groupA_value=groupA_value) print('DEO: %f' % deo)
この結果を見るとDEOは0.24となっています。これは2つのグループのTruePositive率(TP)の差が0.24もあるということです。グループBは40%近いTPに対して、グループAは約16%程度しかありません。
この偏りをどうにかすることが今回の目的です。
偏りを考慮した線形学習モデル
まず、 u というベクトルを導入します。
[latex] u = \frac{1}{n(+, A)} \sum_{x \in X(+, A)} x - \frac{1}{n(+, B)} \sum_{x \in X(+, B)} x [/latex]
- X(+, A)とX(+, B) はグループA,BのラベルがPositiveとなるデータの集合
- n(+, A)とn(+, B)はそれぞれの集合の要素数(濃度)
ベクトルuは2つのグループ(AとB)のそれぞれの特徴量の平均の差で、グループごとの偏りを表しています。
元となる特徴量を次のように変換することで2つのグループの偏りを減らします。
[latex] \hat{x}_j = x_j - x_i \frac{u_j}{u_i} \,\,\,\, j \in \{1, \dots, i-1, i+1, \dots, d\} [/latex]
iは特徴量の添字番号で、 ^x_jはx_jに対応する新しい特徴量です。このときの注意点として、特徴量は1つ少なくなってしまいます。
グループの偏りを減らすための処理を次のように関数化します。
class UncorrelationMethod: def __init__(self, sensitive_feature, groupA_value=0.0): self.val0 = groupA_value self.sensitive_feature = sensitive_feature self.u = None def new_representation(self, examples): if self.u is None: print('You have to fit the model first...') return examples new_examples = np.array([ex if ex[self.sensitive_feature] == self.val0 # case x_i = 0, Group A else ex + self.u for ex in examples]) # case x_i = 1, Group B new_examples = np.delete(new_examples, self.sensitive_feature, 1) return new_examples def fit(self, dataset): tmp = [ex for idx, ex in enumerate(dataset) if dataset[idx, -1] == 1 and ex[self.sensitive_feature] == self.val0] average_A_1 = np.mean(tmp, 0) n_A_1 = len(tmp) tmp = [ex for idx, ex in enumerate(dataset) if dataset[idx, -1] == 1 and ex[self.sensitive_feature] != self.val0] average_not_A_1 = np.mean(tmp, 0) n_not_A_1 = len(tmp) N_1 = len([ex for idx, ex in enumerate(dataset) if dataset[idx, -1] == 1]) self.u = average_A_1[:-1] - average_not_A_1[:-1] # Our hypothesis of values 0 (A) and +1 (B) for the sensitive feature among the two groups # has the following consequence: self.u[self.sensitive_feature] = -1.0
先ほど定義した関数で特徴量に変換処理を施します。
uncorr_data = UncorrelationMethod(sensitive_feature_gender, 0.0) uncorr_data.fit(training_data_matrix) new_training_data_matrix = np.hstack([uncorr_data.new_representation(training_data_matrix[:, :-1]), training_data_matrix[:, -1:-2:-1]]) new_test_data_matrix = np.hstack([uncorr_data.new_representation(test_data_matrix[:, :-1]), test_data_matrix[:, -1:-2:-1]])
学習
変換後のデータセットで線形モデルの学習を行います。学習終了後にはモデルをデプロイします。
vectors = np.array([t.tolist() for t in new_training_data_matrix[:,:-1]]).astype('float32') labels = np.where(np.array([t.tolist() for t in new_training_data_matrix[:,-1]]) == 1, 1, 0).astype('float32') buf = io.BytesIO() smac.write_numpy_to_dense_tensor(buf, vectors, labels) buf.seek(0) key = 'recordio-pb-data' boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train', key)).upload_fileobj(buf) s3_train_data = 's3://{}/{}/train/{}'.format(bucket, prefix, key) print('uploaded training data location: {}'.format(s3_train_data)) output_location = 's3://{}/{}/output'.format(bucket, prefix) print('training artifacts will be uploaded to: {}'.format(output_location)) linearf = sagemaker.estimator.Estimator(container, role, train_instance_count=1, train_instance_type='ml.c4.xlarge', output_path=output_location, sagemaker_session=sess) linearf.set_hyperparameters(feature_dim=13, predictor_type='binary_classifier', mini_batch_size=200) path_fair_model = linearf.fit({'train': s3_train_data}) linear_predictorf = linearf.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge') linear_predictorf.content_type = 'text/csv' linear_predictorf.serializer = csv_serializer linear_predictorf.deserializer = json_deserializer
精度の検証
predictions = [] distance_from_hyperplane = [] for array in np.array_split(new_test_data_matrix[:,:-1], 100): result = linear_predictorf.predict(array) predictions += [r['predicted_label'] for r in result['predictions']] distance_from_hyperplane += [r['score'] for r in result['predictions']] distance_from_hyperplane_test_fair = np.array(distance_from_hyperplane) predictions_test_fair = np.array(predictions) pd.crosstab(np.where(new_test_data_matrix[:,-1] == 1, 1, 0), predictions_test_fair, rownames=['actuals'], colnames=['predictions'])
groupA_idxs = [idx for idx, val in enumerate(test_data_matrix) if val[sensitive_feature_gender] == groupA_value] groupB_idxs = [idx for idx, val in enumerate(test_data_matrix) if val[sensitive_feature_gender] != groupA_value] deo = deo_from_list(new_test_data_matrix, predictions_test_fair, groupA_idxs, groupB_idxs) print('DEO: %f' % deo)
全体の精度がわずかに下がることになりましたが、DEOが約6%とかなり低下しました。
学習データで推論をしてみて、過学習してないかも確認します。
predictions = [] distance_from_hyperplane = [] for array in np.array_split(new_training_data_matrix[:,:-1], 100): result = linear_predictorf.predict(array) predictions += [r['predicted_label'] for r in result['predictions']] distance_from_hyperplane += [r['score'] for r in result['predictions']] distance_from_hyperplane_train_fair = np.array(distance_from_hyperplane) predictions_train_fair = np.array(predictions) pd.crosstab(np.where(new_training_data_matrix[:,-1] == 1, 1, 0), predictions_train_fair, rownames=['actuals'], colnames=['predictions']) groupA_idxs = [idx for idx, val in enumerate(training_data_matrix) if val[sensitive_feature_gender] == groupA_value] groupB_idxs = [idx for idx, val in enumerate(training_data_matrix) if val[sensitive_feature_gender] != groupA_value] deo = deo_from_list(new_training_data_matrix, predictions_train_fair, groupA_idxs, groupB_idxs) print('DEO: %f' % deo)
テスト用データに比べて圧倒的に学習用データの精度が高い訳ではないので、問題なさそうです。
データの分布の確認
元となるデータセットと偏りを修正したデータセットそれぞれのデータ分布を描画して見ます。
import matplotlib.pyplot as plt sensitive_feature = sensitive_feature_gender SMALL_SIZE = 12 MEDIUM_SIZE = 12 BIGGER_SIZE = 16 bins = 30 X = training_data_matrix[:, :-1] y = training_data_matrix[:, -1] Xte = test_data_matrix[:, :-1] yte = test_data_matrix[:, -1] ypos = np.max(y) yneg = np.min(y) idx_group_A1 = [idx for idx, v in enumerate(Xte) if v[sensitive_feature] == groupA_value and yte[idx] == ypos] idx_group_B1 = [idx for idx, v in enumerate(Xte) if v[sensitive_feature] != groupA_value and yte[idx] == ypos] titles = ['Adult Dataset - TPR Area - Linear', 'Adult Dataset - TPR Area - Fair Linear'] for i,distance_from_hyperplane in enumerate([distance_from_hyperplane_test, distance_from_hyperplane_test_fair]): distance_from_hyperplane = distance_from_hyperplane - 0.5 xmin = np.min([np.min(distance_from_hyperplane[idx_group_A1]), np.min(distance_from_hyperplane[idx_group_B1])]) xmax = np.max([np.max(distance_from_hyperplane[idx_group_A1]), np.max(distance_from_hyperplane[idx_group_B1])]) fig, ax = plt.subplots(figsize=(8, 6), dpi=90) plt.rc('font', size=SMALL_SIZE) # controls default text sizes plt.rc('axes', titlesize=SMALL_SIZE) # fontsize of the axes title plt.rc('axes', labelsize=MEDIUM_SIZE) # fontsize of the x and y labels plt.rc('xtick', labelsize=SMALL_SIZE) # fontsize of the tick labels plt.rc('ytick', labelsize=SMALL_SIZE) # fontsize of the tick labels plt.rc('legend', fontsize=MEDIUM_SIZE) # legend fontsize #plt.rc('figure', titlesize=MEDIUM_SIZE) # fontsize of the figure title pdf, bins, patches = ax.hist(distance_from_hyperplane[idx_group_A1], bins=bins, density=True, stacked=True, label='A=Female, Y=1', alpha=1.0) ax.hist(distance_from_hyperplane[idx_group_B1], bins=bins, density=True, stacked=True, label='B=Male, Y=1', alpha=0.5) ax.legend(loc='upper left') ax.set_xlim(left=0.0, right=xmax) ax.set_ylim(0, 3) plt.title(titles[i]) plt.show()
エンドポイントの削除
最後に無駄な課金が発生しないように不要になったエンドポイントを削除します。
import sagemaker sagemaker.Session().delete_endpoint(linear_predictor.endpoint) sagemaker.Session().delete_endpoint(linear_predictorf.endpoint)
さいごに
今回は 「クラスメソッド Amazon SageMaker Advent Calendar」 の15日目として、 「Amazon SageMakerで偏りを考慮した線形学習」 についてお送りしました。今回扱ったデータセットのようにカテゴリによってデータに偏りがあるケースは多いと思います。そういった際のソリューションの1つとして知れて良かったな、と個人的に思いました。
お読みくださりありがとうございました〜!明日もお楽しみに〜!