XGBoostを使って解約率を予測するチュートリアルやってみた
皆さん、こんにちは。
クルトンです!
今回は解約率を予想するチュートリアルをやってみました。
なお、実行環境はSageMakerノートブックインスタンスで、インスタンスタイプはml.m4.xlargeです。
2023/05/17(水)追記
本記事で学習させたモデルを使って、モデル解釈時に使われるSHAPを含むレポートをSageMaker Clarifyで出力してみました。モデルの学習後、よければこちらもご覧ください。
ノートブックインスタンスにおける処理の流れ
処理の流れは次のようになります。モデルの学習時にはSageMakerのSessionオブジェクト等が必要となります。 AWS上で機械学習をする時に覚えておくべき事柄は、モデルの学習にはSageMakerなどのサービスを、データを置く場所としてはS3を使うという点です。
- データを用意
- S3バケットへ用意したデータをアップロード
- モデルを用意
- S3にあるデータを使って学習
- モデルを推論エンドポイントへデプロイ
- 推論エンドポイントを使って予測
- 必要に応じてAWSリソースの削除
必要なモジュールのインポート
import sys !{sys.executable} -m pip install sagemaker pandas numpy --upgrade
!{sys.executable}と書かれている部分はノートブックインスタンスのPythonを実行しているバイナリファイルを指しています。
要はpython -m pip install sagemaker pandas numpy --upgrade
のようなコマンドを叩いて、モジュールsagemaker、pandas、numpyの3つをアップグレードする処理を実行しています。
import sagemaker sess = sagemaker.Session() bucket = sess.default_bucket() prefix = "sagemaker/DEMO-xgboost-churn" import boto3 import re from sagemaker import get_execution_role role = get_execution_role()
次に今回必要となる、SageMakerのセッションオブジェクトの取得などを行います。変数prefixで設定している内容はS3バケットへのアップロード時やモデルの学習時に指定するものですので、お好みの名前に変更ください。
import pandas as pd import numpy as np import matplotlib.pyplot as plt import io import os import sys import time import json from IPython.display import display from time import strftime, gmtime from sagemaker.inputs import TrainingInput from sagemaker.serializers import CSVSerializer
後のコードで必要となるものを全てインポートしておきます。
bucket
role
最後にバケット名や現在どのIAMロールで実行しているか確認をしておきましょう。 ファイルやIAMロールを削除する時にも必要となりますし、どれが使われているかを把握しておくことは重要です。
S3バケットへデータのアップロード
s3 = boto3.client("s3") s3.download_file(f"sagemaker-sample-files", "datasets/tabular/synthetic/churn.txt", "churn.txt")
データについては、AWS公式が公開しているデータセットをダウンロードしてきて、そちらを、こちら側の環境にあるS3バケットへアップロードを行ないます。まずはダウンロードまでを行ないます。
churn = pd.read_csv("./churn.txt") pd.set_option("display.max_columns", 500) churn
len(churn.columns)
ファイル読み込みをpandasのデータフレームを使って行ないます。 データの中身を確認するためここで出力しています。
ここでチュートリアルでは、カラムの最大表示件数を500に設定していますが、ここはお好みで構いません。
for column in churn.select_dtypes(include=["object"]).columns: display(pd.crosstab(index=churn[column], columns="% observations", normalize="columns"))
- select_dtypesメソッドを使って特定のデータタイプ(今回だとオブジェクト型)のみを抽出し、そのオブジェクト型のカラム名を取得してfor文で一つずつ取り出しています。
- displayメソッドを使って表記します。
- pd.crosstabでクロス集計表を表示しています。
- 引数indexでは読み込んだデータセットの行にあたる部分を渡します。
- 引数columnsは列の名前になります。
- 引数normalizeで、全てのカラムにおける各カラムのデータの割合を算出しています。
まとめると、このコードでやっている事は各カラムにおけるデータの割合を確認しています。
display(churn.describe()) %matplotlib inline hist = churn.hist(bins=30, sharey=True, figsize=(10, 10))
- describeメソッドを使って読み込んだデータの基本統計量を確認します。
- histメソッドを使ってヒストグラムを確認します。
- 引数binsで階級を設定します。表示する棒の数を設定します。
- 引数shareyをデフォルトでFalseです。Trueにする事でそれぞれのヒストグラムの表示時領域のy軸の値を揃える(share)する事が可能です。似た引数でx軸に対して設定できるsharexがあります。
- 引数figsizeで全体の表示領域を設定しています。
取り扱うデータの特徴を見ています。必要であれば前処理をするため、その確認をしています。
churn = churn.drop("Phone", axis=1) churn["Area Code"] = churn["Area Code"].astype(object)
- dropメソッドを使って"Phone"のカラム名の"列"を削除します。
- ここまで確認してきたデータの内容を鑑みるに、解約率に電話番号はそこまで関係がないように見受けられる為だと思います。
- astypeメソッドを使ってデータのタイプをobject型に変更しています。(元はint64です。)
for column in churn.select_dtypes(include=["object"]).columns: if column != "Churn?": display(pd.crosstab(index=churn[column], columns=churn["Churn?"], normalize="columns")) for column in churn.select_dtypes(exclude=["object"]).columns: print(column) hist = churn[[column, "Churn?"]].hist(by="Churn?", bins=30) plt.show()
- 解約したかを表す"Churn?"とそれ以外のカラムにおいて関係性を確認しています。
- 一つ目のfor文ではオブジェクト型のものを抽出してクロス集計表を使って表示します。
- 二つ目のfor文ではオブジェクト型以外のものを抽出してヒストグラムを使って表示します。(引数byで"Churn?"の値であるTrueとFalseに分けられています。)
display(churn.corr()) pd.plotting.scatter_matrix(churn, figsize=(12, 12)) plt.show()
- それぞれのカラムの関係性を数値とグラフの2つから確認します。
- corrメソッドを使って各カラムの相関係数を算出します。
- scatter_matrixメソッドを使って、各カラムにおける散布図を図示します。
相関係数が高いカラム同士は、どちらか片方が必要のないカラムと判断できる可能性があるため、確認をしておきます。
churn = churn.drop(["Day Charge", "Eve Charge", "Night Charge", "Intl Charge"], axis=1)
チュートリアルでは相関係数などから判断して、除くカラムを選択しています。
model_data = pd.get_dummies(churn) model_data = pd.concat( [model_data["Churn?_True."], model_data.drop(["Churn?_False.", "Churn?_True."], axis=1)], axis=1 )
ここではモデルの学習や検証に使う用のデータを準備しています。
- get_dummiesメソッドを使ってカテゴリカル変数を数値に変換しています。(前処理しています。)
- get_dummiesメソッドを使うと"Churn?"での真偽値にそれぞれを表すカラムが作成されます。(例えば"Churn?_True"において1なら解約、0なら解約していない事を表します。)
- 解約したかを表すカラム以外のデータセットに対して、解約した事を表す"Churn?_True"を正解ラベルとして列に加えて一つのデータセットを作成します。
train_data, validation_data, test_data = np.split( model_data.sample(frac=1, random_state=1729), [int(0.7 * len(model_data)), int(0.9 * len(model_data))], ) train_data.to_csv("train.csv", header=False, index=False) validation_data.to_csv("validation.csv", header=False, index=False)
学習用、検証用、テスト用の三つにデータを分割して変数へ格納しています。
- sampleメソッドでデータの行をランダムな並びに変えています。
- fracではデータの抽出する割合を指定します。今回は全部のデータを使いたいので1(100%)を指定します。
- splitメソッドでデータを3つに分割しています。
- 一つ目の引数では分割対象のデータを渡します。
- listで渡している引数では、何個めのデータで分割するかのインデックスを指定します。
- 今回だと始まりから7割までを一つ目、7割の地点から9割の地点までを二つ目、9割の地点から終わりまでを3つ目としてデータを分割します。
- つまり全部で5000個のデータがあるので、train_dataには7割(3500個)のデータ、validation_dataには2割(1000個)のデータ、test_dataには1割(500個)のデータが格納されます。
自分はよくfrom sklearn.model_selection import train_test_split
を初めの方に書いています。train_test_split
メソッドを2回使う事で、学習用、検証用、テスト用の3つにデータを分割しているのですが、ここではsampleメソッドを使ってデータの中身をランダムにしてから、splitメソッドで3つに分けています。
s3 = boto3.Session().client("s3") s3.upload_file(Filename="train.csv", Bucket=bucket, Key=os.path.join(prefix, "train/train.csv")) s3.upload_file(Filename="validation.csv", Bucket=bucket, Key=os.path.join(prefix, "validation/validation.csv"))
ここで、S3バケットへファイルのアップロードを行います。trainデータはprefixで指定したパスの直下にtrainというフォルダを作ってtrain.csvというファイルをアップロードしています。validationについても同様にしてアップロードをしています。
モデルの学習
container = sagemaker.image_uris.retrieve("xgboost", sess.boto_region_name, "1.5-1") display(container)
公式が用意している、勾配ブースティング木を使ったアルゴリズムであるXGBoostのDockerイメージを取得します。 displayメソッドを使って、どこから取得してきたのかが表示されます。
s3_input_train = TrainingInput( s3_data="s3://{}/{}/train".format(bucket, prefix), content_type="csv" ) s3_input_validation = TrainingInput( s3_data="s3://{}/{}/validation/".format(bucket, prefix), content_type="csv" )
SageMakerを使ってモデルの学習する際に必要な、学習データと検証データが置かれているS3 URIを指定します。
sess = sagemaker.Session() xgb = sagemaker.estimator.Estimator( container, role, instance_count=1, instance_type="ml.m4.xlarge", output_path="s3://{}/{}/output".format(bucket, prefix), sagemaker_session=sess, ) xgb.set_hyperparameters( max_depth=5, eta=0.2, gamma=4, min_child_weight=6, subsample=0.8, verbosity=0, objective="binary:logistic", num_round=100, ) xgb.fit({"train": s3_input_train, "validation": s3_input_validation})
sagemaker.estimator.Estimator
インスタンスを生成し、モデルのハイパーパラメータの値を設定をします。その後、fitメソッドを使って、学習を行います。
fitメソッドを使う際に、学習用データと検証用データにS3にある該当するファイルを指定する必要があるため、先ほど用意した変数2つを渡しています。
推論エンドポイントのデプロイ
xgb_predictor = xgb.deploy( initial_instance_count=1, instance_type="ml.m4.xlarge", serializer=CSVSerializer() )
学習が終わったら、結果を推論エンドポイントとしてデプロイします。デプロイ完了後にデータを渡すと、予測結果を返してくれるようになります。
ここは注意が必要で、推論エンドポイントは選択したインスタンス(ここだとml.m4.xlarge)の利用料金が掛かり続けるので、必要がない場合は削除をすべきリソースです。削除は後ほどコード上で行います。
評価
def predict(data, rows=500): split_array = np.array_split(data, int(data.shape[0] / float(rows) + 1)) predictions = "" for array in split_array: predictions = "".join([predictions, xgb_predictor.predict(array).decode("utf-8")]) return predictions.split("\n")[:-1] predictions = predict(test_data.to_numpy()[:, 1:])
予測結果を取得するために推論エンドポイントへ渡すのに必要なデータの形にして、予測結果を取得する関数を定義しています。
関数定義後に、実際に予測結果を取得します。なお、ここで予測結果を取得するのに使っているデータは元のデータセットを3分割した際に、ここまで使用していないtest_data
です。
predictions = np.array([float(num) for num in predictions]) print(predictions)
予測した結果をnumpyを使って変換します。変換した結果、1次元の配列になっている事をご確認ください。
pd.crosstab( index=test_data.iloc[:, 0], columns=np.round(predictions), rownames=["actual"], colnames=["predictions"], )
クロス集計表で、予測と実測値を表記しています。自分のところでは次のような結果が得られました。
actual \ predictions | 0.0 | 1.0 |
---|---|---|
0 | 234 | 19 |
1 | 14 | 233 |
パッと見た印象としては予測精度は良さそうです。500個のデータのうち、467個のデータで合っていますから、正解率は93.4%です。
plt.hist(predictions) plt.xlabel("Predicted churn probability") plt.ylabel("Number of customers") plt.show()
ヒストグラムで視覚的に予測結果を確認しています。
実際の値では0か1かどちらかしか値を取らないですが、予測結果では、1である可能性を数値として返します。
pd.crosstab(index=test_data.iloc[:, 0], columns=np.where(predictions > 0.3, 1, 0))
先ほどもクロス集計表を使って表示していましたが、ここでの表記はnumpy.whereを使っています。
このコードを日本語訳すると、「予測値が0.3より大きい場合は解約した(1である)と見なして、それ以外の場合は解約していない(0である)場合のクロス集計表を表示する」となります。
モデルの予測結果に関係するコスト
解約したかどうかを予測できるようにした後に、予測結果が間違った場合に、どれだけのコストが掛かるかを計算しています。
cutoffs = np.arange(0.01, 1, 0.01) costs = [] for c in cutoffs: costs.append( np.sum( np.sum( np.array([[0, 100], [500, 100]]) * pd.crosstab(index=test_data.iloc[:, 0], columns=np.where(predictions > c, 1, 0)) ) ) ) costs = np.array(costs) plt.plot(cutoffs, costs) plt.xlabel("Cutoff") plt.ylabel("Cost") plt.show()
真陰性は0ドル、偽陰性は500ドル、真陽性は100ドル、偽陽性は100ドルとしてコストを割り当て、図示しています。 それぞれの予測と解約の関係についての説明は次の通りです。
- 真陰性(0ドル)
- 解約していないと予測して、実際に解約していない顧客
- 偽陰性(500ドル)
- 解約していないと予測して、実際には解約していた顧客
- 真陽性(100ドル)
- 解約したと予測して、実際に解約していた顧客
- 偽陽性(100ドル)
- 解約したと予測して、実際には解約していない顧客
numpy.arrangeメソッドを使って、0.01から1までの0.01ずつ増やした数列を変数cutoffsへ用意しています。(つまり0.01、0.02、……、0.99の数列を用意しています。)
変数cutoffsで用意した数列は、numpy.whereでの予測値を1か0で判別するときの条件式で使われています。
print( "Cost is minimized near a cutoff of:", cutoffs[np.argmin(costs)], "for a cost of:", np.min(costs), )
予測結果が間違っていた場合に支払う事になるであろうコストが一番小さい金額と、予測値の1と判断する時の数値について表示します。
実際の現場では、ここで表示したようなコスト感を見て許容出来そうであれば導入するなどの決定が行なわれるのだと考えられます。
推論エンドポイントの削除
xgb_predictor.delete_endpoint()
ここまで終えた後は、推論エンドポイントは料金が掛かり続けるため、今後も使う場合は除いて、削除しておきます。
また、ノートブックインスタンスは止めないとコストが掛かるため、そちらも止めておきます。
他に作成して課金がされるAWSリソースとして、S3バケットにアップロードしたファイルがありますが、必要なければバケットごと削除ください。
終わりに
今回は、解約率の予測をやっているチュートリアルをやってみました。ここまでやってみた感想としては、解約率を予測するドメインによって、必要となるデータがかなり変わりそうだという事です。サービスを利用しているユーザーの快、不快が影響しているであろう項目が含められると予測が上手く出来そうです。
今回はここまで。
それでは、また!