EMRでHadoop Streamingジョブを実行する

みなさま、Hadoop Streamingという機能はご存知でしょうか。Hadoopが開発された当初から存在する機能でいわゆるMapReduceをJavaではなくてPythonやRubyで実装できるという機能です。EMRにおいてもストリーミングでのデータ処理という名前で利用できるようになっています。いまならSparkを覚えた方がいいのかもしれませんが、S3に存在するファイルに対してちょっとしたフィルタリング処理を行ったりするには便利だったりするので紹介したいと思います。

前提

  • emr-5.5.0 でアプリケーションは Hadoop のみ
  • ハードウェア構成は m1.medium を 1 台(検証用なのでマスターノードのみ)
  • 東京リージョン
  • 言語はPython 2.7.12

EMRクラスタの作成

まずはEMRクラスタを作成します。AWS CLIを利用する場合は以下のようなコマンドになります。SubnetId, log-uriを自身の環境に合わせて書き換えて下さい。

aws emr create-cluster --auto-scaling-role EMR_AutoScaling_DefaultRole --applications Name=Hadoop \
  --ec2-attributes '{"InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-dXXXX"}' \
  --service-role EMR_DefaultRole --enable-debugging --release-label emr-5.5.0 \
  --log-uri 's3n://aws-logs-XXXX-ap-northeast-1/elasticmapreduce/' --name 'WordCount' \
  --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"m1.medium","Name":"Master Instance Group"}]' \
  --scale-down-behavior TERMINATE_AT_INSTANCE_HOUR --region ap-northeast-1

実際に実行すると作成したEMRクラスタのClusterIdが表示されます。ClusterIdは次のWordCountジョブの実行する際に利用するので控えておいて下さい。

$ aws emr create-cluster --auto-scaling-role EMR_AutoScaling_DefaultRole --applications Name=Hadoop \
>   --ec2-attributes '{"InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-dXXXXXX"}' \
>   --service-role EMR_DefaultRole --enable-debugging --release-label emr-5.5.0 \
>   --log-uri 's3n://aws-logs-XXXXXX-ap-northeast-1/elasticmapreduce/' --name 'WordCount' \
>   --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"m1.medium","Name":"Master Instance Group"}]' \
>   --scale-down-behavior TERMINATE_AT_INSTANCE_HOUR --region ap-northeast-1
{
    "ClusterId": "j-XXXXXX"
}

WordCountジョブの実行

Hadoop関連の「Hello World」と言えばWordCountですよね。ということで、Hadoop Streamingを使ってWordCountしてみたいと思います。通常であればWordCount対象となる入力データの用意と処理用のPythonプログラムのアップロードをS3に行う必要がありますが、今回はAWSが用意しているサンプルを利用したいと思います。

  • 入力データ : s3://ap-northeast-1.elasticmapreduce.samples/wordcount/data/
  • Pythonプログラム : s3://ap-northeast-1.elasticmapreduce.samples/wordcount/code/wordSplitter.py

AWS CLIを使ってWordCountジョブを登録したいと思います。引数にすると改行が入れられず見づらいのでJSONファイルを利用したいと思います。まずは以下のJSONファイルを作成して下さい。ファイル名はWordCount_step.jsonとし、出力先であるs3://mybucket/outputを自身の環境に合わせて書き換えて下さい。

[
  {
     "Name": "WordCount",
     "Type": "STREAMING",
     "ActionOnFailure": "CONTINUE",
     "Args": [
         "-files",
         "s3://ap-northeast-1.elasticmapreduce.samples/wordcount/code/wordSplitter.py",
         "-mapper",
         "wordSplitter.py",
         "-reducer",
         "aggregate",
         "-input",
         "s3://ap-northeast-1.elasticmapreduce.samples/wordcount/data/",
         "-output",
         "s3://mybucket/output"]
  }
]

ではWordCount_step.jsonを利用してWordCountジョブを登録したいと思います。j-XXXXXXXを起動したEMRクラスタのClusterIdに書き換えて以下のコマンドを実行して下さい。

aws emr add-steps --cluster-id j-XXXXXXX --steps file://./WordCount_step.json --region ap-northeast-1

マネジメントコンソールで確認すると前述のコマンドでStepとしてジョブが追加されていることが確認できます。まずはEMRサービスの[クラスターリスト]画面を表示し、[ID]列を利用して今回起動したEMRクラスタを特定して下さい。ID列の値がClusterIdと一致します。

hadoop-streaming-on-emr_1

特定したら、そのクラスタの[名前]列のリンクをクリックして下さい。クラスタの詳細画面が表示されます。

hadoop-streaming-on-emr_2

詳細画面を下にスクロールすると[ステップ]があるのでクリックして開くとステップ、つまりジョブの一覧が表示されます。そこでWordCountのステップの[▶]をクリックするとWordCountジョブの詳細を表示できます。[引数]の中身がWordCount_step.jsonに対応していることが分かります。

hadoop-streaming-on-emr_3

m1.mediumでマスターノード1台のみ構成なので、5分ほどするとS3に処理結果が出力されていると思います。

hadoop-streaming-on-emr_4

ソースの解説

まずは全体像です。今回はHadoopに予め用意されているAggregateパッケージを利用しています。そのため、Mapperに対応するPythonファイルは存在しますが、Reducerに対応するPythonファイルは存在しません。入力であるs3://ap-northeast-1.elasticmapreduce.samples/wordcount/data配下には 0001 - 0012 までの12個のテキストファイルが存在します。それぞれのファイルがwordSplitter.pyに読み込まれ単語ごとに"LongValueSum:" + word.lower() + "\t" + "1"という形式で出力されます。タブ文字がKeyとValueのデリミタになっていて、"LongValueSum:" + word.lower()までがKeyつまり単語で、"1"がValueつまり出現回数になっています。そしてaggregateではKey毎に入力が集約されてValueである"1"を足し上げることで単語の出現回数が分かります *1

hadoop-streaming-on-emr_WordCount

wordSplitter.pyの実装は以下の通りです。まず単語を抽出するための正規表現のパターンを作って、後はsys.stdin経由で入力ファイルを1行ずつ読み取り、そこから抽出した単語毎にprint関数を利用してKey-Value形式で出力しています。

#!/usr/bin/python
import sys
import re

def main(argv):
    pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*")
    for line in sys.stdin:
        for word in pattern.findall(line):
            print "LongValueSum:" + word.lower() + "\t" + "1"


if __name__ == "__main__":
    main(sys.argv)

まとめると以下になります。

  • wordSplitter.pyがMapperとして実行され入力から単語を抽出してReducerに出力する
  • aggregateはHadoopに予め用意されているReducerで単語の出現回数を集計している
  • Hadoop Streamingのプログラムで入力ファイルを読み取る際は標準入力(sys.stdin)を利用し、処理結果を出力する際は標準出力(print)を利用する

出力先のディレクトリが存在する場合はエラーになるので注意

WordCountジョブを再度実行するとどうなるでしょうか。

aws emr add-steps --cluster-id j-XXXXXXX --steps file://./WordCount_step.json --region ap-northeast-1

再度実行するとエラーになります。マネジメントコンソールで確認すると[理由]にOutput directory already exists.と表示され、[詳細]にOutput directory s3://XXXXXX/wordcount already existsと表示されています。これはHadoop Streamingの仕様として、出力先のディレクトリが存在する場合は処理の実行自体を行わないようになっているためです。そのため同じジョブを実行する場合は、出力先のディレクトリを変更するか、出力先のディレクトリを削除した上で実行する必要があります。

hadoop-streaming-on-emr_5

EMRクラスタの削除

作成したEMRクラスタは忘れずに削除しましょう。AWS CLIで削除する場合は以下のコマンドで削除して下さい。

aws emr terminate-clusters --cluster-id j-XXXXXXX --region ap-northeast-1

削除できているかの確認は以下のコマンドを利用して下さい。"TERMINATED"と出力されると削除済みという意味です。

aws emr describe-cluster --cluster-id j-XXXXXXX --query Cluster.Status.State --region ap-northeast-1

最後に

いかがでしたでしょうか。Hadoop Stremingジョブの実行方法の雰囲気をつかんでいただけたでしょうか。今後、Hadoop Streamingの利用例について紹介していければと考えています。

脚注

  1. ここで足し上げているのはKeyのプレフィックスに "LongValueSum:" と指定しているためです。このプレフィックスによってaggregateは集計処理の中身を切り替える実装になっています詳細はhadoop/ValueAggregatorReducer.java at release-2.7.3-RC2 · apache/hadoopを参照して下さい。