初めてのEMR!Hadoopクラスタを起動してみた。

こんにちは、データインテグレーション部の平野です。

前職ではHadoopをぼちぼち触っていたのですが、EMRは心理的なハードルのせいか触ったことがありませんでした。 AWSに触ることにも少しずつ慣れてきたので、EMRでHadoopクラスタの作成を試してみました。 基本的にEMRについては「Hadoopのクラスタが作成できる」という知識しかなかったくらいなので、同じくらいの土俵にいる方の参考になればと思います。

クラスタの起動

まずはサービス一覧からEMRを選択します。

「クラスターの起動」を選択します。 すでに起動したクラスタが2つくらいあるのは目を瞑ってください。

各種設定項目です。

一般設定

クラスター名

お好きな名前を付けてください。

ログ記録

クラスタのログを記録するかどうか、記録する際は、どのS3バケットに保存するかを指定します。 今回はログ記録を有効にしてみました。

起動モード

「クラスター」と「ステップ実行」があります。

「クラスター」は、オンプレでのHadoopクラスタと同様に、クラスタを起動して、その上で各種アプリケーションを流します。アプリケーションを動かしていないときにもクラスタが起動したままになります。

「ステップ実行」は、Hadoopクラスタの起動と同時に、指定したジョブを実行させます。そしてそのジョブが終了したらHadoopクラスタごと終了させるというモードです。

私はまだステップ実行についてはよくわかっていないのですが、上記のような理解で良さそうです。 とりあえず今回は(オンプレ的には)普通に「クラスター」モードで起動してみます。

ソフトウェア設定

リリース

EMRのバージョンの指定です。特に理由がないので最新を選択します。

アプリケーション

クラスタにあらかじめインストールしておくアプリケーションのセットを選択します。 今回はまずはHadoopでMapReduceを使用することを目標にしているので、「Core Hadoop」を選択しました。

テーブルメタデータにAWS Glue Data Catalogを使用

「外部Hiveメタストアを提供します」とあり、今回はシンプルなHadoopでMapReduce機能だけを使えれば良いのでチェックしませんでした。

ハードウェア構成

インスタンスタイプ

クラスタを構成するインスタンスタイプを選択します。 各インスタンスタイプを選択した場合の、デフォルトのリソース設定は Hadoop デーモン構成設定 のページに記載がありますので、適切なものを選んでください。

なお、私は最初に最小なものということで「c1.medium」を選択したのですが、これだと後述のサンプルプログラムすらメモリが足りずに動かなかったので、改めて、デフォルトで選択されていた「c3.xlarge」を選択してクラスタを構築し直しました。

インスタンス数

マスターとスレーブに使うの合計のインスタンス台数を設定します。 マスターに1台使いますので、ここで設定した数値-1のスレーブが立ち上がりますのでご注意ください。

セキュリティとアクセス

EC2キーペア

クラスタ内のホストにアクセスするためのキーペアを指定します。

アクセス権限

IAM関連のロール等の設定です。 今回はまずはお試しということでデフォルトのまま進めます。

設定は以上にて完了です。 「クラスターを作成」を押します。

クラスタ起動完了

クラスターが立ち上がるまで数分待ちます。 マスターパブリックDNSが表示されれば、クラスターの準備完了です。

ssh接続

早速立ち上がったクラスタにsshでアクセスしてみます。 「SSH」というリンクを押すと下記のようにアクセスするコマンドが表示されます。 基本的にはキーペアのファイルを正しく指定し直すだけで接続ができます。

$ ssh -i hirano-key-01.pem hadoop@ec2-XXX-XXX-XXX-XXX.ap-northeast-1.compute.amazonaws.com
The authenticity of host 'ec2-XXX-XXX-XXX-XXX.ap-northeast-1.compute.amazonaws.com (XX.XX.XX.XX)' can't be established.
ECDSA key fingerprint is SHA256:+VJBPjaIWBUSNHhYJ/XCpRnz7qkwvovXYRchnUmDrsM.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'ec2-XXX-XXX-XXX-XXX.ap-northeast-1.compute.amazonaws.com,XX.XX.XXX.XXX' (ECDSA) to the list of known hosts.

       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2018.03-release-notes/
3 package(s) needed for security, out of 8 available
Run "sudo yum update" to apply all updates.

EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR

上記のように「EMR」が表示されれば接続完了です。

コマンド実行

HDFS

hadoopコマンドにはパスが通っているので、HDFSへのアクセスは下記のコマンドでできます。

$ hadoop fs -ls

今の所ホームディレクトリには何もファイルがないので出力はありませんが、他のディレクトリにもアクセスできることが確認できます。

なおEMRで立ち上げたクラスタのHDFSは、クラスタを一度削除したら消えてしまいますので、プログラム内で一時的な中間ファイル置き場にする以外はユーザーが明示的にHDFSを使用することはない(使用しない方が良い)です。 EMRFSというS3をHDFSとして使用する仕組みがあるので、データの入出力にはS3を使用します。

Exampleプログラム

mapreduceのexampleを動かしてみます。

下記は準モンテカルロ法で円周率を求めるサンプルになります。 2つのMapプロセスでそれぞれ1000サンプルのデータを出力して、Reduceにて円周率の計算を行なっています。

[hadoop@ip-172-31-12-4 ~]$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar pi 2 1000
Number of Maps  = 2
Samples per Map = 1000
Wrote input for Map #0
Wrote input for Map #1
Starting Job
18/11/08 02:15:19 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-12-4.ap-northeast-1.compute.internal/172.31.12.4:8032
18/11/08 02:15:20 INFO input.FileInputFormat: Total input files to process : 2
18/11/08 02:15:20 INFO mapreduce.JobSubmitter: number of splits:2
18/11/08 02:15:20 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1541642551220_0001
18/11/08 02:15:21 INFO impl.YarnClientImpl: Submitted application application_1541642551220_0001
18/11/08 02:15:21 INFO mapreduce.Job: The url to track the job: http://ip-172-31-12-4.ap-northeast-1.compute.internal:20888/proxy/application_1541642551220_0001/
18/11/08 02:15:21 INFO mapreduce.Job: Running job: job_1541642551220_0001
18/11/08 02:15:29 INFO mapreduce.Job: Job job_1541642551220_0001 running in uber mode : false
18/11/08 02:15:29 INFO mapreduce.Job:  map 0% reduce 0%
18/11/08 02:15:37 INFO mapreduce.Job:  map 100% reduce 0%
18/11/08 02:15:43 INFO mapreduce.Job:  map 100% reduce 100%
18/11/08 02:15:44 INFO mapreduce.Job: Job job_1541642551220_0001 completed successfully
18/11/08 02:15:44 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=50
                FILE: Number of bytes written=508164
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=606
                HDFS: Number of bytes written=215
                HDFS: Number of read operations=11
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=3
        Job Counters
                Launched map tasks=2
                Launched reduce tasks=1
                Data-local map tasks=2
                Total time spent by all maps in occupied slots (ms)=528210
                Total time spent by all reduces in occupied slots (ms)=298710
                Total time spent by all map tasks (ms)=11738
                Total time spent by all reduce tasks (ms)=3319
                Total vcore-milliseconds taken by all map tasks=11738
                Total vcore-milliseconds taken by all reduce tasks=3319
                Total megabyte-milliseconds taken by all map tasks=16902720
                Total megabyte-milliseconds taken by all reduce tasks=9558720
        Map-Reduce Framework
                Map input records=2
                Map output records=4
                Map output bytes=36
                Map output materialized bytes=68
                Input split bytes=370
                Combine input records=0
                Combine output records=0
                Reduce input groups=2
                Reduce shuffle bytes=68
                Reduce input records=4
                Reduce output records=0
                Spilled Records=8
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=234
                CPU time spent (ms)=2350
                Physical memory (bytes) snapshot=1088983040
                Virtual memory (bytes) snapshot=10859073536
                Total committed heap usage (bytes)=1059586048
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=236
        File Output Format Counters
                Bytes Written=97
Job Finished in 25.0 seconds
Estimated value of Pi is 3.14400000000000000000

無事動きました。 2000サンプルで算出された円周率は3.144でした。

なお、先ほども書きましたが、最小のインスタンスタイプc1.mediumで実行してみたところ、メモリが足りずにジョブが失敗してしまいました。

[hadoop@ip-172-31-20-254 ~]$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar pi 2 1000
Number of Maps  = 2
Samples per Map = 1000
Wrote input for Map #0
Wrote input for Map #1
Starting Job
18/11/08 01:53:33 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-20-254.ap-northeast-1.compute.internal/172.31.20.254:8032
18/11/08 01:53:34 INFO input.FileInputFormat: Total input files to process : 2
18/11/08 01:53:35 INFO mapreduce.JobSubmitter: number of splits:2
18/11/08 01:53:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1541641244893_0002
18/11/08 01:53:36 INFO impl.YarnClientImpl: Submitted application application_1541641244893_0002
18/11/08 01:53:37 INFO mapreduce.Job: The url to track the job: http://ip-172-31-20-254.ap-northeast-1.compute.internal:20888/proxy/application_1541641244893_0002/
18/11/08 01:53:37 INFO mapreduce.Job: Running job: job_1541641244893_0002
18/11/08 01:53:46 INFO mapreduce.Job: Job job_1541641244893_0002 running in uber mode : false
18/11/08 01:53:46 INFO mapreduce.Job:  map 0% reduce 0%
18/11/08 01:53:46 INFO mapreduce.Job: Job job_1541641244893_0002 failed with state FAILED due to: Application application_1541641244893_0002 failed 2 times due to AM Container for appattempt_1541641244893_0002_000002 exited with  exitCode: -103
Failing this attempt.Diagnostics: Container [pid=9174,containerID=container_1541641244893_0002_02_000001] is running beyond virtual memory limits. Current usage: 108.7 MB of 512 MB physical memory used; 2.8 GB of 2.5 GB virtual memory used. Killing container.
Dump of the process-tree for container_1541641244893_0002_02_000001 :
        |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
        |- 9182 9174 9174 9174 (java) 504 26 2860437504 27118 /usr/lib/jvm/java-openjdk/bin/java -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1541641244893_0002/container_1541641244893_0002_02_000001/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1541641244893_0002/container_1541641244893_0002_02_000001 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog -Xmx1024m org.apache.hadoop.mapreduce.v2.app.MRAppMaster
        |- 9174 9172 9174 9174 (bash) 0 0 115863552 701 /bin/bash -c /usr/lib/jvm/java-openjdk/bin/java -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1541641244893_0002/container_1541641244893_0002_02_000001/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1541641244893_0002/container_1541641244893_0002_02_000001 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog  -Xmx1024m org.apache.hadoop.mapreduce.v2.app.MRAppMaster 1>/var/log/hadoop-yarn/containers/application_1541641244893_0002/container_1541641244893_0002_02_000001/stdout 2>/var/log/hadoop-yarn/containers/application_1541641244893_0002/container_1541641244893_0002_02_000001/stderr

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
For more detailed output, check the application tracking page: http://ip-172-31-20-254.ap-northeast-1.compute.internal:8088/cluster/app/application_1541641244893_0002 Then click on links to logs of each attempt.
. Failing the application.
18/11/08 01:53:46 INFO mapreduce.Job: Counters: 0
Job job_1541641244893_0002 failed!
[hadoop@ip-172-31-20-254 ~]$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar pi 2 10
Number of Maps  = 2
Samples per Map = 10
Wrote input for Map #0
Wrote input for Map #1
Starting Job
18/11/08 01:54:14 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-20-254.ap-northeast-1.compute.internal/172.31.20.254:8032
18/11/08 01:54:15 INFO input.FileInputFormat: Total input files to process : 2
18/11/08 01:54:15 INFO mapreduce.JobSubmitter: number of splits:2
18/11/08 01:54:16 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1541641244893_0003
18/11/08 01:54:16 INFO impl.YarnClientImpl: Submitted application application_1541641244893_0003
18/11/08 01:54:16 INFO mapreduce.Job: The url to track the job: http://ip-172-31-20-254.ap-northeast-1.compute.internal:20888/proxy/application_1541641244893_0003/
18/11/08 01:54:16 INFO mapreduce.Job: Running job: job_1541641244893_0003
18/11/08 01:54:27 INFO mapreduce.Job: Job job_1541641244893_0003 running in uber mode : false
18/11/08 01:54:27 INFO mapreduce.Job:  map 0% reduce 0%
18/11/08 01:54:28 INFO mapreduce.Job: Job job_1541641244893_0003 failed with state FAILED due to: Application application_1541641244893_0003 failed 2 times due to AM Container for appattempt_1541641244893_0003_000002 exited with  exitCode: -103
Failing this attempt.Diagnostics: Container [pid=9178,containerID=container_1541641244893_0003_02_000001] is running beyond virtual memory limits. Current usage: 111.6 MB of 512 MB physical memory used; 2.8 GB of 2.5 GB virtual memory used. Killing container.
Dump of the process-tree for container_1541641244893_0003_02_000001 :
        |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
        |- 9178 9176 9178 9178 (bash) 0 0 115863552 720 /bin/bash -c /usr/lib/jvm/java-openjdk/bin/java -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1541641244893_0003/container_1541641244893_0003_02_000001/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1541641244893_0003/container_1541641244893_0003_02_000001 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog  -Xmx1024m org.apache.hadoop.mapreduce.v2.app.MRAppMaster 1>/var/log/hadoop-yarn/containers/application_1541641244893_0003/container_1541641244893_0003_02_000001/stdout 2>/var/log/hadoop-yarn/containers/application_1541641244893_0003/container_1541641244893_0003_02_000001/stderr
        |- 9186 9178 9178 9178 (java) 543 26 2856353792 27846 /usr/lib/jvm/java-openjdk/bin/java -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1541641244893_0003/container_1541641244893_0003_02_000001/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1541641244893_0003/container_1541641244893_0003_02_000001 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog -Xmx1024m org.apache.hadoop.mapreduce.v2.app.MRAppMaster

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
For more detailed output, check the application tracking page: http://ip-172-31-20-254.ap-northeast-1.compute.internal:8088/cluster/app/application_1541641244893_0003 Then click on links to logs of each attempt.
. Failing the application.
18/11/08 01:54:28 INFO mapreduce.Job: Counters: 0
Job job_1541641244893_0003 failed!

Hadoopは小さいジョブでも、結構潤沢な環境でないと動かないという印象を持っていましたが、そこはEMRでもやはり変わらないようです。 チューニングする方法はあるかと思いますが、まずはある程度大きめのインスタンスタイプを設定した方が良さそうです。

アプリケーションの履歴

「アプリケーションの履歴」タブから、実行したアプリケーションの確認ができます。 しかし当該画面にも書いてある通り、Spark以外のアプリケーションの詳細は、ここでは見ることは出来ないようです。

クラスタログの確認

クラスタのログをS3に保存する設定をしていましたので、確認して見ます。

指定したバケットに、クラスタIDのディレクトリができていることが確認できます。

クラスタの削除

今回は「クラスター」モードでの起動でしたので、当然今回使ったクラスタはずっと残り続けています。 EMRはEC2の料金に加えて、EMRの料金もかかってしまいますので、不要になったクラスタは早めに削除したいところです。

下記のように、クラスタの一覧から停止したいクラスタを選択して「削除」ボタンを押すだけです。

まとめ

EMRについてはほとんど何も知らない状態からHadoopのクラスタ起動とサンプルアプリの実行までをやってみました。

オンプレ環境でマスターとスレーブの複数台のマシンにOSをインストールして、いろいろと設定してクラスタを構築するのを考えると、驚くほど簡単にクラスタが出来てしまいました。 またステップ実行モードという存在には驚きました。 Hadoopクラスタまで使い捨てとは、なんだか隔世の感があります。 しかしHDFSなどもいろいろと煩雑なことが多いので、クラスタ自体を使い捨てにするというのは理に適っているなぁ、とも感じました。

ファイルの入出力をS3に対応させるなどの処置は必要になる(scpなどで自力で取ってくることも可能ではあると思いますが・・・)ものの、思ったよりもオンプレと同じ感覚で使うことができると感じました。 試すだけならば特に大きく迷うところもなかったので、もっと早く試してみればよかったな、というのが正直なところです。

以上、誰かの参考になれば幸いです。