Amazon EMR 4.0.0で始める Apache Spark 1.4.1

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Amazon EMRがApache Sparkをサポート

つい先日のニュースで、Amazon EMRがApache Sparkをサポートすることが発表されました。また、昨日、最新のSpark1.4.1がサポートされたことも発表されました。そこで、今回は早速使ってみたいと思います!

New – Apache Spark on Amazon EMR

screenshot 2015-06-20 14.12.37

セットアップ手順

Amazon EMRのセットアップでSparkを指定できますので、早速やってみましょう。

インストールするアプリケーションの指定

EMRでは、コンソールで利用するアプリケーションを指定してインストールすることができます。今回は、SQLライクにMapReduceを記述できるHiveとPig、そしてSpark、さらに、Mahautもインストールします。

screenshot 2015-07-25 23.46.23

ノードの指定

EMRは、司令塔となるMasterノードと、作業をするSlaveノードに分かれます。インスタンスタイプは当然高いほうが良いかと思いますが、今回は試用なのでSpotインスタンスを活用してみましょう。Spotインスタンス履歴からほんのちょっとだけ高め(0.001とかw)の指値で入札すると早く立ち上がるかもしれませんねw。

screenshot 2015-06-20 14.51.42

その他

基本的には画面に沿って進めるだけで直ぐにクラスタが起動しますが、事前にいくつか決めておいた方が良いこともあります。例えば、EMRをVPC内で起動するのであればVPCとSubnetを作成しておきましょう。SSHログインして作業するのであれば鍵を指定しましょう。起動時のアクションを指定するのであればJarを作成しておきましょう。

SSHログイン

クラスターを立ち上げる際にキーの指定をしておいてください。これがないとSSHログインできません。

Last login: Wed Jul 22 07:52:54 2015 from XXX.XXX.XXX.XXX

       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2015.03-release-notes/
10 package(s) needed for security, out of 21 available
Run "sudo yum update" to apply all updates.
                                                                    
EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR    
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R   
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R 
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R 
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR   
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R  
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR

無事にマスターノードにログインできました。

PySparkを使ってみる

spark操作をpythonで行えるPySparkを使ってみたいと思います。まずはhadoopユーザに切り替えます。

$ sudo su - hadoop

全国郵便番号データを取得

次に操作対象のデータを用意します。今回は、全国の郵便番号データを取得しています。

$ wget http://www.post.japanpost.jp/zipcode/dl/roman/ken_all_rome.zip
$ unzip ken_all_rome.zip 
$ cd ken_all_rome
$ ls
KEN_ALL_ROME.CSV

このままだとShift_JISだと思うので、nkfコマンドでUTF-8に変換します。

$ sudo wget http://mirror.centos.org/centos/6/os/x86_64/Packages/nkf-2.0.8b-6.2.el6.x86_64.rpm
$ sudo rpm -ivh nkf-2.0.8b-6.2.el6.x86_64.rpm
$ nkf -w KEN_ALL_ROME.CSV > KEN_ALL_ROME_UTF.CSV
$ tail KEN_ALL_ROME_UTF.CSV 
"9071432","沖縄県","八重山郡 竹富町","古見","OKINAWA KEN","YAEYAMA GUN TAKETOMI CHO","KOMI"
"9071543","沖縄県","八重山郡 竹富町","崎山","OKINAWA KEN","YAEYAMA GUN TAKETOMI CHO","SAKIYAMA"
"9071431","沖縄県","八重山郡 竹富町","高那","OKINAWA KEN","YAEYAMA GUN TAKETOMI CHO","TAKANA"
"9071101","沖縄県","八重山郡 竹富町","竹富","OKINAWA KEN","YAEYAMA GUN TAKETOMI CHO","TAKETOMI"
"9071434","沖縄県","八重山郡 竹富町","南風見","OKINAWA KEN","YAEYAMA GUN TAKETOMI CHO","HAIMI"
"9071433","沖縄県","八重山郡 竹富町","南風見仲","OKINAWA KEN","YAEYAMA GUN TAKETOMI CHO","HAIMINAKA"
"9071751","沖縄県","八重山郡 竹富町","波照間","OKINAWA KEN","YAEYAMA GUN TAKETOMI CHO","HATERUMA"
"9071544","沖縄県","八重山郡 竹富町","鳩間","OKINAWA KEN","YAEYAMA GUN TAKETOMI CHO","HATOMA"
"9071800","沖縄県","八重山郡 与那国町","以下に掲載がない場合","OKINAWA KEN","YAEYAMA GUN YONAGUNI CHO","IKANIKEISAIGANAIBAAI"
"9071801","沖縄県","八重山郡 与那国町","与那国","OKINAWA KEN","YAEYAMA GUN YONAGUNI CHO","YONAGUNI"

これで下準備はOKです。

HDFSにデータを置く

用意したCSVデータをHDFSに置きます。

$ hadoop fs -mkdir /user/hadoop/
$ hadoop fs -put KEN_ALL_ROME_UTF.CSV /user/hadoop/KEN_ALL_ROME_UTF.CSV

PySparkの起動

コマンド1つで起動します

$ pyspark
Python 2.6.9 (unknown, Apr  1 2015, 18:16:00) 
[GCC 4.8.2 20140120 (Red Hat 4.8.2-16)] on linux2
....
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Python version 2.6.9 (unknown, Apr  1 2015 18:16:00)
SparkContext available as sc, HiveContext available as sqlContext.
>>>

全国郵便番号データを操作する

まずはレコード件数から

>>> data = sc.textFile('/user/hadoop/KEN_ALL_ROME_UTF.CSV')
....
>>> data.count()
....
123699

TOKYOが含まれる件数

>>> data.filter(lambda s: 'TOKYO' in s).count()
....
3773

Spark SQLを使う

PySparkを使ってCSVを元にしたテーブルを定義したいと思います。DataFrameと呼ばれるものです。

>>> from pyspark.sql import Row
>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> lines = sc.textFile('/user/hadoop/KEN_ALL_ROME_UTF.CSV')
>>> cells = lines.map(lambda l: l.split(","))
>>> rows = cells.map(lambda x: Row(
  ZipCode=x[0].replace('"',''), 
 	Prefecture_en=x[4].replace('"',''),
	City_en=x[5].replace('"',''), 
	Address_en_=x[6].replace('"','')))
>>> sdf = sqlContext.createDataFrame(rows)
>>> sdf.show()
+--------------------+-------------------+-------------+-------+
|         Address_en_|            City_en|Prefecture_en|ZipCode|
+--------------------+-------------------+-------------+-------+
|IKANIKEISAIGANAIBAAI|SAPPORO SHI CHUO KU|     HOKKAIDO|0600000|
|          ASAHIGAOKA|SAPPORO SHI CHUO KU|     HOKKAIDO|0640941|
|        ODORIHIGASHI|SAPPORO SHI CHUO KU|     HOKKAIDO|0600041|
|ODORINISHI(1-19-C...|SAPPORO SHI CHUO KU|     HOKKAIDO|0600042|
|ODORINISHI(20-28-...|SAPPORO SHI CHUO KU|     HOKKAIDO|0640820|
|     KITA1-JOHIGASHI|SAPPORO SHI CHUO KU|     HOKKAIDO|0600031|
|KITA1-JONISHI(1-1...|SAPPORO SHI CHUO KU|     HOKKAIDO|0600001|
|KITA1-JONISHI(20-...|SAPPORO SHI CHUO KU|     HOKKAIDO|0640821|
|     KITA2-JOHIGASHI|SAPPORO SHI CHUO KU|     HOKKAIDO|0600032|
|KITA2-JONISHI(1-1...|SAPPORO SHI CHUO KU|     HOKKAIDO|0600002|
|KITA2-JONISHI(20-...|SAPPORO SHI CHUO KU|     HOKKAIDO|0640822|
|     KITA3-JOHIGASHI|SAPPORO SHI CHUO KU|     HOKKAIDO|0600033|
|KITA3-JONISHI(1-1...|SAPPORO SHI CHUO KU|     HOKKAIDO|0600003|
|KITA3-JONISHI(20-...|SAPPORO SHI CHUO KU|     HOKKAIDO|0640823|
|KITA4-JOHIGASHI(1...|SAPPORO SHI CHUO KU|     HOKKAIDO|0600034|
|KITA4-JONISHI(1-1...|SAPPORO SHI CHUO KU|     HOKKAIDO|0600004|
|KITA4-JONISHI(20-...|SAPPORO SHI CHUO KU|     HOKKAIDO|0640824|
|     KITA5-JOHIGASHI|SAPPORO SHI CHUO KU|     HOKKAIDO|0600035|
|KITA5-JONISHI(1-2...|SAPPORO SHI CHUO KU|     HOKKAIDO|0600005|
|KITA5-JONISHI(25-...|SAPPORO SHI CHUO KU|     HOKKAIDO|0640825|
+--------------------+-------------------+-------------+-------+

次に、県名で集計してみたいと思います。

>>> sdf.groupBy('Prefecture_en').count().show()
+-------------+-----+
|Prefecture_en|count|
+-------------+-----+
| ISHIKAWA KEN| 2542|
|    HYOGO KEN| 5216|
|  FUKUOKA KEN| 3287|
|     NARA KEN| 1934|
|     GIFU KEN| 3365|
|YAMAGUCHI KEN| 1803|
| SHIZUOKA KEN| 2939|
|     SAGA KEN|  872|
|    FUKUI KEN| 2260|
|   NAGANO KEN| 1684|
|KAGOSHIMA KEN| 1458|
| WAKAYAMA KEN| 1599|
|HIROSHIMA KEN| 2155|
|  OKINAWA KEN|  797|
|  OKAYAMA KEN| 2188|
|    CHIBA KEN| 3586|
|      MIE KEN| 2474|
|  SAITAMA KEN| 2940|
|    IWATE KEN| 2071|
| YAMAGATA KEN| 1953|
+-------------+-----+

HiveQLを使う

SparkSQLと同様に、HiveQLを使ってみたいと思います。

>>> from pyspark.sql import Row
>>> from pyspark.sql import HiveContext
>>> sqlContext = HiveContext(sc)
>>> lines = sc.textFile('/user/hadoop/KEN_ALL_ROME_UTF.CSV')
>>> cells = lines.map(lambda l: l.split(","))
>>> rows = cells.map(lambda x: Row(
  ZipCode=x[0].replace('"',''), 
 	Prefecture_en=x[4].replace('"',''),
	City_en=x[5].replace('"',''), 
	Address_en_=x[6].replace('"','')))
>>> sdf = sqlContext.createDataFrame(rows)
>>> sqlContext.registerDataFrameAsTable(sdf, "address")
>>> sqlContext.sql("SELECT Prefecture_en,count(*) as Count FROM address GROUP BY Prefecture_en").show()
+-------------+-----+
|Prefecture_en|Count|
+-------------+-----+
| ISHIKAWA KEN| 2542|
|    HYOGO KEN| 5216|
|  FUKUOKA KEN| 3287|
|     NARA KEN| 1934|
|     GIFU KEN| 3365|
|YAMAGUCHI KEN| 1803|
| SHIZUOKA KEN| 2939|
|     SAGA KEN|  872|
|    FUKUI KEN| 2260|
|   NAGANO KEN| 1684|
|KAGOSHIMA KEN| 1458|
| WAKAYAMA KEN| 1599|
|HIROSHIMA KEN| 2155|
|  OKINAWA KEN|  797|
|  OKAYAMA KEN| 2188|
|    CHIBA KEN| 3586|
|      MIE KEN| 2474|
|  SAITAMA KEN| 2940|
|    IWATE KEN| 2071|
| YAMAGATA KEN| 1953|
+-------------+-----+

SQLに慣れている人はこっちのほうが直感的かも

まとめ

今回は、Amazon EMR新バージョンリリースに合わせて、PySparkの動作確認を行い、SparkSQLとかHiveQLを試してみました。PySparkはとても楽ですね!直感的に記述することができますし、途中で記述を間違えてもやり直しがすぐできます。Sparkの操作は、Java、Scala、Pythonなどを用いることができますので、お好みに合わせて使ってみてください。それにしても動作が早いです。今後の動向が楽しみですね。

参考資料

PySpark 1.4.1 documentation » pyspark.sql module

SparkSQL - HiveContext/SQLContextの性能比較

Spark 1.4.1 - Quick Start

Spark 1.4.1 - Spark SQL and DataFrame Guide

住所の郵便番号(ローマ字・zip形式)

簡単な集約/変換処理を PySpark & pandas の DataFrame で行う