Spark入門 | Hadoop Advent Calendar 2016 #15

こんにちは、小澤です。 この記事はHadoop Advent Calendar 15日目のものとなります。

前回はSparkがどのような構成で動作しているのかについて書かせていただきました。
今回はSparkで、Word Countを実装したいと思います。

Sparkの環境構築

Sparkはローカル環境でも動かすことができるため、今回はクラスタ構築はせずローカルで動かしてみることにします。 環境構築は簡単で、例えばMacであればhomebrewで導入することができます。

$ brew install apache-spark

インストールが完了したらターミナル上で「spark」とまで打ってタブを押すといくつかの保管候補が出るかと思います。

$ spark[tab]
spark-beeline  spark-class    spark-shell    spark-sql      spark-submit   sparkR 

これらが表示されていればインストールは完了です。

今回はhomebrewでインストールされる2.0.1を前提にさせていただいています。 簡単なプログラムの作成のみなので問題ないかと思いますが、1系で動作確認は行っていませんのでご注意ください。

サンプルプログラムを動かす

まずは一番簡単な方法でサンプルプログラムを動かしてみます。 spark-examples_-.jarというのに入っているSparkPiというものを動かします。

homebrewでインストールした場合これは/usr/local/Cellar/apache-spark//libexec/examples/jars/に入っているかと思います。 これを実行してみます。

spark-submit --class org.apache.spark.examples.SparkPi --master local[1] <jarのpath>
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/11/09 17:45:43 INFO SparkContext: Running Spark version 2.0.1
16/11/09 17:45:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
...
...
Pi is roughly 3.136715683578418

長いログ出力が出た後に「Pi is roughly 3.136715683578418」のように出力されていれば成功です。数値の部分はここで表示される結果と異なるかもしれません。

プログラムの実装

次にSparkで動くWord Countを実装してみます。

spark-shell

まずは、spark-shellを使ってやってみます。spark-shellとはscalaのREPLを拡張してSparkContexなどいくつかの変数を最初から要したお手軽な環境となります。 scalaのREPLなので記述する言語はscalaとなります。scalaは別途インストールしておいてください。 また、他の言語についてはこの後で解説していきます。

$ spark-shell
scala> val input = sc.textFile("wordcount")
input: org.apache.spark.rdd.RDD[String] = wordcount MapPartitionsRDD[21] at textFile at <console>:24

scala> val flatten = input.flatMap(line => line.split(" "))
flatten: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[23] at flatMap at <console>:26

scala> val map = flatten.map(word => (word, 1))
map: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[24] at map at <console>:28

scala> val result = map.reduceByKey((a, b) => a + b)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[25] at reduceByKey at <console>:30

scala> result.collect()
res6: Array[(String, Int)] = Array((Hadoop,1), (Commodity,1), (For,1), (this,3), (country,1), (under,1), (it,1), (The,4), (Jetty,1), (Software,2), (Technology,1), (<http://www.wassenaar.org/>,1), (have,1), (http://wiki.apache.org/hadoop/,1), (BIS,1), (classified,1), (This,1), (following,1), (which,2), (security,1), (See,1), (encryption,3), (Number,1), (export,1), (reside,1), (for,3), ((BIS),,1), (any,1), (at:,2), (software,2), (makes,1), (algorithms.,1), (re-export,2), (latest,1), (your,1), (SSL,1), (the,8), (Administration,1), (includes,2), (import,,2), (provides,1), (Unrestricted,1), (country's,1), (if,1), (740.13),1), (Commerce,,1), (country,,1), (software.,2), (concerning,1), (laws,,1), (source,1), (possession,,2), (Apache,1), (our,2), (written,1), (as,1), (License,1), (regulations,...

このようにsparkはREPL形式のshellで動作を確認しながら実行することもできるため、試行錯誤する段階にも向いています。

jarから実行

次にspark-submitでjarファイルに記述されたプログラムを実行してみます。 とはいえ、実行の方法は先ほどのSparkPiと同じですし、プログラムの内容はspark-shellのものとほとんど変わりません。

ビルドツールなどはお使いの環境に任せるものとして以下のように実装します。

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Word Count").setMaster("local")
    val sc = new SparkContext(conf)

    // 入出力先は直接書くのではなく引数で受け取るようにした
    val input = sc.textFile(args(0))
    
    // 一つ一つを変数で受け取らずメソッドチェーンで記述するように変更した
    val wordcount = input.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    // Scalaのコレクションとして受け取るのではなくファイルに出力する
    wordcount.saveAsTextFile(args(1))
  }
}

また、依存ライブラリとしてspark-coreを追加してください。ここではmavenでの例を示します。scalaやsparkのバージョンはお使いの環境に合わせて設定してください。

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.0.1</version>
</dependency>

これで

$ spark-submit --class WordCount <作成したjarファイル> <入力ディレクトリ> <出力ディレクトリ>

とすることで実行できます。

Javaでの実装

Javaでの記述にはラムダ式が利用できます。 基本的な流れとしてはScalaで実装したものと特に違いはありませんが、flatMapの戻り値がIterableではなくIteratorを要求するのでその点だけ注意してください。 実行方法についてもjarファイル作成後はScalaと同様です。

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;

import scala.Tuple2;

public class WordCount {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Word Count").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    
    JavaRDD<String> input = sc.textFile(args[0]);
    JavaPairRDD<String, Integer> wordcount = input.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
        .mapToPair(s -> new Tuple2<String, Integer>(s, 1))
        .reduceByKey((a, b) -> a + b);
    wordcount.saveAsTextFile(args[1]);
        
  }
}

Pythonでの実装

Pythonでの実装も基本的には同じですが、SparkContextのオブジェクトを生成する際にキーワード引数confを明示的に指定する必要があるのでその点だけ注意が必要です。 ScalaやJava同様lambdaを使って処理内容を記述できますが、Pythonの場合lambda内で複数行の処理が記述できないため複雑な処理を実装したい場合はやや面倒かもしれません。

また、Pythonのバージョンについてですが、下記プログラムは2.7と3.5のどちらでも動くことを確認していますのであまり気にせず実行してみてください。

import sys
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('Word Count').setMaster('local')
sc = SparkContext(conf=conf)

input = sc.textFile(sys.argv[1])
wordcount = input.flatMap(lambda line: line.split(' ')) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)
wordcount.saveAsTextFile(sys.argv[2])

実行方法はScalaやJavaと同様にspark-submitコマンドを利用します

$ spark-submit <pythonファイル> <入力ディレクトリ> <出力ディレクトリ>

また、pysparkコマンドを実行するとScala同様、REPL形式のshellでの実行も行えます。

Jupyter Notebookから動かしてみる

pysparkの起動時や、Jupyter側設定などいくつかの方法がありますが。 今回はpysparkコマンドでjupyterを起動するようにしてみましょう。

最低限の設定として以下3つを記述します。

export PYSPARK_PYTHON=<pythonコマンドのpath>
export PYSPARK_DRIVER_PYTHON=<jupyterコマンドのpath>
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"

この設定を行うと、pysparkコマンドをjupyterで実行してくれるようになります。

spark-jupyter

standaloneモード

これまではローカルの環境で動かしていましたが、最後にstandaloneモードで動かして擬似的に分散処理をしてる様子を見てみます。 master(Driver)とslave(Worker)をローカル環境で立ち上げてそこにジョブを投げます。

ノードの立ち上げは簡単です。

$ cd /usr/local/Cellar/apache-spark/2.0.1/libexec/sbin
$ ./start-master.sh
$ ./start-slave.sh spark://localhost:7077 # 引数にはmasterのURLを指定する

ブラウザからlocalhost:8080にアクセスするとWeb UIが表示されるはずです。 この状態でspark-shellをmasterを指定して立ち上げるとRunning Applicationsにそれが表示されます。

$ spark-shell --master spark://localhost:7077

spark-ui

また、Actionを含む処理を実行した場合ジョブ、Web UIから詳細をたどることができます。 ここでは生成されたDAGの情報なども確認できます。

スクリーンショット 2016-11-10 13.37.22

確認ができたら、stop-master.sh, stop-slave.shあるいはstop-all.shで停止します。

masterやWeb UIのポートを変更したいときなどはstart-master.shのオプションで指定できます。 詳しくはヘルプを確認してください。

今回はstandaloneモードで立ち上げていますが、YARNやMesos上で動いているSparkを使うときも同様にmasterを指定することになります。

おわりに

今回はSpark入門としてWord Countの実装をしてみました。プログラム自体は簡単なものなのであまり面白みはなかったかもしれませんが、どのようにSparkを使い始めるかについては理解いただけたかと思います。 Sparkには他にも色々な要素がありますが、ローカル環境で動かしてみるのは非常に簡単なのでまずはやってみることをお勧めします。

明日はSpark SQLとDataFrame APIについて書かせていただく予定です。
ぜひ、お楽しみに!