Amazon SageMakerでデータの偏りを考慮した線形学習 – Amazon SageMaker Advent Calendar 2018
こんにちは、大阪DI部の大澤です。 この記事は「クラスメソッド Amazon SageMaker Advent Calendar」の15日目の記事です。
今回は「データの偏りを考慮した線形学習」についてご紹介します。 SageMakerの組み込みアルゴリズムである線形学習者(Linear Learner)を用いて、偏りがあるデータセットをどう補正して学習するかという内容のノートブックをやってみます。
特徴量の中に2つの値を持つセンシティブなもの(例:女と男)を含むような、二値分類の問題を考えます。 Equal Opportunityは次のように定義されます。
[latex] \mathbb{P}_{(x,y)} \big[ f(x)>0 \, \big| \, x \text{ in group } A, y = 1 \big] = \mathbb{P}_{(x,y)} \big[ f(x)>0 \, \big| \, x \text{ in group } B, y = 1 \big] [/latex]
- f(x) = 学習済みモデル
- A : 女性
- B : 男性
- P(x, y) : 条件にあった全ての(x, y)の組み合わせによる確率
Equal Opportunityは2つのグループ(AとB)における真陽性率(True Positive Rate, TPR)が同じということです。
Equal OpportunityからDifference in Equal Opportunity(DEO)は次のようになります。
[latex] DEO(f) = \Big| \mathbb{P}_{(x,y)}\big[ f(x)>0 \, \big| \, x \text{ in group } A, y = 1 \big] - \mathbb{P}_{(x,y)} \big[ f(x)>0 \, \big| \, x \text{ in group } B, y = 1 \big] \Big| [/latex]
from sklearn.metrics import confusion_matrix def deo_from_list(dataset, predictions, groupA_idxs, groupB_idxs): tnA, fpA, fnA, tpA = confusion_matrix(np.where(dataset[groupA_idxs][:,-1] == 1, 1, 0), predictions[groupA_idxs]).ravel() tnB, fpB, fnB, tpB = confusion_matrix(np.where(dataset[groupB_idxs][:,-1] == 1, 1, 0), predictions[groupB_idxs]).ravel() print('Examples in group A: %d' % len(groupA_idxs)) print('Examples in group B: %d' % len(groupB_idxs)) print('TPR group A: %f' % (float(tpA) / (tpA + fnA))) print('TPR group B: %f' % (float(tpB) / (tpB + fnB))) print('TPR all dataset: %f' % (float(tpA + tpB) / (tpA + fnA + tpB + fnB))) print('Accuracy all dataset: %f' % (float(tpA + tnA + tpB + tnB) / (tpA + fpA + fnA + tnA + tpB + fpB + fnB + tnB))) return(np.abs(float(tpA) / (tpA + fnA) - float(tpB) / (tpB + fnB)))
def deo_from_model(model, dataset, sensitive_feature, groupA_value=None): if groupA_value == None: groupA_value = np.max(dataset[:, sensitive_feature]) groupA_idxs = [idx for idx, val in enumerate(dataset) if val[sensitive_feature] == groupA_value] groupB_idxs = [idx for idx, val in enumerate(dataset) if val[sensitive_feature] != groupA_value] predictions = [] for array in np.array_split(dataset[:,:-1], 100): result = model.predict(array) predictions += [r['predicted_label'] for r in result['predictions']] predictions = np.array(predictions) return deo_from_list(dataset, predictions, groupA_idxs, groupB_idxs)
from sagemaker import Session # S3のバケット名と保存時のプレフィックスを指定する bucket = Session().default_bucket() #'fairness-test2' prefix = 'sagemaker/DEMO-linear-adult' # Define IAM role from sagemaker import get_execution_role import pandas as pd import numpy as np import urllib import os import sklearn.preprocessing as preprocessing import seaborn as sns # IAMロールを指定する role = get_execution_role()
UCI Machine Learning Repositoryの Adult というデータセットを使うため、リポジトリからダウンロードしてきます。
if not os.path.isfile('adult.data'): urllib.request.urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data', 'adult.data') print('adult.data saved!') else: print('adult.data already here.') if not os.path.isfile('adult.test'): urllib.request.urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test', 'adult.test') print('adult.test saved!') else: print('adult.test already here.')
Age | 連続値 |
Workclass | Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked. |
Fnlwgt | 連続値 |
Education | Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc, 9th, 7th-8th, 12th, Masters, 1st-4th, 10th, Doctorate, 5th-6th, Preschool. |
Education-num | 連続値 |
Marital-status | Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent, Married-AF-spouse. |
Occupation | Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners, Machine-op-inspct, Adm-clerical, Farming-fishing, Transport-moving, Priv-house-serv, Protective-serv, Armed-Forces. |
Relationship | Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried. |
Ethnic group | White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black. |
Gender | Female, Male. |
Capital-gain | 連続値 |
Capital-loss | 連続値 |
Hours-per-week | 連続値 |
Native-country | United-States, Cambodia, England, Puerto-Rico, Canada, Germany, Outlying-US(Guam-USVI-etc), India, Japan, Greece, South, China, Cuba, Iran, Honduras, Philippines, Italy, Poland, Jamaica, Vietnam, Mexico, Portugal, Ireland, France, Dominican-Republic, Laos, Ecuador, Taiwan, Haiti, Columbia, Hungary, Guatemala, Nicaragua, Scotland, Thailand, Yugoslavia, El-Salvador, Trinadad&Tobago, Peru, Hong, Holand-Netherlands. |
head adult.data
Target | <=50, >50 |
import matplotlib.pyplot as plt from math import ceil positive_idxs = [idx for idx, val in enumerate(original_data['Target']) if val == ">50K"] fig = plt.figure(figsize=(20,100)) cols = 2 rows = ceil(float(original_data.shape[1]) / cols) * 2 for i, column in enumerate(original_data.columns): ax = fig.add_subplot(rows, cols, 2 * i + 1) ax.set_title(column) if original_data.dtypes[column] == np.object: original_data[column][:].value_counts(sort=True).plot(kind="bar", axes=ax) else: original_data[column][:].hist(axes=ax) plt.xticks(rotation="vertical") ax = fig.add_subplot(rows, cols, 2 * i + 2) ax.set_title(column + " (only positive examples)") if original_data.dtypes[column] == np.object: original_data[column][positive_idxs].value_counts(sort=True).plot(kind="bar", axes=ax) else: original_data[column][positive_idxs].hist(axes=ax) plt.xticks(rotation="vertical") plt.subplots_adjust(hspace=0.7, wspace=0.2)
こんな感じで描画されます。 (多いので Age と Workclassのみ表示してますが、実際には各属性について描画されるはずです。)
# カテゴリ変数を数値に変換する def number_encode_features(df): result = df.copy() encoders = {} for column in result.columns: if result.dtypes[column] == np.object: encoders[column] = preprocessing.LabelEncoder() # print('Column:', column, result[column]) result[column] = encoders[column].fit_transform(result[column].fillna('None')) return result, encoders encoded_data, _ = number_encode_features(original_data) training_data_matrix = np.array(encoded_data.values, dtype=float) encoded_data, _ = number_encode_features(original_test) test_data_matrix = np.array(encoded_data.fillna(0).values, dtype=float) # 各値を標準化します scaler = preprocessing.MinMaxScaler(feature_range=(0.0, 1.0)) training_data_matrix = scaler.fit_transform(training_data_matrix) test_data_matrix = scaler.transform(test_data_matrix)
import io import numpy as np import sagemaker.amazon.common as smac import boto3 import os vectors = np.array([t.tolist() for t in training_data_matrix[:,:-1]]).astype('float32') labels = np.where(np.array([t.tolist() for t in training_data_matrix[:,-1]]) == 1, 1, 0).astype('float32') buf = io.BytesIO() smac.write_numpy_to_dense_tensor(buf, vectors, labels) buf.seek(0) key = 'recordio-pb-data' boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train', key)).upload_fileobj(buf) s3_train_data = 's3://{}/{}/train/{}'.format(bucket, prefix, key) print('uploaded training data location: {}'.format(s3_train_data))
from sagemaker.amazon.amazon_estimator import get_image_uri import sagemaker # モデルの保存場所 output_location = 's3://{}/{}/output'.format(bucket, prefix) print('training artifacts will be uploaded to: {}'.format(output_location)) # コンテナイメージの取得 container = get_image_uri(boto3.Session().region_name, 'linear-learner', "latest") # estimatorの設定 sess = sagemaker.Session() linear = sagemaker.estimator.Estimator(container, role, train_instance_count=1, train_instance_type='ml.c4.xlarge', output_path=output_location, sagemaker_session=sess) linear.set_hyperparameters(feature_dim=14, # 特徴量の次元数 predictor_type='binary_classifier', # 今回は二値分類 mini_batch_size=200) # バッチサイズ(一度の学習時のデータ数) # 学習開始 linear.fit({'train': s3_train_data})
from sagemaker.predictor import csv_serializer, json_deserializer linear_predictor = linear.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge') linear_predictor.content_type = 'text/csv' linear_predictor.serializer = csv_serializer linear_predictor.deserializer = json_deserializer
import numpy as np predictions = [] distance_from_hyperplane = [] for array in np.array_split(test_data_matrix[:,:-1], 100): result = linear_predictor.predict(array) predictions += [r['predicted_label'] for r in result['predictions']] distance_from_hyperplane += [r['score'] for r in result['predictions']] predictions_test = np.array(predictions) distance_from_hyperplane_test = np.array(distance_from_hyperplane) import pandas as pd pd.crosstab(np.where(test_data_matrix[:,-1] == 1, 1, 0), predictions_test, rownames=['actuals'], colnames=['predictions'])
sensitive_feature_gender = 9 # Gender groupA_value = 0.0 # Female deo = deo_from_model(linear_predictor, test_data_matrix, sensitive_feature_gender, groupA_value=groupA_value) print('DEO: %f' % deo)
まず、 u というベクトルを導入します。
[latex] u = \frac{1}{n(+, A)} \sum_{x \in X(+, A)} x - \frac{1}{n(+, B)} \sum_{x \in X(+, B)} x [/latex]
- X(+, A)とX(+, B) はグループA,BのラベルがPositiveとなるデータの集合
- n(+, A)とn(+, B)はそれぞれの集合の要素数(濃度)
[latex] \hat{x}_j = x_j - x_i \frac{u_j}{u_i} \,\,\,\, j \in \{1, \dots, i-1, i+1, \dots, d\} [/latex]
iは特徴量の添字番号で、 ^x_jはx_jに対応する新しい特徴量です。このときの注意点として、特徴量は1つ少なくなってしまいます。
class UncorrelationMethod: def __init__(self, sensitive_feature, groupA_value=0.0): self.val0 = groupA_value self.sensitive_feature = sensitive_feature self.u = None def new_representation(self, examples): if self.u is None: print('You have to fit the model first...') return examples new_examples = np.array([ex if ex[self.sensitive_feature] == self.val0 # case x_i = 0, Group A else ex + self.u for ex in examples]) # case x_i = 1, Group B new_examples = np.delete(new_examples, self.sensitive_feature, 1) return new_examples def fit(self, dataset): tmp = [ex for idx, ex in enumerate(dataset) if dataset[idx, -1] == 1 and ex[self.sensitive_feature] == self.val0] average_A_1 = np.mean(tmp, 0) n_A_1 = len(tmp) tmp = [ex for idx, ex in enumerate(dataset) if dataset[idx, -1] == 1 and ex[self.sensitive_feature] != self.val0] average_not_A_1 = np.mean(tmp, 0) n_not_A_1 = len(tmp) N_1 = len([ex for idx, ex in enumerate(dataset) if dataset[idx, -1] == 1]) self.u = average_A_1[:-1] - average_not_A_1[:-1] # Our hypothesis of values 0 (A) and +1 (B) for the sensitive feature among the two groups # has the following consequence: self.u[self.sensitive_feature] = -1.0
uncorr_data = UncorrelationMethod(sensitive_feature_gender, 0.0) uncorr_data.fit(training_data_matrix) new_training_data_matrix = np.hstack([uncorr_data.new_representation(training_data_matrix[:, :-1]), training_data_matrix[:, -1:-2:-1]]) new_test_data_matrix = np.hstack([uncorr_data.new_representation(test_data_matrix[:, :-1]), test_data_matrix[:, -1:-2:-1]])
vectors = np.array([t.tolist() for t in new_training_data_matrix[:,:-1]]).astype('float32') labels = np.where(np.array([t.tolist() for t in new_training_data_matrix[:,-1]]) == 1, 1, 0).astype('float32') buf = io.BytesIO() smac.write_numpy_to_dense_tensor(buf, vectors, labels) buf.seek(0) key = 'recordio-pb-data' boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train', key)).upload_fileobj(buf) s3_train_data = 's3://{}/{}/train/{}'.format(bucket, prefix, key) print('uploaded training data location: {}'.format(s3_train_data)) output_location = 's3://{}/{}/output'.format(bucket, prefix) print('training artifacts will be uploaded to: {}'.format(output_location)) linearf = sagemaker.estimator.Estimator(container, role, train_instance_count=1, train_instance_type='ml.c4.xlarge', output_path=output_location, sagemaker_session=sess) linearf.set_hyperparameters(feature_dim=13, predictor_type='binary_classifier', mini_batch_size=200) path_fair_model = linearf.fit({'train': s3_train_data}) linear_predictorf = linearf.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge') linear_predictorf.content_type = 'text/csv' linear_predictorf.serializer = csv_serializer linear_predictorf.deserializer = json_deserializer
predictions = [] distance_from_hyperplane = [] for array in np.array_split(new_test_data_matrix[:,:-1], 100): result = linear_predictorf.predict(array) predictions += [r['predicted_label'] for r in result['predictions']] distance_from_hyperplane += [r['score'] for r in result['predictions']] distance_from_hyperplane_test_fair = np.array(distance_from_hyperplane) predictions_test_fair = np.array(predictions) pd.crosstab(np.where(new_test_data_matrix[:,-1] == 1, 1, 0), predictions_test_fair, rownames=['actuals'], colnames=['predictions'])
groupA_idxs = [idx for idx, val in enumerate(test_data_matrix) if val[sensitive_feature_gender] == groupA_value] groupB_idxs = [idx for idx, val in enumerate(test_data_matrix) if val[sensitive_feature_gender] != groupA_value] deo = deo_from_list(new_test_data_matrix, predictions_test_fair, groupA_idxs, groupB_idxs) print('DEO: %f' % deo)
predictions = [] distance_from_hyperplane = [] for array in np.array_split(new_training_data_matrix[:,:-1], 100): result = linear_predictorf.predict(array) predictions += [r['predicted_label'] for r in result['predictions']] distance_from_hyperplane += [r['score'] for r in result['predictions']] distance_from_hyperplane_train_fair = np.array(distance_from_hyperplane) predictions_train_fair = np.array(predictions) pd.crosstab(np.where(new_training_data_matrix[:,-1] == 1, 1, 0), predictions_train_fair, rownames=['actuals'], colnames=['predictions']) groupA_idxs = [idx for idx, val in enumerate(training_data_matrix) if val[sensitive_feature_gender] == groupA_value] groupB_idxs = [idx for idx, val in enumerate(training_data_matrix) if val[sensitive_feature_gender] != groupA_value] deo = deo_from_list(new_training_data_matrix, predictions_train_fair, groupA_idxs, groupB_idxs) print('DEO: %f' % deo)
import matplotlib.pyplot as plt sensitive_feature = sensitive_feature_gender SMALL_SIZE = 12 MEDIUM_SIZE = 12 BIGGER_SIZE = 16 bins = 30 X = training_data_matrix[:, :-1] y = training_data_matrix[:, -1] Xte = test_data_matrix[:, :-1] yte = test_data_matrix[:, -1] ypos = np.max(y) yneg = np.min(y) idx_group_A1 = [idx for idx, v in enumerate(Xte) if v[sensitive_feature] == groupA_value and yte[idx] == ypos] idx_group_B1 = [idx for idx, v in enumerate(Xte) if v[sensitive_feature] != groupA_value and yte[idx] == ypos] titles = ['Adult Dataset - TPR Area - Linear', 'Adult Dataset - TPR Area - Fair Linear'] for i,distance_from_hyperplane in enumerate([distance_from_hyperplane_test, distance_from_hyperplane_test_fair]): distance_from_hyperplane = distance_from_hyperplane - 0.5 xmin = np.min([np.min(distance_from_hyperplane[idx_group_A1]), np.min(distance_from_hyperplane[idx_group_B1])]) xmax = np.max([np.max(distance_from_hyperplane[idx_group_A1]), np.max(distance_from_hyperplane[idx_group_B1])]) fig, ax = plt.subplots(figsize=(8, 6), dpi=90) plt.rc('font', size=SMALL_SIZE) # controls default text sizes plt.rc('axes', titlesize=SMALL_SIZE) # fontsize of the axes title plt.rc('axes', labelsize=MEDIUM_SIZE) # fontsize of the x and y labels plt.rc('xtick', labelsize=SMALL_SIZE) # fontsize of the tick labels plt.rc('ytick', labelsize=SMALL_SIZE) # fontsize of the tick labels plt.rc('legend', fontsize=MEDIUM_SIZE) # legend fontsize #plt.rc('figure', titlesize=MEDIUM_SIZE) # fontsize of the figure title pdf, bins, patches = ax.hist(distance_from_hyperplane[idx_group_A1], bins=bins, density=True, stacked=True, label='A=Female, Y=1', alpha=1.0) ax.hist(distance_from_hyperplane[idx_group_B1], bins=bins, density=True, stacked=True, label='B=Male, Y=1', alpha=0.5) ax.legend(loc='upper left') ax.set_xlim(left=0.0, right=xmax) ax.set_ylim(0, 3) plt.title(titles[i]) plt.show()
import sagemaker sagemaker.Session().delete_endpoint(linear_predictor.endpoint) sagemaker.Session().delete_endpoint(linear_predictorf.endpoint)
今回は 「クラスメソッド Amazon SageMaker Advent Calendar」 の15日目として、 「Amazon SageMakerで偏りを考慮した線形学習」 についてお送りしました。今回扱ったデータセットのようにカテゴリによってデータに偏りがあるケースは多いと思います。そういった際のソリューションの1つとして知れて良かったな、と個人的に思いました。