はじめてのApache Spark !Amazon EMRを使って10分で試してみる

Amazon EMR

はじめに

先日、AWSのEMR(Elastic MapReduce)というサービスを使ってApache Sparkに初めて触ってみました。サンプルプログラムを作成するところまでの手順を自分でまとめてみました。前提としてS3とEC2などに触ったことがあり、キーペアやセキュリティグループは分かる方を対象としています。10分位で試せるようにしてありますので、SparkやEMRに触ったことがない方はぜひやってみてください。

1.EC2のキーペアを用意する

EC2インスタンスにSSHで接続するのでキーペアがない場合は作成する必要があります。以下のAWSのサイトを見て作ってください。キーペアがすでにある方はスキップしてもらって結構です。

Amazon EC2 のキーペア - Amazon Elastic Compute Cloud

2.サンプルのファイルを用意する

今回はテキストファイルの中に単語の出現回数をカウントするサンプルプログラムを作ってみようと思います。 サンプルプログラムが読み込むテキストファイルを用意する必要がありますが、今回は以下のようなダミーテキストを作成してくれるサイトを使いました。 テキストファイルにしてS3にアップロードします。私はgzip形式に圧縮したものをアップロードしました。

Dummy Text Generator | Lorem ipsum for webdesigners

3.クラスターを作成する

それではいよいよクラスターを作成しましょう。Management ConsoleからEMRの画面を開いてください。 ベンダーはAmazon、リリースはemr-5.0.0、アプリケーションはSparkを選択してください。今回はログ記録はチェックを外しました。 節約のためにインスタンス数を1で試しましたが問題なく動きました。EC2キーペアには作成したキーペアを指定して下さい。 他の値はデフォルトになります。最後にクラスターを作成ボタンを押してください。

sample-emr-spark-1

クラスターを作成すると以下のような画面になります。この後にSSHでマスターノードに接続するので、masterのセキュリティグループの設定を変更してください。 インバウンドの22ポートを開ければOKです。


sample-emr-spark-2

4.SSHでマスターノードに接続してScalaのREPLを起動する

準備ができましたのでマスターノードに接続してみましょう。EC2インスタンスにSSHで接続するのと同じ要領です。 ユーザ名はhadoopになります。パスワードはなしでpemファイルを指定します。接続すると以下の画像のような画面が表示されます。


sample-emr-spark-3

spark-shellコマンドを実行してみましょう。ScalaのREPLを起動するので、対話形式でプログラムを実行できます。

$spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/09/01 08:49:26 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
16/09/01 08:49:39 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://172.31.19.27:4040
Spark context available as 'sc' (master = yarn, app id = application_1472719263238_0002).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_101)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

5.実装する

REPLで以下のように入力してEnterするとプログラムが実行されると思います。 inputのファイルとoutputのファイルが置かれるバケット名は適宜置き換えてください。 プログラムが終了したら:quitと打ち込むとREPLを終了することができます。

sc.textFile("s3://cm-emr-test/input/sample-emr-spark.txt.gz").flatMap(_.split(" ").map(word => (word,1))).reduceByKey(_+_).saveAsTextFile("s3://cm-emr-test/output")

scという変数がありますが、これはSpark Contextというオブジェクトで定義しなくても使えます。 Scalaが分からない方のためにJavaで行を分けて書くと以下のようになります。

// テキストファイルの読み込みます
JavaRDD<String> lines = sc.textFile("s3://cm-emr-test/input/sample-emr-spark.txt.gz");

// スペース区切りで単語単位に分割します
JavaRDD<String> words = lines.flatMap(line ->  Arrays.asList(line.split(" ")).iterator());

// (word, 1)のタプルへ変換
JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));

// 重複単語をreduceします
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(Integer::sum);

// s3へ保存します
counts.saveAsTextFile("s3://cm-emr-test/output");

簡単に説明すると2行目でS3上のファイルを読み込んで、5行目でスペース区切りにします。 8行目で(単語, 1) のようなタプルへ変換しています。単語の部分はキーと呼ばれます。JavaなどのMapオブジェクトのようなものでしょうか。 これがMapReduceのMap処理になります。まだこの時点では出現回数は合算されておらず、まだ以下のような状態です。

(bbb, 1)
(aaa, 1)
(ccc, 1)
(aaa, 1)
(bbb, 1)
(aaa, 1)
(ccc, 1)
(aaa, 1)
(bbb, 1)

11行目で同じ単語があった場合、同じキーの値を足しています。これがMapReduceのReduce処理になります。 因みにMapReduce はMap、Shuffle、Reduce の3つのフェイズに分かれていますが、Shuffle は内部で自動的に行われています。 最終的に以下のような状態になり、14行目でS3へテキストファイルとして出力しています。

(aaa, 4)
(bbb, 3)
(ccc, 2)

6.結果を見てみる

指定したS3の場所にファイルが出力されていると思いますので確認します。分散処理されているので以下のようにファイルが複数出力されていると思います。

sample-emr-spark-4

どれか1つダウンロードしてみましょう。ファイルを開いて以下のように単語と出現回数のテキストになっていれば成功です!

(vulputate,2)
(pretium,1)
(faucibus,1)
(sapien,1)
(eu,1)
(nunc,,2)
(nec,,2)
(varius,1)
(quam,3)
(ante.,1)
(eleifend,2)
(rhoncus,,1)
(ligula,1)

最後に

Sparkなどの分散処理には前から興味があったのですが、面倒なイメージがあリ触れてみることがありませんでした。今回初めてAmazon EMR上で動かしてみたところ思ってたよりもすぐ試せて驚きました。今後はJavaでMapReduceを実装したりHiveも試してみようと思います。最後にEMRは実行してなくても料金はかかるので、試した後は必ずTerminateするのを忘れないようにしましょう。

  • Dai seki

    質問なのですが、
    「sc.textFile(“s3://cm-emr-test/input/sample-emr-spark.txt.gz”).flatMap(_.split(” “).map(word => (word,1))).reduceByKey(_+_).saveAsTextFile(“s3://cm-emr-test/output”)」

    こちらを、自分のバケット名、ファイル名に置き換えて実行すると、

    org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://….
    とエラーがおきます。

    何度も見返したのですが、エラーが変わらないのでご教授頂けたら幸いです。
    よろしくお願い申し上げます。m(_ _)m