ちょっと話題の記事

EMR上でMahoutを使ってレコメンデーション

2014.09.05

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

7月にAWS Big Data Blogというブログが始まったのですが、最初の記事がBuilding a Recommender with Apache Mahout on Amazon Elastic MapReduce (EMR)というタイトルでEMR上でMahoutを使ってレコメンデーションを行ってみるというものでした。EMR上でMahoutというと既にAmazon Elastic MapReduce入門 〜 Apache Mahoutでレコメンデーション!というエントリーがありますが、こちらはAmazon EMR CLIを使っていることもあり、ブログにしてみました。

Building a Recommender with Apache Mahout on Amazon Elastic MapReduce (EMR)について

まず機械学習の概要について説明した上でMahoutを使ってレコメンデーションを行う方法の紹介という流れになっています。

  • 機械学習とは(Classification、Clustering、Recommenders)
  • Apache Mahoutとは
  • Mahoutを利用してRecommenderを作成
    • 映画の評価データ(MovieLens)に対してアイテムベース協調フィルタリングを使ってレコメンデーションデータを作成。類似度の計算にはコサイン類似度を使用
    • レコメンデーションデータを取得できる簡易的なWebサービスを作成

試してみた

サイトに書かれている通りの手順で実行できました。Amazon EMR CLIをインストール済みであれば30分ぐらいで試せます。

EMRクラスターの起動

まずはEMRクラスターを起動します。5分ぐらいで起動してMasterノードにSSHでログインできます。

[elastic-mapreduce-ruby]$ ./elastic-mapreduce --create --alive --name mahout-tutorial --num-instances 4 --master-instance-type m1.xlarge --slave-instance-type m2.2xlarge --ami-version 3.1 --ssh
Created job flow j-1H6YH0O5YCAEJ
2014-09-05 02:42:17 UTC INFO Jobflow is in state STARTING, waiting....
2014-09-05 02:42:48 UTC INFO Jobflow is in state STARTING, waiting....
2014-09-05 02:43:19 UTC INFO Jobflow is in state STARTING, waiting....
2014-09-05 02:43:49 UTC INFO Jobflow is in state STARTING, waiting....
2014-09-05 02:44:20 UTC INFO Jobflow is in state STARTING, waiting....
2014-09-05 02:44:50 UTC INFO Jobflow is in state STARTING, waiting....
2014-09-05 02:45:22 UTC INFO Jobflow is in state STARTING, waiting....
2014-09-05 02:45:52 UTC INFO Jobflow is in state STARTING, waiting....
2014-09-05 02:46:23 UTC INFO Jobflow is in state STARTING, waiting....
2014-09-05 02:46:53 UTC INFO Jobflow is in state STARTING, waiting....
2014-09-05 02:47:24 UTC INFO Jobflow is in state STARTING, waiting....
ssh -o ServerAliveInterval=10 -o StrictHostKeyChecking=no -i XXXX.pem hadoop@ec2-54-64-25-XXX.ap-northeast-1.compute.amazonaws.com
Warning: Permanently added 'ec2-54-64-25-XXX.ap-northeast-1.compute.amazonaws.com,54.64.25.XXX' (RSA) to the list of known hosts.
Last login: Fri Sep  5 02:46:42 2014

       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2014.03-release-notes/
7 package(s) needed for security, out of 21 available
Run "sudo yum update" to apply all updates.

--------------------------------------------------------------------------------

Welcome to Amazon Elastic MapReduce running Hadoop and Amazon Linux.

Hadoop is installed in /home/hadoop. Log files are in /mnt/var/log/hadoop. Check
/mnt/var/log/hadoop/steps for diagnosing step failures.

The Hadoop UI can be accessed via the following commands:

  ResourceManager    lynx http://ip-172-31-31-52.ap-northeast-1.compute.internal:9026/
  NameNode           lynx http://ip-172-31-31-52.ap-northeast-1.compute.internal:9101/

--------------------------------------------------------------------------------


[hadoop@ip-172-31-31-52 ~]$

Mahoutの実行

ではMasterノード上でMahoutを実行してレコメンデーションデータを作成します。MovieLensという英語の評価データが公開されていて、この評価データからユーザー毎におすすめの映画を10個選出するというのが今回の作業内容です。

まずは処理対象となるMovieLensのデータをwgetコマンドでMasterノード上にダウンロードします。

[hadoop@ip-172-31-31-52 ~]$ wget http://files.grouplens.org/datasets/movielens/ml-1m.zip

--2014-09-05 03:42:57--  http://files.grouplens.org/datasets/movielens/ml-1m.zip
files.grouplens.org (files.grouplens.org) をDNSに問いあわせています... 128.101.34.146
files.grouplens.org (files.grouplens.org)|128.101.34.146|:80 に接続しています... 接続しました。
HTTP による接続要求を送信しました、応答を待っています... 200 OK
長さ: 6008687 (5.7M) [application/zip]
`ml-1m.zip' に保存中

100%[===================================================================================================================================================>] 6,008,687   2.32MB/s 時間 2.5s

2014-09-05 03:43:00 (2.32 MB/s) - `ml-1m.zip' へ保存完了 [6008687/6008687]

[hadoop@ip-172-31-31-52 ~]$ unzip ml-1m.zip
Archive:  ml-1m.zip
   creating: ml-1m/
  inflating: ml-1m/movies.dat
  inflating: ml-1m/ratings.dat
  inflating: ml-1m/README
   creating: __MACOSX/
   creating: __MACOSX/ml-1m/
  inflating: __MACOSX/ml-1m/._README
  inflating: ml-1m/users.dat

Mahoutで処理できるようにファイルのフォーマットをCSV形式に変換します。

[hadoop@ip-172-31-31-52 ~]$ cat ml-1m/ratings.dat | sed 's/::/,/g' | cut -f1-3 -d, > ratings.csv

ちなみに、ratings.csvの中身は以下のようになっています。先頭からUserID、MovieID、Ratingとなっています。このデータを元にUserID毎におすすめのMovieIDを10個を選出するのが今回の処理内容です。

[hadoop@ip-172-31-31-52 ~]$ head ratings.csv
1,1193,5
1,661,3
1,914,3
1,3408,4
1,2355,5
1,1197,3
1,1287,5
1,2804,5
1,594,4
1,919,4

Mahoutの入力データとして利用できるようにhadoop fs -putコマンドでMasterノード上のratings.csvをHDFS上にアップロードします。実際にアップロードされているかはhadoop fs -lsコマンドで確認できます。

[hadoop@ip-172-31-31-52 ~]$ hadoop fs -put ratings.csv /ratings.csv
[hadoop@ip-172-31-31-52 ~]$ hadoop fs -ls /ratings.csv
-rw-r--r--   1 hadoop supergroup   11553456 2014-09-05 03:45 /ratings.csv

ではMahoutを実行してレコメンデーションデータを作ります。10分ほどかかります。ちなみに、MapReduceのジョブは10個実行されます。

[hadoop@ip-172-31-31-52 ~]$ mahout recommenditembased --input /ratings.csv --output recommendations --numRecommendations 10 --outputPathForSimilarityMatrix similarity-matrix --similarityClassname SIMILARITY_COSINE
MAHOUT_LOCAL is not set; adding HADOOP_CONF_DIR to classpath.

Running on hadoop, using /home/hadoop/bin/hadoop and HADOOP_CONF_DIR=/home/hadoop/conf
MAHOUT-JOB: /home/hadoop/mahout/mahout-examples-0.9-job.jar
14/09/05 03:45:44 INFO common.AbstractJob: Command line arguments: {--booleanData=[false], --endPhase=[2147483647], --input=[/ratings.csv], --maxPrefsInItemSimilarity=[500], --maxPrefsPerUser=[10], --maxSimilaritiesPerItem=[100], --minPrefsPerUser=[1], --numRecommendations=[10], --output=[recommendations], --outputPathForSimilarityMatrix=[similarity-matrix], --similarityClassname=[SIMILARITY_COSINE], --startPhase=[0], --tempDir=[temp]}
14/09/05 03:45:44 INFO common.AbstractJob: Command line arguments: {--booleanData=[false], --endPhase=[2147483647], --input=[/ratings.csv], --minPrefsPerUser=[1], --output=[temp/preparePreferenceMatrix], --ratingShift=[0.0], --startPhase=[0], --tempDir=[temp]}
14/09/05 03:45:45 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
14/09/05 03:45:45 INFO Configuration.deprecation: mapred.compress.map.output is deprecated. Instead, use mapreduce.map.output.compress
14/09/05 03:45:45 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
14/09/05 03:45:45 INFO client.RMProxy: Connecting to ResourceManager at /172.31.31.52:9022
14/09/05 03:45:48 INFO input.FileInputFormat: Total input paths to process : 1
14/09/05 03:45:48 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
14/09/05 03:45:48 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 77cfa96225d62546008ca339b7c2076a3da91578]
14/09/05 03:45:48 INFO mapreduce.JobSubmitter: number of splits:1
14/09/05 03:45:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1409885209108_0001
14/09/05 03:45:49 INFO impl.YarnClientImpl: Submitted application application_1409885209108_0001
14/09/05 03:45:49 INFO mapreduce.Job: The url to track the job: http://172.31.31.52:9046/proxy/application_1409885209108_0001/
14/09/05 03:45:49 INFO mapreduce.Job: Running job: job_1409885209108_0001
14/09/05 03:45:58 INFO mapreduce.Job: Job job_1409885209108_0001 running in uber mode : false
14/09/05 03:45:58 INFO mapreduce.Job:  map 0% reduce 0%
14/09/05 03:46:10 INFO mapreduce.Job:  map 100% reduce 0%
14/09/05 03:46:21 INFO mapreduce.Job:  map 100% reduce 6%
14/09/05 03:46:23 INFO mapreduce.Job:  map 100% reduce 9%
14/09/05 03:46:25 INFO mapreduce.Job:  map 100% reduce 11%

(中略)

14/09/05 03:57:23 INFO driver.MahoutDriver: Program took 699200 ms (Minutes: 11.65335)
[hadoop@ip-172-31-31-52 ~]$

Mahoutの処理結果がHDFS上に出力されていることをhadoop fs -lsコマンドで確認します。

[hadoop@ip-172-31-31-52 ~]$ hadoop fs -ls recommendations
Found 36 items
-rw-r--r--   1 hadoop supergroup          0 2014-09-05 03:57 recommendations/_SUCCESS
-rw-r--r--   1 hadoop supergroup      16651 2014-09-05 03:57 recommendations/part-r-00000
-rw-r--r--   1 hadoop supergroup      16869 2014-09-05 03:57 recommendations/part-r-00001
-rw-r--r--   1 hadoop supergroup      16969 2014-09-05 03:56 recommendations/part-r-00002
-rw-r--r--   1 hadoop supergroup      16619 2014-09-05 03:57 recommendations/part-r-00003
-rw-r--r--   1 hadoop supergroup      16881 2014-09-05 03:57 recommendations/part-r-00004
-rw-r--r--   1 hadoop supergroup      17052 2014-09-05 03:57 recommendations/part-r-00005
-rw-r--r--   1 hadoop supergroup      16673 2014-09-05 03:57 recommendations/part-r-00006
(以下略)

処理結果ファイルの中身をhadoop fs -catコマンドで確認します。左側がUserIDで右側がMovieIDとRatingが対になったものがカンマ区切りで10個書かれています。先頭行の場合UserIDが35のユーザーにはMovieIDが3504, 3341などがおすすめという意味になります。

[hadoop@ip-172-31-31-52 ~]$ hadoop fs -cat recommendations/part-r-00000 | head
35	[3504:5.0,3341:5.0,3108:5.0,2857:5.0,2349:5.0,3740:5.0,599:5.0,535:5.0,1078:5.0,1916:5.0]
70	[3740:5.0,1911:5.0,3108:5.0,3753:5.0,2420:5.0,47:5.0,832:5.0,1678:5.0,3300:5.0,1385:5.0]
105	[368:5.0,1129:5.0,2133:5.0,349:5.0,47:5.0,2478:5.0,2628:5.0,1358:5.0,3697:5.0,1388:5.0]
140	[1281:5.0,947:5.0,3740:5.0,3194:5.0,965:5.0,910:5.0,2182:5.0,1946:5.0,2303:5.0,3093:5.0]
175	[3366:5.0,1407:5.0,369:5.0,3819:5.0,3479:5.0,3198:5.0,3362:5.0,3635:5.0,3173:5.0,3007:5.0]
210	[2694:5.0,1129:5.0,2541:5.0,805:5.0,368:5.0,3476:5.0,832:5.0,1201:5.0,3300:5.0,1911:5.0]
245	[3740:5.0,1129:5.0,1078:5.0,3504:5.0,2303:5.0,47:5.0,1678:5.0,3108:5.0,1358:5.0,3435:5.0]
280	[3341:5.0,965:5.0,2133:5.0,3740:5.0,1916:5.0,2478:5.0,3108:5.0,1078:5.0,2349:5.0,3178:5.0]
315	[3360:5.0,3681:5.0,2470:5.0,1955:5.0,3639:5.0,517:5.0,3682:5.0,3479:5.0,1962:5.0,786:5.0]
350	[368:5.0,2336:5.0,3300:5.0,373:5.0,47:5.0,1129:5.0,832:5.0,3108:5.0,3740:5.0,1610:5.0]

Webサービスの構築

レコメンデーションデータがあるだけではレコメンデーションはできません。実際にはレコメンデーションデータをアプリケーションに組み込むことになります。ということで、このブログではMasterノード上に簡易的なWebサービスを構築しています。具体的にはPythonのTwistedとRedisを利用したWebサービスを構築します。なお、MasterノードはEMRクラスターを破棄すれば一緒に破棄されますので、実際にはHDFS上に出力されたレコメンデーションデータをS3に退避もしくはアプリケーションで利用するデータストアに格納した上でEMRクラスターを破棄するようにして下さい。

では最初に必要なPythonのパッケージをインストールします。

sudo easy_install twisted
sudo easy_install klein
sudo easy_install redis

次にRedisをインストールしてバックグラウンドで起動します。

wget http://download.redis.io/releases/redis-2.8.7.tar.gz
tar xzf redis-2.8.7.tar.gz
cd redis-2.8.7
make
./src/redis-server &

hello.pyという簡易的なWebサービスを作ります。vimコマンドなどで作成して下さい。なお、hello.pyの中でHDFSからレコメンデーションデータを取得してRedisに投入しています。

from klein import run, route
import redis
import os
 
# Start up a Redis instance
r = redis.StrictRedis(host='localhost', port=6379, db=0)
 
# Pull out all the recommendations from HDFS
p = os.popen("hadoop fs -cat recommendations/part*")
 
# Load the recommendations into Redis
for i in p:
 
  # Split recommendations into key of user id 
  # and value of recommendations
  # E.g., 35^I[2067:5.0,17:5.0,1041:5.0,2068:5.0,2087:5.0,
  #       1036:5.0,900:5.0,1:5.0,081:5.0,3135:5.0]$
  k,v = i.split('\t')
 
  # Put key, value into Redis
  r.set(k,v)
 
# Establish an endpoint that takes in user id in the path
@route('/<string:id>')
 
def recs(request, id):
  # Get recommendations for this user
  v = r.get(id)
  return 'The recommendations for user '+id+' are '+v
 
 
# Make a default endpoint
@route('/')
 
def home(request):
  return 'Please add a user id to the URL, e.g. http://localhost:8080/1234\n'
 
# Start up a listener on port 8080
run("localhost", 8080)

twistdコマンドでhello.pyを起動します。

[hadoop@ip-172-31-31-52 redis-2.8.7]$ twistd -noy hello.py &
[2] 7491

curlコマンドでUserIDに対応するレコメンデーションデータ取得できることを確認します。

[hadoop@ip-172-31-31-52 redis-2.8.7]$ curl localhost:8080/37
2014-09-05 05:50:51+0000 [HTTPChannel,0,127.0.0.1] "127.0.0.1" - - [05/Sep/2014:05:50:51 +0000] "GET /37 HTTP/1.1" 200 125 "-" "curl/7.36.0"
The recommendations for user 37 are [3740:5.0,1688:5.0,2857:5.0,1946:5.0,368:5.0,2478:5.0,832:5.0,3108:5.0,2133:5.0,237:5.0]

なお、2行目はTwistedのアクセスログです。バックグラウンドで実行しているため表示されます。別途ターミナルを開いてcurlコマンドを実行すると以下のように表示されません。

[ec2-user@ip-172-31-31-52 ~]$ curl localhost:8080/37
The recommendations for user 37 are [3740:5.0,1688:5.0,2857:5.0,1946:5.0,368:5.0,2478:5.0,832:5.0,3108:5.0,2133:5.0,237:5.0]

EMRクラスタの破棄

以上でMahoutによるレコメンデーションデータの作成とWebサービスの構築が完了したのでEMRクラスタを破棄します。まずMasterノードからexitします。そしてelastic-mapreduce --listコマンドで現在実行しているEMRクラスタのJOB_FLOW_IDを確認し、elastic-mapreduce --terminateコマンドでEMRクラスタを破棄します。暫くはステータスがSHUTTING_DOWNですが完全に破棄されるとTERMINATEDに変わります。

[hadoop@ip-172-31-31-52 ~]$ exit
logout
Connection to ec2-54-64-25-XXX.ap-northeast-1.compute.amazonaws.com closed.
[elastic-mapreduce-ruby]$ ./elastic-mapreduce --list
j-1H6YH0O5YCAEJ     WAITING        ec2-54-64-25-XXX.ap-northeast-1.compute.amazonaws.commahout-tutorial
[elastic-mapreduce-ruby]$ ./elastic-mapreduce --terminate j-1H6YH0O5YCAEJ
Terminated job flow j-1H6YH0O5YCAEJ
[elastic-mapreduce-ruby]$ ./elastic-mapreduce --list
j-1H6YH0O5YCAEJ     SHUTTING_DOWN  ec2-54-64-25-XXX.ap-northeast-1.compute.amazonaws.commahout-tutorial
[elastic-mapreduce-ruby]$ ./elastic-mapreduce --list
j-1H6YH0O5YCAEJ     TERMINATED     ec2-54-64-25-XXX.ap-northeast-1.compute.amazonaws.commahout-tutorial