この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、小澤です。 この記事はHadoop Advent Calendar 18日目のものとなります。
前回はSparkのDataset APIについて紹介させていただきました。
今回はみんな大好きな機械学習ライブラリである、Spark MLlibについて書かせていただきます。
なお、MLlibを使ってどのようなことができるかにフォーカスさせていただきますので、機械学習に関する詳細は割愛します。 具体的には以下の内容については説明しません。
spark.mllibとspark.ml
MLlibでは2つのパッケージが提供されています。 spark.mllibは従来からあるRDDの利用をベースとしたインターフェースを提供するものです。 一方、spark.mlはDataFrameをインターフェースとしたライブラリです。
spark.mllibはすでにメンテナンスモードになっており、今後はspark.mlが中心となることやPipelineと呼ばれる機能を活用した方が利便性が高いなどの理由で、今回はspark.mlを使った内容にさせていただきます。
Pipelineとは
学習するなどの処理単体で見ると関数一つでできてしまうのですが、その関数の引数となる値を作り出すためにやらなければならないことがいろいろあります。
この処理は煩雑になることも多く、そうなるとソースコードから処理フロー全体としてどんなことをしているのか分かりづらくなってしまいます。
そこでこのPipelineをことで定型的な処理をライブラリに任せたり、全体のフローを分かりやすくしてたりができます。
この図での前処理の部分はいろいろな状況が考えられるので状況次第となりますが、学習・予測ともに同じフォーマットにまで整えられたら、モデルだけでなく赤枠で囲った特徴抽出の部分も同じ処理を使いまわせたら嬉しいですね。
学習の際に一連のフローを定義できるだけでなく、Pipelineを使うとこの特徴抽出なども含めて保存しておくことが可能になります。
Pipelineを利用して機械学習フローを実装する
ここではexamplesにあるものを例にして実際のフローを見ています。
今回は評価まで含めて一通りの確認をするので、examplesにあるml/cross_validation.pyを利用します。順を追って見ていきましょう。
データの準備
この例では適当なダミーデータからDataFrameをまず作成しています。
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
(8, "e spark program", 1.0),
(9, "a e c l", 0.0),
(10, "spark compile", 1.0),
(11, "hadoop software", 0.0)
], ["id", "text", "label"])
training.show(3)
+---+---------------+-----+
| id| text|label|
+---+---------------+-----+
| 0|a b c d e spark| 1.0|
| 1| b d| 0.0|
| 2| spark f g h| 1.0|
+---+---------------+-----+
only showing top 3 rows
training.printSchema()
root
|-- id: long (nullable = true)
|-- text: string (nullable = true)
|-- label: double (nullable = true)
この例ではidは各データを一意に与えられるものなので、特徴抽出の対象はtext、正解データはlabelとなります。
学習
次に学習の流れを追っていきます。
これは上記の図でいうところの「特徴抽出」と「学習」を行うものになります。
まずは処理フローを定義したPipelineを作成します
今回特徴抽出で行うのは、textの単語への分割と各単語の出現回数の数え上げです。
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
# この2つは特徴抽出
tokenizer = Tokenizer(inputCol='text', outputCol='words')
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='features')
# これは学習器の設定
lr = LogisticRegression(maxIter=10)
# 特徴抽出と学習アルゴリズムのフローをPipelineとして登録
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# 学習を行ってモデルを生成
model = pipeline.fit(training)
特徴抽出では、inputCol, outputColでそれぞれ各処理の入出力を指定しています。
ここで注目しておくのは、これらの関数に実際のデータを渡していない点です。こうすることでこれらの処理フローと実際にこのフローに当てはめるデータとを分離しておくことができるようになっています。
なお、データを与えた時に実際にどのような変換が行われるか確認したい場合は、
tokenizer.transform(training).show(3, False)
+---+---------------+-----+----------------------+
|id |text |label|words |
+---+---------------+-----+----------------------+
|0 |a b c d e spark|1.0 |[a, b, c, d, e, spark]|
|1 |b d |0.0 |[b, d] |
|2 |spark f g h |1.0 |[spark, f, g, h] |
+---+---------------+-----+----------------------+
only showing top 3 rows
のように実際にDataFrameを引数にして、transformを実行すると確認できます。
最後に処理フローを定義したPipelineに対してfit関数にデータを渡すことで実際の学習を行っています。
予測
次に予測を行います。 こちらも学習データと同様にダミーのデータを用意します。
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "mapreduce spark"),
(7, "apache hadoop")
], ["id", "text"])
先ほどとの違いは、予測のプロセスでは正解ラベルが未知のものがどのようなラベルがつくのかの予測を行うものなのでその項目がありせん。 これに対して、先ほど定義したモデルに基づいて予測を行います
predict = model.transform(test)
# predictの中には途中のfeatureなども含まれるので、必要なカラムのみ取得して確認
predict.select('id', 'text', 'probability', 'prediction').show(truncate=False)
+---+---------------+------------------------------------------+----------+
|id |text |probability |prediction|
+---+---------------+------------------------------------------+----------+
|4 |spark i j k |[0.0015595237733491984,0.9984404762266509]|1.0 |
|5 |l m n |[0.9999950075858843,4.9924141156278E-6] |0.0 |
|6 |mapreduce spark|[0.059606908424120426,0.9403930915758795] |1.0 |
|7 |apache hadoop |[0.9947588318910259,0.005241168108974087] |0.0 |
+---+---------------+------------------------------------------+----------+
predictionに予測したラベル、probabilityに各ラベルごとの確率が入っています。
Grid Search, Cross Validation
次に評価のプロセスの説明をします。
ここまでで、学習と予測が可能になりましたが実際にはどの程度うまく予測できているのかはわかりません。
ここでは以下の2つのプロセスを実行します。
これらの詳細についてもここでは割愛します。
実際の処理内容を見てみましょう
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Grid Searchを行う範囲を指定する
paramGrid = ParamGridBuilder().addGrid(hashingTF.numFeatures, [10, 100, 1000]).addGrid(lr.regParam, [0.1, 0.01]).build()
# CrossValidationに対する設定
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(), numFolds=2)
# モデル作成
cvModel = cv.fit(training)
# 評価指標と各パラメータでの評価結果
print(cvModel.avgMetrics)
print(cvModel.getEvaluator().getMetricName())
モデルの保存
最後にサンプルにはありませんが、モデルの保存と読み込みついて説明します。
MLlibではPipelineの保存もできるので、再現などもすぐに行えます。
# 保存
pipeline.write().overwrite().save('test_pipeline')
cvModel.bestModel.write().overwrite().save('test_model')
# 読み込み
from pyspark.ml import PipelineModel
pipe2 = Pipeline.load('test_pipeline')
model2 = PipelineModel.load('test_model')
終わりに
今回はexampleを例にして、spark.mlのPipelineを使った機械学習のフローについて解説しました。 複雑な処理が挟まる可能性のある部分もPipelineを使うことで全体的な流れの見通しが良くなることを実感していただけたら幸いです。
明日はSparkなどを使ったデータ分析を便利するZeppelinというものについて書かせていただく予定です。
ぜひ、お楽しみに!