[Spark][TF-IDF][テキスト処理] Reuters21578 を K-means 法でクラスタリングする

アイキャッチ

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

文書をグループ分けしたい

ネット上やDBの中にある文書をグループ分けしておいて、新しい文書が現れたときにそれが既存のどのグループに属しているか判断したい場合があります。

今回はApache Sparkを使って文章から取得できる TF-IDF を用いた K-means クラスタリングを実行し、分類を行ってみました。

クラスタリング

入力ベクトルのみから類似したベクトルのグループを見出すような機械学習の手法をクラスタリングと呼びます。

例えば、2次元 x-y 平面上における以下のようなデータ・セットがあった場合には

Fig0

クラスタリングによって以下の様なグループ分けがなされると期待されます。

Fig1

通常、訓練データ中の入力ベクトルに対応した目標ベクトルがあるような手法は教師あり機械学習と呼ばれますが、今回扱う K-means クラスタリングについては目標ベクトルがありません。そのため、教師なし機械学習とも呼ばれたりします。

K-means 法

K-means 法はn次元の入力ベクトルから以下の様な段階を経て類似したベクトルのグループを見出します。

1. 分割したいグループ数Kを予め定めておき、入力ベクトルの集合から無作為にK個のグループ分けをします

\[ U_k = \{ \boldsymbol{x}^{(k)}_1, \cdots, \boldsymbol{x}^{(k)}_{m_k} \} \left( \ k = 1, \cdots, \ K \right) \]

2. グループ分けされた入力ベクトルからK個平均ベクトルを算出します

\[ \boldsymbol{\mu}_k = \frac{1}{m_k} \sum^{m_k}_{i=1} \boldsymbol{x}^{(k)}_i \left( \ k = 1, \cdots, K \right) \]

3. 入力ベクトル全体をK個の平均ベクトルからの近さでグループ分けします

\[ U^{'}_k = \{ \ \boldsymbol{x} \ | \ \boldsymbol{x} \in U, \ k = \arg\min_j \left| \boldsymbol{x} - {\boldsymbol \mu}_j \right| \} \ \ ( k = 1, \dots, K ) \]

ここで U は入力ベクトル全体の集合です。

4. 入力ベクトルのグループ分けに変化がなくなるまで 2 3 を繰り返します

2 3 の繰り返しで次第にグループ分けの変化が少なくなっていき、変化がなくなった箇所で、もしくは変化が殆どないとみなせるくらいの入力ベクトルの移動がなくなる(収束)時をもってグループ分けの最終結果とします。

K-means が収束することや、および K-means 目的関数

\[ J = \sum^K_{i=1} \sum^{m_i}_{j=1} \left|\boldsymbol{x}^{(i)}_j - {\boldsymbol \mu}_i \right| \]

が最小とならずに極小になる場合がある(その場合クラスタリングが正しく行われていない可能性がある)ことに関しては MITの講義資料 が参考になります。

TF-IDF抽出

TF-IDF(term frequency-inverse document frequency) はドキュメント中の単語の出現頻度に単語が現れるドキュメントの数の逆数を因子とする係数をかけてドキュメントに現れる単語ごとの重みとする方法です。

各ドキュメントは全データ中の単語の数に相当する次元を有し、各ドキュメントにおける単語の重み付けは t を単語、d をドキュメント n を単語の頻度として、次のTFと

\[ TF(t, d) = \frac{n(t, d)}{\sum_d n(t, d)} \]

N を ドキュメント数、df を単語の現れるドキュメント数として、次のIDF

\[ IDF(t) = \log{\frac{N}{ {df}(t) } } \]

を用いれば以下のように書き表せます。

\[ TF(t, d) \cdot IDF(t) \]

理論的な前置きはこれくらいにしてライブラリを動かしてみます。

実行環境

導入には

  • sbt 0.13.8
  • scala 2.11.7
  • java 1.8

を用い、sbtプロジェクトとしてIntelliJの新規プロジェクトを生成し、名前をSparkKmeansStudyとしました。sbtとIntelliJの連携についてはQiitaの記事を参考にしました。

Spark MLLib

Apache Spark はスケーラブルなデータ分散処理の為のエンジンですが、今回はこの中の機械学習フレームワーク MLLib を用いてローカルで K-means クラスタリングを実行しました。

下記を build.sbt に記入し、IntelliJ のライブラリ同期機能を用いてライブラリをインポートしました。

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.0",
  "org.apache.spark" %% "spark-mllib" % "1.4.0"
)

サンプルデータ

ロイター通信がテキスト分類システムデータ・セットを開発するために用いたReuters-21578 を使用しました。

SparkKmeansStudy/src/main/resources/reuter ディレクトリ直下で下記コマンドを叩いてデータをダウンロードしました。

curl http://www.daviddlewis.com/resources/testcollections/reuters21578/reuters21578.tar.gz | tar -xz

取得されたテキストに対して、今回は分散環境でクラスタリングを行わない関係上、reut2-000.sgm のみを対象にしてテキスト分類を行いました。そのため、それ以外のsgmファイルを一旦ディレクトリから削除します。

ls | grep -v -E "reut2-000.sgm" | xargs rm -r

build.sbtに追加でApache Luceneを導入します。

libraryDependencies += "org.apache.lucene" % "lucene-benchmark" % "3.5.0"

これは全文検索のためのライブラリで、ロイター通信サンプルに特化したテキスト抽出のためのクラスを今回は用います。

プロジェクトの一番上の階層に戻り、下記コマンドを実行し、SGML形式のファイル(XMLと似たファイル形式)からテキストを抽出します。

sbt "run-main org.apache.lucene.benchmark.utils.ExtractReuters src/main/resources/reuter/ src/main/resources/reuter/extracted/"

サンプルデータは今回、このextractedフォルダに展開された1000個のドキュメントファイルとしました。

サンプルコード

今回動かしたサンプルコードです。

import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.feature.IDF

object KmeansSample extends App {
  val context = new SparkContext("local", "reuter-demo") // --(1)

  val stopwords = List(
    "a", "an", "the", "who", "what", "are", "is", "was", "he", "she", "of",
    "to", "and", "in", "said", "for", "The", "on", "it", "with", "has", "be",
    "will", "had", "this", "that", "by", "as", "not", "from", "at", "its",
    "they", "were", "would", "have", "or", "we", "his", "him", "her") // --(2)

  val documents = context
    .wholeTextFiles("src/main/resources/reuter/extracted/")
    .map{ _._2 }
    .map{ text => text.split("\n\n").last }
    .map{ _.split("\\s+").toSeq.filter{ word => !stopwords.contains(word) } } // --(3)

  val hashingTF = new HashingTF()
  val tf = hashingTF.transform(documents)

  val idf = new IDF().fit(tf)
  val tfidf = idf.transform(tf) // --(4)

  val k = 18
  val maxIterations = 50
  val kmeansModel = KMeans.train(tfidf, k, maxIterations) // --(5)

  val docsTable =
  (kmeansModel.predict(tfidf) zip documents).groupBy {
    case (clusterId: Int, _) => clusterId
  }.sortBy {
    case (clusterId: Int, pairs: Iterable[(Int, Seq[String])]) => clusterId
  }.map {
    case (clusterId: Int, pairs: Iterable[(Int, Seq[String])]) => (clusterId, pairs.map{ _._2 } )
  } // --(6)

  val wordsRanking =
  docsTable.map {
    case (clusterId: Int, docs: Iterable[Seq[String]]) => (clusterId, docs.flatten)
  }.map {
    case (clusterId: Int, words: Iterable[String]) =>
      var wordsMap = Map[String, Int]()
      words.foreach { word =>
        val oldCount = wordsMap.getOrElse(key = word, default = 0)
        wordsMap += (word -> (oldCount + 1))
      }
      (clusterId, wordsMap.toSeq.sortBy{ _._2 }(Ordering[Int].reverse))
  }.map {
    case (clusterId: Int, wordsTable: Seq[(String, Int)]) => (clusterId, wordsTable.take(10))
  } // --(7)

  wordsRanking.foreach {
    case (clusterId: Int, wordsTable: Seq[(String, Int)]) =>
      val wordCountString = wordsTable.map {
        case (word, count) => s"$word($count)"
      }.reduceLeft {
        (acc: String, elem: String) => acc + ", " + elem
      }
      println(s"ClusterId : $clusterId, Top words : $wordCountString")
  } // --(8)
}

コメントで番号がふられた部分について見ていきます。

--(1)

分散処理可能なコレクションである RDD を生成するための SparkContext を生成します。コンストラクタ第一引数にはmasterノード接続URL、第二引数はアプリケーション名を指定しています。

--(2)

ドキュメント中に現れるストップワード(一般的な英語の文章における頻出単語)を予め取り除くため、それをリストとして生成しています。 TF-IDF では頻出する単語は重み付けが小さくなりますが、事前にやれる処理はやっておきます。

--(3)

SparkContext のインスタンスから wholeTextFiles メソッドを呼び出し、引数で指定されたディレクトリ配下のテキストファイルを RDD に読み込みます。wholeTextFiles メソッドは(ファイル名, ファイル内容テキスト)のタプルを RDD にくるんで返します。

その後、テキストを改行で区切って最後の要素をとっています。これは対象ファイル一つ一つが

26-FEB-1987 15:14:42.83

RED LION INNS FILES PLANS OFFERING

Red Lion Inns Limited Partnership said it filed a registration statement ...

のような形式のテキストだからです。

最後にテキストをホワイトスペースで区切り、ストップワードを除外しています。

--(4)

--(3)から --(4)の間はドキュメントから TF-IDF で重み付けをしたベクトルを抽出しています。

--(5)

K-means 法によってクラスタリングを行っています。train メソッドでは第一引数にベクトルの RDD を、第二引数にクラスタリングするグループ数を、第三引数に K-means を繰り返し行う上限回数を指定します。結果はKmeansModelクラスのインスタンスとして得られ、そこからベクトルを入力するとどのグループに属しているか等を取得できます。

--(6)

クラスタリングした結果をもとにサンプルドキュメント群をクラスタごとに分けています。

--(7)

各グループごとにトップ10の頻出単語を算出し、単語と頻度数のペアを抜き出しています。

--(8)

頻出単語をグループごとに出力しています。

=====

尚、Java1.8からビルドオプションが変更され、MaxPermSize が廃止され、MetaspaceSize を大きめに設定すると Out of memory しにくくなります。

-XX:MaxMetaspaceSize=4096m

結果

このサンプルコードを実行すると以下の様な結果を得ます。

ClusterId : 0, Top words : mln(1205), (908), dlrs(787), vs(752), pct(722), said.(713), Reuter(662), cts(476), billion(473), U.S.(355)
ClusterId : 1, Top words : --(93), 1.50(5), Gulf(4), 2.04(4), 2.98(3), 3.70(3), 3.16(3), 1.73(3), 3.12(3), 1.56(3)
ClusterId : 2, Top words : crush(8), soybean(6), over(5), March(5), seen(4), said.(4), processors(4), just(4), few(4), down(3)
ClusterId : 3, Top words : Peladeau(13), newspaper(7), new(7), paper(7), two(6), daily(6), He(6), Quebecor(6), said.(5), mln(5)
ClusterId : 4, Top words : mln(20), CRA(11), dlrs(9), said.(7), against(7), net(6), 1986(5), earnings(4), tax(4), dividend(4)
ClusterId : 5, Top words : total(13), prev(8), Feb(8), Apr(7), Mar(6), registrations(4), May(4), nil,(4), Soybean(3), February(2)
ClusterId : 6, Top words : dlrs(10), management(8), Viacom(7), pct(6), stock(5), Redstone(5), company(5), plan(5), owns(4), television(4)
ClusterId : 7, Top words : U.S.(17), China(12), trade(9), which(8), GATT(7), Chinese(6), said.(5), exports(4), China's(4), last(4)
ClusterId : 8, Top words : Chinese(5), said.(4), tonnes(4), prices(3), soybeans(3), shipment,(3), sources(3), Japanese(3), down(2), beans(2)
ClusterId : 9, Top words : American(22), Medical(17), Pesch(9), Bass(7), takeover(6), could(6), offer(6), family(6), Wedge(5), much(5)
ClusterId : 10, Top words : Japanese(11), debt(7), bank(7), Nigeria(6), banks(6), said.(5), about(5), sources(5), creditors(3), banking(3)
ClusterId : 11, Top words : Funaro(10), Brazil(10), billion(8), said.(6), Brazil's(6), banks,(5), trade(5), last(5), commercial(5), talk(4)
ClusterId : 12, Top words : minority(3), women(2), hearing(2), tremendous(2), progress(2), United(2), more(2), made(2), white(2), resources,(1)
ClusterId : 13, Top words : nil(36), 7,500(13), 1987(8), QUOTA(5), IMPORTS(5), ALLOCATIONS(5), 10,920(4), quota(3), 10,010(3), HONDURAS(1)
ClusterId : 14, Top words : Cantrex(3), receive(2), merger(2), approval.(1), (1), 400(1), entitling(1), subsidiary(1), subject(1), appliance(1)
ClusterId : 15, Top words : acquisition(2), health(2), Chemical(2), Minerals(2), International(2), Johnson(2), Pitman-Moore(2), 45(1), (1), feedstock(1)
ClusterId : 16, Top words : ban(4), DOT(3), tobacco(3), more(3), study(3), health(2), Academy(2), smoke(2), needed(2), domestic(2)
ClusterId : 17, Top words : Kuwait(8), OPEC(6), oil(6), official(5), bpd(4), last(4), mln(4), because(3), above(3), Ali(3)

クラスタリングに偏りが見られ、結果としてはあまり芳しくは無いです。

TF-IDF で得られたドキュメントのベクトルは話題に応じて向きは変わりますが、単語数の違いでその大きさも変わります。

Spark 公式で現在サポートされているユークリッド距離測度では単語数の違いに応じたドキュメントベクトル大きさの影響が大きく、ドキュメントの話題が似通っていても別のグループとして扱われてしまう場合が多いです。

このため TF-IDF にはユークリッド距離測度が向かず、むしろドキュメントベクトルの向きのみの違いによって距離を測るコサイン距離測度を使った方が良いです。

Spark をカスタマイズしてコサイン距離を導入するところまで行ければよかったのですが、今回はここまでにします。

参考