Amazon SageMakerでデータの偏りを考慮した線形学習 – Amazon SageMaker Advent Calendar 2018

こんにちは、大阪DI部の大澤です。 この記事は「クラスメソッド Amazon SageMaker Advent Calendar」の15日目の記事です。

今回は「データの偏りを考慮した線形学習」についてご紹介します。 SageMakerの組み込みアルゴリズムである線形学習者(Linear Learner)を用いて、偏りがあるデータセットをどう補正して学習するかという内容のノートブックをやってみます。

やってみる

公平性の定義

特徴量の中に2つの値を持つセンシティブなもの(例:女と男)を含むような、二値分類の問題を考えます。 Equal Opportunityは次のように定義されます。

\[ \mathbb{P}_{(x,y)} \big[ f(x)>0 \, \big| \, x \text{ in group } A, y = 1 \big] = \mathbb{P}_{(x,y)} \big[ f(x)>0 \, \big| \, x \text{ in group } B, y = 1 \big] \]

  • f(x) = 学習済みモデル
  • A : 女性
  • B : 男性
  • P(x, y) : 条件にあった全ての(x, y)の組み合わせによる確率

Equal Opportunityは2つのグループ(AとB)における真陽性率(True Positive Rate, TPR)が同じということです。

Equal OpportunityからDifference in Equal Opportunity(DEO)は次のようになります。

\[ DEO(f) = \Big| \mathbb{P}_{(x,y)}\big[ f(x)>0 \, \big| \, x \text{ in group } A, y = 1 \big] - \mathbb{P}_{(x,y)} \big[ f(x)>0 \, \big| \, x \text{ in group } B, y = 1 \big] \Big| \]

DEOが低い値であるほど2つのグループ(AとB)を持つセンシティブな特徴量においてfは公平なモデルだと言えます。

DEOを算出する関数を定義しておきます。

from sklearn.metrics import confusion_matrix

def deo_from_list(dataset, predictions, groupA_idxs, groupB_idxs):
    tnA, fpA, fnA, tpA  = confusion_matrix(np.where(dataset[groupA_idxs][:,-1] == 1, 1, 0),
                           predictions[groupA_idxs]).ravel()
    tnB, fpB, fnB, tpB  = confusion_matrix(np.where(dataset[groupB_idxs][:,-1] == 1, 1, 0),
                                           predictions[groupB_idxs]).ravel()

    print('Examples in group A: %d' % len(groupA_idxs))
    print('Examples in group B: %d' % len(groupB_idxs))

    print('TPR group A: %f' % (float(tpA) / (tpA + fnA)))
    print('TPR group B: %f' % (float(tpB) / (tpB + fnB)))
    print('TPR all dataset: %f' % (float(tpA + tpB) / (tpA + fnA + tpB + fnB)))
    print('Accuracy all dataset: %f' % (float(tpA + tnA + tpB + tnB) /
          (tpA + fpA + fnA + tnA + tpB + fpB + fnB + tnB)))

    return(np.abs(float(tpA) / (tpA + fnA) - float(tpB) / (tpB + fnB)))

モデルから評価指標であるDEOを求めることが出来ると便利なので、その関数も定義しておきます。

def deo_from_model(model, dataset, sensitive_feature, groupA_value=None):
    if groupA_value == None:
        groupA_value = np.max(dataset[:, sensitive_feature])

    groupA_idxs = [idx for idx, val in enumerate(dataset)
                   if val[sensitive_feature] == groupA_value]
    groupB_idxs = [idx for idx, val in enumerate(dataset)
                   if val[sensitive_feature] != groupA_value]

    predictions = []
    for array in np.array_split(dataset[:,:-1], 100):
        result = model.predict(array)
        predictions += [r['predicted_label'] for r in result['predictions']]

    predictions = np.array(predictions)

    return deo_from_list(dataset, predictions, groupA_idxs, groupB_idxs)

データの保存場所とIAMロールの定義

学習データやモデルなどを保存する場所の指定と学習やエンドポイントの展開を行うIAMロールを指定します。

from sagemaker import Session

# S3のバケット名と保存時のプレフィックスを指定する
bucket = Session().default_bucket() #'fairness-test2'
prefix = 'sagemaker/DEMO-linear-adult'

# Define IAM role
from sagemaker import get_execution_role
import pandas as pd
import numpy as np
import urllib
import os
import sklearn.preprocessing as preprocessing
import seaborn as sns

# IAMロールを指定する
role = get_execution_role()

データセットの取得

UCI Machine Learning RepositoryAdult というデータセットを使うため、リポジトリからダウンロードしてきます。

if not os.path.isfile('adult.data'):
    urllib.request.urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data',
                              'adult.data')
    print('adult.data saved!')
else:
    print('adult.data already here.')

if not os.path.isfile('adult.test'):
    urllib.request.urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test',
                              'adult.test')
    print('adult.test saved!')
else:
    print('adult.test already here.')

データセットが持つ属性と取りうる値は以下の通りです。

Age 連続値
Workclass Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked.
Fnlwgt 連続値
Education Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc, 9th, 7th-8th, 12th, Masters, 1st-4th, 10th, Doctorate, 5th-6th, Preschool.
Education-num 連続値
Marital-status Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent, Married-AF-spouse.
Occupation Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners, Machine-op-inspct, Adm-clerical, Farming-fishing, Transport-moving, Priv-house-serv, Protective-serv, Armed-Forces.
Relationship Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried.
Ethnic group White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black.
Gender Female, Male.
Capital-gain 連続値
Capital-loss 連続値
Hours-per-week 連続値
Native-country United-States, Cambodia, England, Puerto-Rico, Canada, Germany, Outlying-US(Guam-USVI-etc), India, Japan, Greece, South, China, Cuba, Iran, Honduras, Philippines, Italy, Poland, Jamaica, Vietnam, Mexico, Portugal, Ireland, France, Dominican-Republic, Laos, Ecuador, Taiwan, Haiti, Columbia, Hungary, Guatemala, Nicaragua, Scotland, Thailand, Yugoslavia, El-Salvador, Trinadad&Tobago, Peru, Hong, Holand-Netherlands.
head adult.data

目的変数は次の通りです。年間で$50000より多く稼いでるか否かです。

Target <=50, >50

データの確認

各属性のヒストグラムを描画し、どういったデータなのかを確認します。

import matplotlib.pyplot as plt
from math import ceil

positive_idxs = [idx for idx, val in enumerate(original_data['Target']) if val == ">50K"]

fig = plt.figure(figsize=(20,100))
cols = 2
rows = ceil(float(original_data.shape[1]) / cols) * 2
for i, column in enumerate(original_data.columns):
    ax = fig.add_subplot(rows, cols, 2 * i + 1)
    ax.set_title(column)

    if original_data.dtypes[column] == np.object:
        original_data[column][:].value_counts(sort=True).plot(kind="bar", axes=ax)
    else:
        original_data[column][:].hist(axes=ax)
        plt.xticks(rotation="vertical")

    ax = fig.add_subplot(rows, cols, 2 * i + 2)
    ax.set_title(column + " (only positive examples)")
    if original_data.dtypes[column] == np.object:
        original_data[column][positive_idxs].value_counts(sort=True).plot(kind="bar", axes=ax)
    else:
        original_data[column][positive_idxs].hist(axes=ax)
        plt.xticks(rotation="vertical")

plt.subplots_adjust(hspace=0.7, wspace=0.2)

こんな感じで描画されます。 (多いので AgeWorkclassのみ表示してますが、実際には各属性について描画されるはずです。)

データの前処理

学習や推論に使えるデータになるように各特徴量を変換します。

# カテゴリ変数を数値に変換する
def number_encode_features(df):
    result = df.copy()
    encoders = {}
    for column in result.columns:
        if result.dtypes[column] == np.object:
            encoders[column] = preprocessing.LabelEncoder()
            #  print('Column:', column, result[column])
            result[column] = encoders[column].fit_transform(result[column].fillna('None'))
    return result, encoders

encoded_data, _ = number_encode_features(original_data)
training_data_matrix = np.array(encoded_data.values, dtype=float)
encoded_data, _ = number_encode_features(original_test)
test_data_matrix = np.array(encoded_data.fillna(0).values, dtype=float)

# 各値を標準化します
scaler = preprocessing.MinMaxScaler(feature_range=(0.0, 1.0))
training_data_matrix = scaler.fit_transform(training_data_matrix)
test_data_matrix = scaler.transform(test_data_matrix)

データの変換とアップロード

学習用のフォーマットにデータを変換し、S3へアップロードします。

import io
import numpy as np
import sagemaker.amazon.common as smac
import boto3
import os

vectors = np.array([t.tolist() for t in training_data_matrix[:,:-1]]).astype('float32')
labels = np.where(np.array([t.tolist() for t in training_data_matrix[:,-1]]) == 1, 1, 0).astype('float32')

buf = io.BytesIO()
smac.write_numpy_to_dense_tensor(buf, vectors, labels)
buf.seek(0)

key = 'recordio-pb-data'
boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train', key)).upload_fileobj(buf)
s3_train_data = 's3://{}/{}/train/{}'.format(bucket, prefix, key)
print('uploaded training data location: {}'.format(s3_train_data))

通常の線形モデル

学習

通常の線形モデルをml.c4.xlargeのインスタンス1台で学習させます。

from sagemaker.amazon.amazon_estimator import get_image_uri
import sagemaker

# モデルの保存場所
output_location = 's3://{}/{}/output'.format(bucket, prefix)
print('training artifacts will be uploaded to: {}'.format(output_location))

# コンテナイメージの取得
container = get_image_uri(boto3.Session().region_name, 'linear-learner', "latest")

# estimatorの設定
sess = sagemaker.Session()
linear = sagemaker.estimator.Estimator(container,
                                       role,
                                       train_instance_count=1,
                                       train_instance_type='ml.c4.xlarge',
                                       output_path=output_location,
                                       sagemaker_session=sess)
linear.set_hyperparameters(feature_dim=14, # 特徴量の次元数
                           predictor_type='binary_classifier', # 今回は二値分類
                           mini_batch_size=200) # バッチサイズ(一度の学習時のデータ数)

# 学習開始
linear.fit({'train': s3_train_data})

モデルのデプロイ

モデルをホストするエンドポイントを作成して、モデルをデプロイします。

from sagemaker.predictor import csv_serializer, json_deserializer
linear_predictor = linear.deploy(initial_instance_count=1,
                                 instance_type='ml.m4.xlarge')


linear_predictor.content_type = 'text/csv'
linear_predictor.serializer = csv_serializer
linear_predictor.deserializer = json_deserializer

精度の検証

テストデータで予測した場合にどれだけの精度なのかを調べます。

import numpy as np

predictions = []
distance_from_hyperplane = []
for array in np.array_split(test_data_matrix[:,:-1], 100):
    result = linear_predictor.predict(array)
    predictions += [r['predicted_label'] for r in result['predictions']]
    distance_from_hyperplane += [r['score'] for r in result['predictions']]

predictions_test = np.array(predictions)
distance_from_hyperplane_test = np.array(distance_from_hyperplane)

import pandas as pd

pd.crosstab(np.where(test_data_matrix[:,-1] == 1, 1, 0), predictions_test,
            rownames=['actuals'], colnames=['predictions'])

sensitive_feature_gender = 9  # Gender
groupA_value = 0.0 # Female

deo = deo_from_model(linear_predictor, test_data_matrix,
                     sensitive_feature_gender, groupA_value=groupA_value)
print('DEO: %f' % deo)

この結果を見るとDEOは0.24となっています。これは2つのグループのTruePositive率(TP)の差が0.24もあるということです。グループBは40%近いTPに対して、グループAは約16%程度しかありません。

この偏りをどうにかすることが今回の目的です。

偏りを考慮した線形学習モデル

まず、 u というベクトルを導入します。

\[ u = \frac{1}{n(+, A)} \sum_{x \in X(+, A)} x - \frac{1}{n(+, B)} \sum_{x \in X(+, B)} x \]

  • X(+, A)X(+, B) はグループA,BのラベルがPositiveとなるデータの集合
  • n(+, A)n(+, B)はそれぞれの集合の要素数(濃度)

ベクトルuは2つのグループ(AとB)のそれぞれの特徴量の平均の差で、グループごとの偏りを表しています。

元となる特徴量を次のように変換することで2つのグループの偏りを減らします。

\[ \hat{x}_j = x_j - x_i \frac{u_j}{u_i} \,\,\,\, j \in \{1, \dots, i-1, i+1, \dots, d\} \]

iは特徴量の添字番号で、 ^x_jx_jに対応する新しい特徴量です。このときの注意点として、特徴量は1つ少なくなってしまいます。

グループの偏りを減らすための処理を次のように関数化します。

class UncorrelationMethod:
    def __init__(self, sensitive_feature, groupA_value=0.0):
        self.val0 = groupA_value
        self.sensitive_feature = sensitive_feature
        self.u = None

    def new_representation(self, examples):
        if self.u is None:
            print('You have to fit the model first...')
            return examples
        new_examples = np.array([ex if ex[self.sensitive_feature] == self.val0 # case x_i = 0, Group A
                                 else ex + self.u for ex in examples]) # case x_i = 1, Group B
        new_examples = np.delete(new_examples, self.sensitive_feature, 1)
        return new_examples

    def fit(self, dataset):
        tmp = [ex for idx, ex in enumerate(dataset)
               if dataset[idx, -1] == 1 and ex[self.sensitive_feature] == self.val0]
        average_A_1 = np.mean(tmp, 0)
        n_A_1 = len(tmp)
        tmp = [ex for idx, ex in enumerate(dataset)
               if dataset[idx, -1] == 1 and ex[self.sensitive_feature] != self.val0]
        average_not_A_1 = np.mean(tmp, 0)
        n_not_A_1 = len(tmp)
        N_1 = len([ex for idx, ex in enumerate(dataset) if dataset[idx, -1] == 1])
        self.u = average_A_1[:-1] - average_not_A_1[:-1]
        # Our hypothesis of values 0 (A) and +1 (B) for the sensitive feature among the two groups
        # has the following consequence:
        self.u[self.sensitive_feature] = -1.0

先ほど定義した関数で特徴量に変換処理を施します。

uncorr_data = UncorrelationMethod(sensitive_feature_gender, 0.0)
uncorr_data.fit(training_data_matrix)
new_training_data_matrix = np.hstack([uncorr_data.new_representation(training_data_matrix[:, :-1]),
                                     training_data_matrix[:, -1:-2:-1]])
new_test_data_matrix = np.hstack([uncorr_data.new_representation(test_data_matrix[:, :-1]),
                                  test_data_matrix[:, -1:-2:-1]])

学習

変換後のデータセットで線形モデルの学習を行います。学習終了後にはモデルをデプロイします。

vectors = np.array([t.tolist() for t in new_training_data_matrix[:,:-1]]).astype('float32')
labels = np.where(np.array([t.tolist() for t in new_training_data_matrix[:,-1]]) == 1, 1, 0).astype('float32')

buf = io.BytesIO()
smac.write_numpy_to_dense_tensor(buf, vectors, labels)
buf.seek(0)

key = 'recordio-pb-data'
boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train', key)).upload_fileobj(buf)
s3_train_data = 's3://{}/{}/train/{}'.format(bucket, prefix, key)
print('uploaded training data location: {}'.format(s3_train_data))

output_location = 's3://{}/{}/output'.format(bucket, prefix)
print('training artifacts will be uploaded to: {}'.format(output_location))

linearf = sagemaker.estimator.Estimator(container,
                                       role,
                                       train_instance_count=1,
                                       train_instance_type='ml.c4.xlarge',
                                       output_path=output_location,
                                       sagemaker_session=sess)
linearf.set_hyperparameters(feature_dim=13,
                           predictor_type='binary_classifier',
                           mini_batch_size=200)

path_fair_model = linearf.fit({'train': s3_train_data})

linear_predictorf = linearf.deploy(initial_instance_count=1,
                                 instance_type='ml.m4.xlarge')
linear_predictorf.content_type = 'text/csv'
linear_predictorf.serializer = csv_serializer
linear_predictorf.deserializer = json_deserializer

精度の検証

predictions = []
distance_from_hyperplane = []
for array in np.array_split(new_test_data_matrix[:,:-1], 100):
    result = linear_predictorf.predict(array)
    predictions += [r['predicted_label'] for r in result['predictions']]
    distance_from_hyperplane += [r['score'] for r in result['predictions']]

distance_from_hyperplane_test_fair = np.array(distance_from_hyperplane)
predictions_test_fair = np.array(predictions)
pd.crosstab(np.where(new_test_data_matrix[:,-1] == 1, 1, 0), predictions_test_fair,
            rownames=['actuals'], colnames=['predictions'])

groupA_idxs = [idx for idx, val in enumerate(test_data_matrix)
               if val[sensitive_feature_gender] == groupA_value]
groupB_idxs = [idx for idx, val in enumerate(test_data_matrix)
               if val[sensitive_feature_gender] != groupA_value]
deo = deo_from_list(new_test_data_matrix, predictions_test_fair, groupA_idxs, groupB_idxs)
print('DEO: %f' % deo)

全体の精度がわずかに下がることになりましたが、DEOが約6%とかなり低下しました。

学習データで推論をしてみて、過学習してないかも確認します。

predictions = []
distance_from_hyperplane = []
for array in np.array_split(new_training_data_matrix[:,:-1], 100):
    result = linear_predictorf.predict(array)
    predictions += [r['predicted_label'] for r in result['predictions']]
    distance_from_hyperplane += [r['score'] for r in result['predictions']]

distance_from_hyperplane_train_fair = np.array(distance_from_hyperplane)
predictions_train_fair = np.array(predictions)
pd.crosstab(np.where(new_training_data_matrix[:,-1] == 1, 1, 0),
            predictions_train_fair, rownames=['actuals'], colnames=['predictions'])

groupA_idxs = [idx for idx, val in enumerate(training_data_matrix)
               if val[sensitive_feature_gender] == groupA_value]
groupB_idxs = [idx for idx, val in enumerate(training_data_matrix)
               if val[sensitive_feature_gender] != groupA_value]

deo = deo_from_list(new_training_data_matrix, predictions_train_fair, groupA_idxs, groupB_idxs)
print('DEO: %f' % deo)

テスト用データに比べて圧倒的に学習用データの精度が高い訳ではないので、問題なさそうです。

データの分布の確認

元となるデータセットと偏りを修正したデータセットそれぞれのデータ分布を描画して見ます。

import matplotlib.pyplot as plt

sensitive_feature = sensitive_feature_gender

SMALL_SIZE = 12
MEDIUM_SIZE = 12
BIGGER_SIZE = 16
bins = 30

X = training_data_matrix[:, :-1]
y = training_data_matrix[:, -1]
Xte = test_data_matrix[:, :-1]
yte = test_data_matrix[:, -1]
ypos = np.max(y)
yneg = np.min(y)
idx_group_A1 = [idx for idx, v in enumerate(Xte)
                if v[sensitive_feature] == groupA_value and yte[idx] == ypos]
idx_group_B1 = [idx for idx, v in enumerate(Xte)
                if v[sensitive_feature] != groupA_value and yte[idx] == ypos]


titles = ['Adult Dataset - TPR Area - Linear',
         'Adult Dataset - TPR Area - Fair Linear']
for i,distance_from_hyperplane in enumerate([distance_from_hyperplane_test,
                                           distance_from_hyperplane_test_fair]):
    distance_from_hyperplane = distance_from_hyperplane - 0.5
    xmin = np.min([np.min(distance_from_hyperplane[idx_group_A1]),
                   np.min(distance_from_hyperplane[idx_group_B1])])
    xmax = np.max([np.max(distance_from_hyperplane[idx_group_A1]),
                   np.max(distance_from_hyperplane[idx_group_B1])])
    fig, ax = plt.subplots(figsize=(8, 6), dpi=90)
    plt.rc('font', size=SMALL_SIZE)          # controls default text sizes
    plt.rc('axes', titlesize=SMALL_SIZE)     # fontsize of the axes title
    plt.rc('axes', labelsize=MEDIUM_SIZE)    # fontsize of the x and y labels
    plt.rc('xtick', labelsize=SMALL_SIZE)    # fontsize of the tick labels
    plt.rc('ytick', labelsize=SMALL_SIZE)    # fontsize of the tick labels
    plt.rc('legend', fontsize=MEDIUM_SIZE)    # legend fontsize
    #plt.rc('figure', titlesize=MEDIUM_SIZE)  # fontsize of the figure title
    pdf, bins, patches = ax.hist(distance_from_hyperplane[idx_group_A1], bins=bins,
                                 density=True, stacked=True, label='A=Female, Y=1', alpha=1.0)
    ax.hist(distance_from_hyperplane[idx_group_B1], bins=bins, density=True,
            stacked=True, label='B=Male, Y=1', alpha=0.5)
    ax.legend(loc='upper left')
    ax.set_xlim(left=0.0, right=xmax)
    ax.set_ylim(0, 3)
    plt.title(titles[i])

plt.show()

エンドポイントの削除

最後に無駄な課金が発生しないように不要になったエンドポイントを削除します。

import sagemaker

sagemaker.Session().delete_endpoint(linear_predictor.endpoint)
sagemaker.Session().delete_endpoint(linear_predictorf.endpoint)

さいごに

今回は 「クラスメソッド Amazon SageMaker Advent Calendar」 の15日目として、 「Amazon SageMakerで偏りを考慮した線形学習」 についてお送りしました。今回扱ったデータセットのようにカテゴリによってデータに偏りがあるケースは多いと思います。そういった際のソリューションの1つとして知れて良かったな、と個人的に思いました。

お読みくださりありがとうございました〜!明日もお楽しみに〜!