Amazon EMR 4.0.0で始める Apache Spark 1.4.1
Amazon EMRがApache Sparkをサポート
つい先日のニュースで、Amazon EMRがApache Sparkをサポートすることが発表されました。また、昨日、最新のSpark1.4.1がサポートされたことも発表されました。そこで、今回は早速使ってみたいと思います!
New – Apache Spark on Amazon EMR
セットアップ手順
Amazon EMRのセットアップでSparkを指定できますので、早速やってみましょう。
インストールするアプリケーションの指定
EMRでは、コンソールで利用するアプリケーションを指定してインストールすることができます。今回は、SQLライクにMapReduceを記述できるHiveとPig、そしてSpark、さらに、Mahautもインストールします。
ノードの指定
EMRは、司令塔となるMasterノードと、作業をするSlaveノードに分かれます。インスタンスタイプは当然高いほうが良いかと思いますが、今回は試用なのでSpotインスタンスを活用してみましょう。Spotインスタンス履歴からほんのちょっとだけ高め(0.001とかw)の指値で入札すると早く立ち上がるかもしれませんねw。
その他
基本的には画面に沿って進めるだけで直ぐにクラスタが起動しますが、事前にいくつか決めておいた方が良いこともあります。例えば、EMRをVPC内で起動するのであればVPCとSubnetを作成しておきましょう。SSHログインして作業するのであれば鍵を指定しましょう。起動時のアクションを指定するのであればJarを作成しておきましょう。
SSHログイン
クラスターを立ち上げる際にキーの指定をしておいてください。これがないとSSHログインできません。
Last login: Wed Jul 22 07:52:54 2015 from XXX.XXX.XXX.XXX __| __|_ ) _| ( / Amazon Linux AMI ___|\___|___| https://aws.amazon.com/amazon-linux-ami/2015.03-release-notes/ 10 package(s) needed for security, out of 21 available Run "sudo yum update" to apply all updates. EEEEEEEEEEEEEEEEEEEE MMMMMMMM MMMMMMMM RRRRRRRRRRRRRRR E::::::::::::::::::E M:::::::M M:::::::M R::::::::::::::R EE:::::EEEEEEEEE:::E M::::::::M M::::::::M R:::::RRRRRR:::::R E::::E EEEEE M:::::::::M M:::::::::M RR::::R R::::R E::::E M::::::M:::M M:::M::::::M R:::R R::::R E:::::EEEEEEEEEE M:::::M M:::M M:::M M:::::M R:::RRRRRR:::::R E::::::::::::::E M:::::M M:::M:::M M:::::M R:::::::::::RR E:::::EEEEEEEEEE M:::::M M:::::M M:::::M R:::RRRRRR::::R E::::E M:::::M M:::M M:::::M R:::R R::::R E::::E EEEEE M:::::M MMM M:::::M R:::R R::::R EE:::::EEEEEEEE::::E M:::::M M:::::M R:::R R::::R E::::::::::::::::::E M:::::M M:::::M RR::::R R::::R EEEEEEEEEEEEEEEEEEEE MMMMMMM MMMMMMM RRRRRRR RRRRRR
無事にマスターノードにログインできました。
PySparkを使ってみる
spark操作をpythonで行えるPySparkを使ってみたいと思います。まずはhadoopユーザに切り替えます。
$ sudo su - hadoop
全国郵便番号データを取得
次に操作対象のデータを用意します。今回は、全国の郵便番号データを取得しています。
$ wget http://www.post.japanpost.jp/zipcode/dl/roman/ken_all_rome.zip $ unzip ken_all_rome.zip $ cd ken_all_rome $ ls KEN_ALL_ROME.CSV
このままだとShift_JISだと思うので、nkfコマンドでUTF-8に変換します。
$ sudo wget http://mirror.centos.org/centos/6/os/x86_64/Packages/nkf-2.0.8b-6.2.el6.x86_64.rpm $ sudo rpm -ivh nkf-2.0.8b-6.2.el6.x86_64.rpm $ nkf -w KEN_ALL_ROME.CSV > KEN_ALL_ROME_UTF.CSV $ tail KEN_ALL_ROME_UTF.CSV "9071432","沖縄県","八重山郡 竹富町","古見","OKINAWA KEN","YAEYAMA GUN TAKETOMI CHO","KOMI" "9071543","沖縄県","八重山郡 竹富町","崎山","OKINAWA KEN","YAEYAMA GUN TAKETOMI CHO","SAKIYAMA" "9071431","沖縄県","八重山郡 竹富町","高那","OKINAWA KEN","YAEYAMA GUN TAKETOMI CHO","TAKANA" "9071101","沖縄県","八重山郡 竹富町","竹富","OKINAWA KEN","YAEYAMA GUN TAKETOMI CHO","TAKETOMI" "9071434","沖縄県","八重山郡 竹富町","南風見","OKINAWA KEN","YAEYAMA GUN TAKETOMI CHO","HAIMI" "9071433","沖縄県","八重山郡 竹富町","南風見仲","OKINAWA KEN","YAEYAMA GUN TAKETOMI CHO","HAIMINAKA" "9071751","沖縄県","八重山郡 竹富町","波照間","OKINAWA KEN","YAEYAMA GUN TAKETOMI CHO","HATERUMA" "9071544","沖縄県","八重山郡 竹富町","鳩間","OKINAWA KEN","YAEYAMA GUN TAKETOMI CHO","HATOMA" "9071800","沖縄県","八重山郡 与那国町","以下に掲載がない場合","OKINAWA KEN","YAEYAMA GUN YONAGUNI CHO","IKANIKEISAIGANAIBAAI" "9071801","沖縄県","八重山郡 与那国町","与那国","OKINAWA KEN","YAEYAMA GUN YONAGUNI CHO","YONAGUNI"
これで下準備はOKです。
HDFSにデータを置く
用意したCSVデータをHDFSに置きます。
$ hadoop fs -mkdir /user/hadoop/ $ hadoop fs -put KEN_ALL_ROME_UTF.CSV /user/hadoop/KEN_ALL_ROME_UTF.CSV
PySparkの起動
コマンド1つで起動します
$ pyspark Python 2.6.9 (unknown, Apr 1 2015, 18:16:00) [GCC 4.8.2 20140120 (Red Hat 4.8.2-16)] on linux2 .... Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 1.4.1 /_/ Using Python version 2.6.9 (unknown, Apr 1 2015 18:16:00) SparkContext available as sc, HiveContext available as sqlContext. >>>
全国郵便番号データを操作する
まずはレコード件数から
>>> data = sc.textFile('/user/hadoop/KEN_ALL_ROME_UTF.CSV') .... >>> data.count() .... 123699
TOKYOが含まれる件数
>>> data.filter(lambda s: 'TOKYO' in s).count() .... 3773
Spark SQLを使う
PySparkを使ってCSVを元にしたテーブルを定義したいと思います。DataFrameと呼ばれるものです。
>>> from pyspark.sql import Row >>> from pyspark.sql import SQLContext >>> sqlContext = SQLContext(sc) >>> lines = sc.textFile('/user/hadoop/KEN_ALL_ROME_UTF.CSV') >>> cells = lines.map(lambda l: l.split(",")) >>> rows = cells.map(lambda x: Row( ZipCode=x[0].replace('"',''), Prefecture_en=x[4].replace('"',''), City_en=x[5].replace('"',''), Address_en_=x[6].replace('"',''))) >>> sdf = sqlContext.createDataFrame(rows) >>> sdf.show() +--------------------+-------------------+-------------+-------+ | Address_en_| City_en|Prefecture_en|ZipCode| +--------------------+-------------------+-------------+-------+ |IKANIKEISAIGANAIBAAI|SAPPORO SHI CHUO KU| HOKKAIDO|0600000| | ASAHIGAOKA|SAPPORO SHI CHUO KU| HOKKAIDO|0640941| | ODORIHIGASHI|SAPPORO SHI CHUO KU| HOKKAIDO|0600041| |ODORINISHI(1-19-C...|SAPPORO SHI CHUO KU| HOKKAIDO|0600042| |ODORINISHI(20-28-...|SAPPORO SHI CHUO KU| HOKKAIDO|0640820| | KITA1-JOHIGASHI|SAPPORO SHI CHUO KU| HOKKAIDO|0600031| |KITA1-JONISHI(1-1...|SAPPORO SHI CHUO KU| HOKKAIDO|0600001| |KITA1-JONISHI(20-...|SAPPORO SHI CHUO KU| HOKKAIDO|0640821| | KITA2-JOHIGASHI|SAPPORO SHI CHUO KU| HOKKAIDO|0600032| |KITA2-JONISHI(1-1...|SAPPORO SHI CHUO KU| HOKKAIDO|0600002| |KITA2-JONISHI(20-...|SAPPORO SHI CHUO KU| HOKKAIDO|0640822| | KITA3-JOHIGASHI|SAPPORO SHI CHUO KU| HOKKAIDO|0600033| |KITA3-JONISHI(1-1...|SAPPORO SHI CHUO KU| HOKKAIDO|0600003| |KITA3-JONISHI(20-...|SAPPORO SHI CHUO KU| HOKKAIDO|0640823| |KITA4-JOHIGASHI(1...|SAPPORO SHI CHUO KU| HOKKAIDO|0600034| |KITA4-JONISHI(1-1...|SAPPORO SHI CHUO KU| HOKKAIDO|0600004| |KITA4-JONISHI(20-...|SAPPORO SHI CHUO KU| HOKKAIDO|0640824| | KITA5-JOHIGASHI|SAPPORO SHI CHUO KU| HOKKAIDO|0600035| |KITA5-JONISHI(1-2...|SAPPORO SHI CHUO KU| HOKKAIDO|0600005| |KITA5-JONISHI(25-...|SAPPORO SHI CHUO KU| HOKKAIDO|0640825| +--------------------+-------------------+-------------+-------+
次に、県名で集計してみたいと思います。
>>> sdf.groupBy('Prefecture_en').count().show() +-------------+-----+ |Prefecture_en|count| +-------------+-----+ | ISHIKAWA KEN| 2542| | HYOGO KEN| 5216| | FUKUOKA KEN| 3287| | NARA KEN| 1934| | GIFU KEN| 3365| |YAMAGUCHI KEN| 1803| | SHIZUOKA KEN| 2939| | SAGA KEN| 872| | FUKUI KEN| 2260| | NAGANO KEN| 1684| |KAGOSHIMA KEN| 1458| | WAKAYAMA KEN| 1599| |HIROSHIMA KEN| 2155| | OKINAWA KEN| 797| | OKAYAMA KEN| 2188| | CHIBA KEN| 3586| | MIE KEN| 2474| | SAITAMA KEN| 2940| | IWATE KEN| 2071| | YAMAGATA KEN| 1953| +-------------+-----+
HiveQLを使う
SparkSQLと同様に、HiveQLを使ってみたいと思います。
>>> from pyspark.sql import Row >>> from pyspark.sql import HiveContext >>> sqlContext = HiveContext(sc) >>> lines = sc.textFile('/user/hadoop/KEN_ALL_ROME_UTF.CSV') >>> cells = lines.map(lambda l: l.split(",")) >>> rows = cells.map(lambda x: Row( ZipCode=x[0].replace('"',''), Prefecture_en=x[4].replace('"',''), City_en=x[5].replace('"',''), Address_en_=x[6].replace('"',''))) >>> sdf = sqlContext.createDataFrame(rows) >>> sqlContext.registerDataFrameAsTable(sdf, "address") >>> sqlContext.sql("SELECT Prefecture_en,count(*) as Count FROM address GROUP BY Prefecture_en").show() +-------------+-----+ |Prefecture_en|Count| +-------------+-----+ | ISHIKAWA KEN| 2542| | HYOGO KEN| 5216| | FUKUOKA KEN| 3287| | NARA KEN| 1934| | GIFU KEN| 3365| |YAMAGUCHI KEN| 1803| | SHIZUOKA KEN| 2939| | SAGA KEN| 872| | FUKUI KEN| 2260| | NAGANO KEN| 1684| |KAGOSHIMA KEN| 1458| | WAKAYAMA KEN| 1599| |HIROSHIMA KEN| 2155| | OKINAWA KEN| 797| | OKAYAMA KEN| 2188| | CHIBA KEN| 3586| | MIE KEN| 2474| | SAITAMA KEN| 2940| | IWATE KEN| 2071| | YAMAGATA KEN| 1953| +-------------+-----+
SQLに慣れている人はこっちのほうが直感的かも
まとめ
今回は、Amazon EMR新バージョンリリースに合わせて、PySparkの動作確認を行い、SparkSQLとかHiveQLを試してみました。PySparkはとても楽ですね!直感的に記述することができますし、途中で記述を間違えてもやり直しがすぐできます。Sparkの操作は、Java、Scala、Pythonなどを用いることができますので、お好みに合わせて使ってみてください。それにしても動作が早いです。今後の動向が楽しみですね。
参考資料
PySpark 1.4.1 documentation » pyspark.sql module
SparkSQL - HiveContext/SQLContextの性能比較
Spark 1.4.1 - Spark SQL and DataFrame Guide
住所の郵便番号(ローマ字・zip形式)