EMR上でZeppelinとSparkを使ってレコメンデーション
AWS Big Data Blogで Building a Recommendation Engine with Spark ML on Amazon EMR using Zeppelinというエントリーがあったので、そちらを実際に試してみたレポートになります。
内容としてはMovieLensのデータを使ってレコメンデーションエンジンを作るというものです。Spark Summit 2014にAdvanced Apache Spark Workshopというものがあったようで、そちらのMLlibのハンズオンをEMR上でZeppelinを使ってやってみるというものでした。
なお、Spark Summit 2014のハンズオンとAMP Camp 5 - big data bootcampをベースに日本語化したサイトがありました。今回のエントリーに対応するのは4. MLlib(機械学習)になります。詳細に説明されていましたので紹介しておきます。
用語の説明
MovieLens
MovieLensは映画のレコメンデーションシステムで運営母体であるGroupLensによってレーティング情報が公開されています。レコメンデーションシステムの題材としてよく利用されているようで、私が以前記述したEMR上でMahoutを使ってレコメンデーションというエントリーでもMovieLensを利用していました。
Apache SparkとMLlib
Apache Sparkは大量のデータを高速に処理するための汎用エンジンです。MLlibはそのSparkに含まれる機械学習のライブラリになります。
協調フィルタリング
協調フィルタリングは推薦システムに一般的に利用される手法です。MLlibはモデルベースの協調フィルタリングが実装されていて、その計算に交互最小二乗法(Alternating Least Squares、ALS)を利用しています。Spark MLlib でやってみる協調フィルタリング // Speaker Deckを見ると雰囲気がつかめるかと思います。
Zeppelin
Apache ZeppelinはJupyter Notebook(IPython Notebook)と同じようなWebアプリケーションです。画面上でSparkのコードを記述すると実行することができ、その実行結果もまとめて残しておくことができます。処理を書いてその場で実行できるという意味では対話型シェルとも言えますし、その処理結果を残しておけるという意味ではドキュメントとも言えるものです。
EMR
Amazon Elastic Map Reduceは大量のデータを処理するためのWebサービスです。具体的にはマネージドなApache Hadoopクラスタを提供します。SparkやZeppelinはEMRによってHadoopクラスタ上に自動的にセットアップすることが可能です。
実際に試す際の要点
実際に試す際の要点は以下の通りです。なお、東京リージョンで試して動作することを確認しています。
- MovieLensのデータは予めS3にアップロードしておく
- EMRでSparkとZeppelinも同時にセットアップするようにしてHadoopクラスタを起動する
- ZeppelinにはSSHトンネルを利用してアクセスする
MovieLensのデータは予めS3にアップロードしておく
SparkからS3上にあるデータを使ってレコメンデーションエンジンのモデルを作成することになるため、まずはGroupLensのサイトからMovieLensのデータをダウンロードしてS3にアップロードして下さい。MovieLensはデータサイズに応じて何種類か用意されていますので好みのサイズを利用して下さい。私は中規模ぐらいのml-10m.zipを利用しました。
ダウンロードしたらマネジメントコンソールなどを利用してS3にファイルをアップロードして下さい。movies.datとratings.datの2ファイルをアップロードすることになります。私はS3バケットの直下にmovieLensというフォルダを作成し、その中に2つのファイルをアップロードしました。
項目 | 設定値 | 補足 |
---|---|---|
S3のバケット名 | YOUR_BUCKET_NAME | 任意 |
S3で利用するフォルダ | movieLens | 任意。今回はmovieLensとした |
movies.datのパス | YOUR_BUCKET_NAME/movieLens/movies.dat | |
ratings.datのパス | YOUR_BUCKET_NAME/movieLens/ratings.dat |
EMRでSparkとZeppelinも同時にセットアップするようにしてHadoopクラスタを起動する
EMRを使ってHadoopクラスタを構築します。マネジメントコンソールから起動するのでEMRサービスの画面を開き[Create cluster]ボタンをクリックして下さい。
まずは[Go to advanced options]をクリックして下さい。設定値は以下の通りです。記述していないものはデフォルト値を利用しています。設定し終えたら[Create cluster]をクリックしてクラスターを構築して下さい。10分ほどすればクラスターの構築が完了します(ステータスがWaitingになれば構築完了です)。
項目 | 設定値 | 補足 |
---|---|---|
Cluster name | SparkML | 任意 |
Hadoop distribution | emr-4.1.0 | emr-4.2.0は未確認 |
Applications | Hadoop, Hive, Zeppelin-Sandbox, Spark | Zeppelin-Sandbox, Sparkを追加しPigとHueを削除。PigとHueは削除しなくてもいいがセットアップが早くなるので削除した |
Change settings | [{"classification":"spark-defaults", "properties":{"spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.dynamicAllocation.enabled":"true"}, "configurations":[]}] | [Edit software settings]をクリックしてテキストエリアに入力する。設定すると処理が高速化するが必須ではない |
EC2 instance type | r3.xlarge | Master, Coreの両方。Taskは利用しないので変えなくても問題ない |
EC2 key pair | YOUR_KEY_PAIR | 任意 |
ZeppelinにはSSHトンネルを利用してアクセスする
EMRによって構築されるHadoopクラスタはAWS上に存在するのでSSHトンネルを作成してZeppelinにアクセスします。ローカルのポート番号は任意です。ここでは8157としています。
ssh -i /path/to/YOUR_KEY_PAIR -N -L 8157:MASTER_PUBLIC_DNS:8890 hadoop@MASTER_PUBLIC_DNS
MASTER_PUBLIC_DNSはマネジメントコンソール上で確認できます。ec2-XXX-XXXX-XXX-XXX.ap-northeast-1.compute.amazonaws.comの箇所です。
SSHトンネルができたらブラウザのURLにhttp://localhost:8157/と入力して下さい。Zeppelinのトップページが表示されるはずです。
レコメンデーションエンジン作成
Zeppelinにも接続できたので、実際にレコメンデーションエンジンを作成したいと思います。これからソースコードを示しますが、変更する箇所はS3のバケット名であるYOUR_BUCKET_NAMEの箇所だけです。合計で3回YOUR_BUCKET_NAMEが登場します。最初のS3からのデータのロードと最後のモデルをS3に保存する箇所です。該当するソースコードの解説箇所で改めて変更が必要なことを記述しています。
Notebookの作成
レコメンデーションエンジン用のNotebookを作ります。
- Zeppelinの画面上部にある[Notebook]をクリックして[Create new note]をクリックして下さい。[Note Name]は任意です。例えばRecommenderとして下さい。
- Notebookができるので改めて[Notebook]をクリックして1. で作成したNotebookをクリックして下さい。レコメンデーションエンジン用のNotebookの画面が表示されます。
利用するライブラリのimport
まずは今回のレコメンデーションエンジンに必要なライブラリをimportします。テキストエリアに以下を入力して右上の実行ボタン[▷]をクリックして下さい。
import java.io.File import scala.io.Source import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}
S3からデータのロード
S3からmovies.datとratings.datのデータをロードします。最後にprintln関数で件数を出力することでデータが正しくロードされていることを確認します。movieLensHomeDir定数のYOUR_BUCKET_NAMEの箇所は自身の環境に合わせて変更して下さい。
val movieLensHomeDir = "s3n://YOUR_BUCKET_NAME/movieLens/" val movies = sc.textFile(movieLensHomeDir + "movies.dat").map { line => val fields = line.split("::") // format: (movieId, movieName) (fields(0).toInt, fields(1)) }.collect.toMap val ratings = sc.textFile(movieLensHomeDir + "ratings.dat").map { line => val fields = line.split("::") // format: (timestamp % 10, Rating(userId, movieId, rating)) (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)) } val numRatings = ratings.count val numUsers = ratings.map(_._2.user).distinct.count val numMovies = ratings.map(_._2.product).distinct.count println("Got " + numRatings + " ratings from " + numUsers + " users on " + numMovies + " movies.")
学習、評価、テスト用データの作成
レーティング情報を以下のようにタイムスタンプの下1桁を使って分割します。
- 学習(training)60%
- 評価(validation)20%
- テスト(test)20%
val training = ratings.filter(x => x._1 < 6) .values .cache() val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8) .values .cache() val test = ratings.filter(x => x._1 >= 8).values.cache() val numTraining = training.count() val numValidation = validation.count() val numTest = test.count() println("Training: " + numTraining + ", validation: " + numValidation + ", test: " + numTest)
モデルの作成
まずレコメンデーションエンジンの評価用にRMSE(Root Mean Squared Error)を計算する関数computeRmseを定義しておきます。そしてALS.trainメソッドで学習データを使ってモデルを作成します。この際に、パラメーターであるrank, iterations, lambdaについてはそれぞれ2種類用意して、計8通りのパターンについてモデルを作成し、評価データを使って一番RMSEが小さいモデルを採用するようにしています。なお、8回モデルを作成するため数分かかります。
/** Compute RMSE (Root Mean Squared Error). */ def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = { val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product))) val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating)) .join(data.map(x => ((x.user, x.product), x.rating))).values math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n) } val ranks = List(8, 12) val lambdas = List(0.1, 10.0) val numIters = List(10, 20) var bestModel: Option[MatrixFactorizationModel] = None var bestValidationRmse = Double.MaxValue var bestRank = 0 var bestLambda = -1.0 var bestNumIter = -1 for (rank <- ranks; lambda <- lambdas; numIter <- numIters) { val model = ALS.train(training, rank, numIter, lambda) val validationRmse = computeRmse(model, validation, numValidation) println("RMSE (validation) = " + validationRmse + " for the model trained with rank = " + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".") if (validationRmse < bestValidationRmse) { bestModel = Some(model) bestValidationRmse = validationRmse bestRank = rank bestLambda = lambda bestNumIter = numIter } }
モデルのテスト
テストデータのRMSEを確認します。学習データと評価データの単なる平均のレーティングと比べると23%改善していました。
// evaluate the best model on the test set val testRmse = computeRmse(bestModel.get, test, numTest) println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is" + testRmse + ".") // create a naive baseline and compare it with the best model val meanRating = training.union(validation).map(_.rating).mean val baselineRmse = math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).mean) val improvement = (baselineRmse - testRmse) / baselineRmse * 100 println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")
モデルを使って実際にレコメンデーションする
モデルを使って実際にレコメンデーションしてみます。ここではuser_id = 100のユーザーに対するレコメンデーションを行います。
val candidates = sc.parallelize(movies.keys.toSeq) val recommendations = bestModel.get .predict(candidates.map((100, _))) .collect() .sortBy(- _.rating) .take(10) var i = 1 println("Movies recommended for you:") recommendations.foreach { r => println("%2d".format(i) + ": " + movies(r.product)) i += 1 }
ジャンルをComedyに限定したレコメンデーション
先程は全映画からTop10の映画をレコメンデーションしましたが、今度はジャンルをComedyに限定してTop5のレコメンデーションを行います。
val moviesWithGenres = sc.textFile(movieLensHomeDir + "movies.dat").map { line => val fields = line.split("::") // format: (movieId, movieName, genre information) (fields(0).toInt, fields(2)) }.collect.toMap val comedyMovies = moviesWithGenres.filter(_._2.matches(".*Comedy.*")).keys val candidates = sc.parallelize(comedyMovies.toSeq) val recommendations = bestModel.get .predict(candidates.map((100, _))) .collect() .sortBy(- _.rating) .take(5) var i = 1 println("Comedy Movies recommended for you:") recommendations.foreach { r => println("%2d".format(i) + ": " + movies(r.product)) i += 1 }
モデルのS3への保存
今回作成したレコメンデーションのモデルをS3に保存します。こうすることで後で再利用することが出来ます。なお、YOUR_BUCKET_NAMEの箇所は自身の環境に合わせて変更して下さい。
// Save and load model bestModel.get.save(sc, "s3n://YOUR_BUCKET_NAME/movieLens/model/recommendation") val sameModel = MatrixFactorizationModel.load(sc, "s3n://YOUR_BUCKET_NAME/movieLens/model/recommendation")
このコードを実行すると実際にS3にモデルが保存されていることを確認できます。
まとめ
いかがだったでしょうか。Zeppelinを利用することでローカルのブラウザでAWS上のSparkを操作できるのはかなり便利かと思います。また、MLlibの利用方法についても雰囲気を掴んで頂けたのではないかと思います。簡単に試せますので、ぜひ一度お試し下さい。