はじめてのSpark SQL!Amazon EMRを使って10分で試してみる

Amazon EMR

はじめに

Spark SQLに触ってみたので手順などをまとめました。Spark SQLというのは Apache Hiveのようにクエリ実行することで分散処理ができるものです。Hiveとの違いはインメモリであるために高速に処理できることとクエリ言語にSQLが使えることです。10分位で試せると思いますのでSparkやEMRに触ったことがない方はぜひやってみてください。

1.EC2のキーペアを用意する

EC2インスタンスにSSHで接続するのでキーペアがない場合は作成する必要があります。以下のAWSのサイトを見て作ってください。 キーペアがすでにある方はスキップしてもらって結構です。

Amazon EC2 のキーペア - Amazon Elastic Compute Cloud

2.サンプルのファイルを用意する

今回は私が趣味でやっているポケモンGOで捕まえたポケモンの名前、CP、タイプ情報が入った pokemon_go というテーブルを作成してみます。以下のようなUTF8のテキストファイルを作成しS3にアップロードして下さい。

pokemon_go.txt

シャワーズ 2063 みず
ウインディ 1711 ほのお
サンダース 1586 でんき
カイリキー 1532 かくとう
ゴルダック 1345 みず
エレブー 1337 でんき
スリーパー 1229 エスパー
キュウコン 1227 ほのお
オコリザル 1039 かくとう
アーボック 998 どく

3.クラスターを作成する

まずはクラスターを作成しましょう。Management ConsoleからEMRの画面を開いてください。 ベンダーはAmazon、リリースはemr-5.0.0、アプリケーションはSparkを選択してください。今回はログ記録はチェックを外しました。 節約のためにインスタンス数を1で試しましたが問題なく動きました。EC2キーペアには作成したキーペアを指定して下さい。 他の値はデフォルトになります。最後にクラスターを作成ボタンを押してください。

sample-emr-spark-1

クラスターを作成すると以下のような画面になります。この後にSSHでマスターノードに接続するので、masterのセキュリティグループの設定を変更してください。 インバウンドの22ポートを開ければOKです。


sample-emr-spark-2

4.SSHでマスターノードに接続してScalaのREPLを起動する

準備ができましたのでマスターノードに接続してみましょう。EC2インスタンスにSSHで接続するのと同じ要領です。 ユーザ名はhadoopになります。パスワードはなしでpemファイルを指定します。接続すると以下の画像のような画面が表示されます。


sample-emr-spark-3

spark-shell コマンドを実行してみましょう。ScalaのREPLを起動するので、対話形式でプログラムを実行できます。

$spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/09/01 08:49:26 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
16/09/01 08:49:39 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://172.31.19.27:4040
Spark context available as 'sc' (master = yarn, app id = application_1472719263238_0002).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_101)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

5.プログラムを実装する

REPLで以下のソースコートをコピペするとプログラムが実行されると思います。 inputのファイルとoutputのファイルが置かれるバケット名は適宜置き換えてください。 プログラムが終了したら:quitと打ち込むとREPLを終了することができます。

import org.apache.spark.sql._
import org.apache.spark.sql.types._

// S3からテキストファイルを読み込む
var textFile = sc.textFile("s3://cm-emr-test/input/pokemon_go.txt")

// テキストファイルからRDD(Resilient Distributed Dataset)を作成
var rdd = textFile.map(_.split(" ")).map(attributes => Row(attributes(0), attributes(1).toInt, attributes(2)))
 
// テーブル定義を作成
var fields = Array(StructField("name", StringType), StructField("cp", IntegerType), StructField("type", StringType))
var schema = StructType(fields)
 
// RDDとテーブル定義からデータフレームを作成する
var sqlContext = new SQLContext(sc)
var dataFrame = sqlContext.createDataFrame(rdd, schema)
dataFrame.registerTempTable("pokemon_go")
 
var results = sqlContext.sql("select type, max(cp), avg(cp), count(cp) from pokemon_go group by type order by avg(cp) desc")
results.show()

// S3に出力したい場合
//results.rdd.saveAsTextFile("s3://cm-emr-test/output/pokemon_go")

簡単に説明します。1,2行目はただのimport文なので飛ばします。まず5行目でS3上のファイルを読み込みます。scという変数がありますが、これはSparkContextというオブジェクトで定義しなくても使えます。8行目で読み込んだファイルをスペースで区切ります。1つめのデータを1つ目のカラム(name)、2つめのデータを2つ目のカラム(cp)、3つめのデータを3つ目のカラム(type)に割り当てます。ここで生成されたオブジェクトはRDDというデータセットになります。次に11行目でテーブル定義を作成しています。StructFieldでカラム情報を定義し配列にします。StringType、IntegerTypeというのはデータ型になります。以下のページにある型を指定できます。

org.apache.spark.sql.types.DataType

次に16行目でRDDとテーブル定義情報からDataFrameという構造化されたオブジェクトを作成します。DataFrameとSQLで指定するテーブル名を18行目で紐づけています。準備ができたのでsqlメソッドを実行します。今回はポケモンのタイプごとにポケモンの数、CPの平均、CPの最高値を求めるSQLを実行してみます。結果はDataFrame型のオブジェクトになります。中身を出力したい場合はshowメソッドで出力します。

6.結果を見てみる

showメソッドを実行すると以下のように表示されました。集計関数も問題なく実行できているようです。ファイルにして出力したい場合は上のソースコードの24行目にあるようにresultsのインスタンス変数rddのsaveAsTextFileメソッドを使えば出力できます。

scala> results.show()
+----+-------+-------+---------+
|type|max(cp)|avg(cp)|count(cp)|
+----+-------+-------+---------+
|  みず|   2063| 1704.0|        2|
| ほのお|   1711| 1469.0|        2|
| でんき|   1586| 1461.5|        2|
|かくとう|   1532| 1285.5|        2|
|エスパー|   1229| 1229.0|        1|
|  どく|    998|  998.0|        1|
+----+-------+-------+---------+

最後に

Spark SQLはSQL実行するだけなのでプログラミングでMapReduceを実装するよりも分かりやすくでいいですね。Apache SparkにはStreamingというものもあるらしいので今度試してみたいと思います。最後にEMRは実行してなくてもEC2の料金はかかるので、試した後は必ずTerminateするのを忘れないようにしましょう。