EMR上でZeppelinとSparkを使ってレコメンデーション

2015.11.21

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

AWS Big Data BlogBuilding a Recommendation Engine with Spark ML on Amazon EMR using Zeppelinというエントリーがあったので、そちらを実際に試してみたレポートになります。

内容としてはMovieLensのデータを使ってレコメンデーションエンジンを作るというものです。Spark Summit 2014にAdvanced Apache Spark Workshopというものがあったようで、そちらのMLlibのハンズオンをEMR上でZeppelinを使ってやってみるというものでした。

なお、Spark Summit 2014のハンズオンとAMP Camp 5 - big data bootcampをベースに日本語化したサイトがありました。今回のエントリーに対応するのは4. MLlib(機械学習)になります。詳細に説明されていましたので紹介しておきます。

用語の説明

MovieLens

MovieLensは映画のレコメンデーションシステムで運営母体であるGroupLensによってレーティング情報が公開されています。レコメンデーションシステムの題材としてよく利用されているようで、私が以前記述したEMR上でMahoutを使ってレコメンデーションというエントリーでもMovieLensを利用していました。

Apache SparkとMLlib

Apache Sparkは大量のデータを高速に処理するための汎用エンジンです。MLlibはそのSparkに含まれる機械学習のライブラリになります。

協調フィルタリング

協調フィルタリングは推薦システムに一般的に利用される手法です。MLlibはモデルベースの協調フィルタリングが実装されていて、その計算に交互最小二乗法(Alternating Least Squares、ALS)を利用しています。Spark MLlib でやってみる協調フィルタリング // Speaker Deckを見ると雰囲気がつかめるかと思います。

Zeppelin

Apache ZeppelinJupyter Notebook(IPython Notebook)と同じようなWebアプリケーションです。画面上でSparkのコードを記述すると実行することができ、その実行結果もまとめて残しておくことができます。処理を書いてその場で実行できるという意味では対話型シェルとも言えますし、その処理結果を残しておけるという意味ではドキュメントとも言えるものです。

EMR

Amazon Elastic Map Reduceは大量のデータを処理するためのWebサービスです。具体的にはマネージドなApache Hadoopクラスタを提供します。SparkやZeppelinはEMRによってHadoopクラスタ上に自動的にセットアップすることが可能です。

実際に試す際の要点

実際に試す際の要点は以下の通りです。なお、東京リージョンで試して動作することを確認しています。

  • MovieLensのデータは予めS3にアップロードしておく
  • EMRでSparkとZeppelinも同時にセットアップするようにしてHadoopクラスタを起動する
  • ZeppelinにはSSHトンネルを利用してアクセスする

MovieLensのデータは予めS3にアップロードしておく

SparkからS3上にあるデータを使ってレコメンデーションエンジンのモデルを作成することになるため、まずはGroupLensのサイトからMovieLensのデータをダウンロードしてS3にアップロードして下さい。MovieLensはデータサイズに応じて何種類か用意されていますので好みのサイズを利用して下さい。私は中規模ぐらいのml-10m.zipを利用しました。

ダウンロードしたらマネジメントコンソールなどを利用してS3にファイルをアップロードして下さい。movies.datratings.datの2ファイルをアップロードすることになります。私はS3バケットの直下にmovieLensというフォルダを作成し、その中に2つのファイルをアップロードしました。

building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin2

項目 設定値 補足
S3のバケット名 YOUR_BUCKET_NAME 任意
S3で利用するフォルダ movieLens 任意。今回はmovieLensとした
movies.datのパス YOUR_BUCKET_NAME/movieLens/movies.dat
ratings.datのパス YOUR_BUCKET_NAME/movieLens/ratings.dat

EMRでSparkとZeppelinも同時にセットアップするようにしてHadoopクラスタを起動する

EMRを使ってHadoopクラスタを構築します。マネジメントコンソールから起動するのでEMRサービスの画面を開き[Create cluster]ボタンをクリックして下さい。

まずは[Go to advanced options]をクリックして下さい。設定値は以下の通りです。記述していないものはデフォルト値を利用しています。設定し終えたら[Create cluster]をクリックしてクラスターを構築して下さい。10分ほどすればクラスターの構築が完了します(ステータスがWaitingになれば構築完了です)。

項目 設定値 補足
Cluster name SparkML 任意
Hadoop distribution emr-4.1.0 emr-4.2.0は未確認
Applications Hadoop, Hive, Zeppelin-Sandbox, Spark Zeppelin-Sandbox, Sparkを追加しPigとHueを削除。PigとHueは削除しなくてもいいがセットアップが早くなるので削除した
Change settings [{"classification":"spark-defaults", "properties":{"spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.dynamicAllocation.enabled":"true"}, "configurations":[]}] [Edit software settings]をクリックしてテキストエリアに入力する。設定すると処理が高速化するが必須ではない
EC2 instance type r3.xlarge Master, Coreの両方。Taskは利用しないので変えなくても問題ない
EC2 key pair YOUR_KEY_PAIR 任意

building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin3

building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin4

ZeppelinにはSSHトンネルを利用してアクセスする

EMRによって構築されるHadoopクラスタはAWS上に存在するのでSSHトンネルを作成してZeppelinにアクセスします。ローカルのポート番号は任意です。ここでは8157としています。

ssh -i /path/to/YOUR_KEY_PAIR -N -L 8157:MASTER_PUBLIC_DNS:8890 hadoop@MASTER_PUBLIC_DNS

MASTER_PUBLIC_DNSはマネジメントコンソール上で確認できます。ec2-XXX-XXXX-XXX-XXX.ap-northeast-1.compute.amazonaws.comの箇所です。 building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin5

SSHトンネルができたらブラウザのURLにhttp://localhost:8157/と入力して下さい。Zeppelinのトップページが表示されるはずです。 building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin6

レコメンデーションエンジン作成

Zeppelinにも接続できたので、実際にレコメンデーションエンジンを作成したいと思います。これからソースコードを示しますが、変更する箇所はS3のバケット名であるYOUR_BUCKET_NAMEの箇所だけです。合計で3回YOUR_BUCKET_NAMEが登場します。最初のS3からのデータのロードと最後のモデルをS3に保存する箇所です。該当するソースコードの解説箇所で改めて変更が必要なことを記述しています。

Notebookの作成

レコメンデーションエンジン用のNotebookを作ります。

  1. Zeppelinの画面上部にある[Notebook]をクリックして[Create new note]をクリックして下さい。[Note Name]は任意です。例えばRecommenderとして下さい。
  2. Notebookができるので改めて[Notebook]をクリックして1. で作成したNotebookをクリックして下さい。レコメンデーションエンジン用のNotebookの画面が表示されます。

building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin7

利用するライブラリのimport

まずは今回のレコメンデーションエンジンに必要なライブラリをimportします。テキストエリアに以下を入力して右上の実行ボタン[▷]をクリックして下さい。

import java.io.File
import scala.io.Source

import org.apache.log4j.Logger
import org.apache.log4j.Level

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}

building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin8

S3からデータのロード

S3からmovies.datratings.datのデータをロードします。最後にprintln関数で件数を出力することでデータが正しくロードされていることを確認します。movieLensHomeDir定数のYOUR_BUCKET_NAMEの箇所は自身の環境に合わせて変更して下さい。

val movieLensHomeDir = "s3n://YOUR_BUCKET_NAME/movieLens/"

val movies = sc.textFile(movieLensHomeDir + "movies.dat").map { line =>
  val fields = line.split("::")
  // format: (movieId, movieName)
  (fields(0).toInt, fields(1))
}.collect.toMap

val ratings = sc.textFile(movieLensHomeDir + "ratings.dat").map { line =>
  val fields = line.split("::")
  // format: (timestamp % 10, Rating(userId, movieId, rating))
  (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
}

val numRatings = ratings.count
val numUsers = ratings.map(_._2.user).distinct.count
val numMovies = ratings.map(_._2.product).distinct.count
println("Got " + numRatings + " ratings from "
  + numUsers + " users on " + numMovies + " movies.")

building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin9

学習、評価、テスト用データの作成

レーティング情報を以下のようにタイムスタンプの下1桁を使って分割します。

  • 学習(training)60%
  • 評価(validation)20%
  • テスト(test)20%
val training = ratings.filter(x => x._1 < 6)
  .values
  .cache()
val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8)
  .values
  .cache()
val test = ratings.filter(x => x._1 >= 8).values.cache()

val numTraining = training.count()
val numValidation = validation.count()
val numTest = test.count()

println("Training: " + numTraining + ", validation: " + numValidation + ", test: " + numTest)

building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin10

モデルの作成

まずレコメンデーションエンジンの評価用にRMSE(Root Mean Squared Error)を計算する関数computeRmseを定義しておきます。そしてALS.trainメソッドで学習データを使ってモデルを作成します。この際に、パラメーターであるrank, iterations, lambdaについてはそれぞれ2種類用意して、計8通りのパターンについてモデルを作成し、評価データを使って一番RMSEが小さいモデルを採用するようにしています。なお、8回モデルを作成するため数分かかります。

/** Compute RMSE (Root Mean Squared Error). */
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
    val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
    val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
    .join(data.map(x => ((x.user, x.product), x.rating))).values
    math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
}

val ranks = List(8, 12)
val lambdas = List(0.1, 10.0)
val numIters = List(10, 20)
var bestModel: Option[MatrixFactorizationModel] = None
var bestValidationRmse = Double.MaxValue
var bestRank = 0
var bestLambda = -1.0
var bestNumIter = -1
for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
  val model = ALS.train(training, rank, numIter, lambda)
  val validationRmse = computeRmse(model, validation, numValidation)
  println("RMSE (validation) = " + validationRmse + " for the model trained with rank = " 
    + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
  if (validationRmse < bestValidationRmse) {
    bestModel = Some(model)
    bestValidationRmse = validationRmse
    bestRank = rank
    bestLambda = lambda
    bestNumIter = numIter
  }
}

building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin11

モデルのテスト

テストデータのRMSEを確認します。学習データと評価データの単なる平均のレーティングと比べると23%改善していました。

// evaluate the best model on the test set
val testRmse = computeRmse(bestModel.get, test, numTest)
println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
  + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is" + testRmse + ".")

// create a naive baseline and compare it with the best model
val meanRating = training.union(validation).map(_.rating).mean
val baselineRmse = 
  math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).mean)
val improvement = (baselineRmse - testRmse) / baselineRmse * 100
println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")

building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin12

モデルを使って実際にレコメンデーションする

モデルを使って実際にレコメンデーションしてみます。ここではuser_id = 100のユーザーに対するレコメンデーションを行います。

val candidates = sc.parallelize(movies.keys.toSeq)
val recommendations = bestModel.get
  .predict(candidates.map((100, _)))
  .collect()
  .sortBy(- _.rating)
  .take(10)

var i = 1
println("Movies recommended for you:")
recommendations.foreach { r =>
  println("%2d".format(i) + ": " + movies(r.product))
  i += 1
}

building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin13

ジャンルをComedyに限定したレコメンデーション

先程は全映画からTop10の映画をレコメンデーションしましたが、今度はジャンルをComedyに限定してTop5のレコメンデーションを行います。

val moviesWithGenres = sc.textFile(movieLensHomeDir + "movies.dat").map { line =>
  val fields = line.split("::")
  // format: (movieId, movieName, genre information)
  (fields(0).toInt, fields(2))
}.collect.toMap
val comedyMovies = moviesWithGenres.filter(_._2.matches(".*Comedy.*")).keys
val candidates = sc.parallelize(comedyMovies.toSeq)
val recommendations = bestModel.get
  .predict(candidates.map((100, _)))
  .collect()
  .sortBy(- _.rating)
  .take(5)

var i = 1
println("Comedy Movies recommended for you:")
recommendations.foreach { r =>
  println("%2d".format(i) + ": " + movies(r.product))
  i += 1
}

building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin14

モデルのS3への保存

今回作成したレコメンデーションのモデルをS3に保存します。こうすることで後で再利用することが出来ます。なお、YOUR_BUCKET_NAMEの箇所は自身の環境に合わせて変更して下さい。

// Save and load model
bestModel.get.save(sc, "s3n://YOUR_BUCKET_NAME/movieLens/model/recommendation")
val sameModel = MatrixFactorizationModel.load(sc,  "s3n://YOUR_BUCKET_NAME/movieLens/model/recommendation")

building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin15

このコードを実行すると実際にS3にモデルが保存されていることを確認できます。 building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin16

まとめ

いかがだったでしょうか。Zeppelinを利用することでローカルのブラウザでAWS上のSparkを操作できるのはかなり便利かと思います。また、MLlibの利用方法についても雰囲気を掴んで頂けたのではないかと思います。簡単に試せますので、ぜひ一度お試し下さい。