Hadoop Streamingジョブの処理結果を圧縮する

2017.06.20

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

今回はHadoop Streamingジョブの処理結果を圧縮する方法について紹介します。以前紹介した以下のエントリーに対してEMRで利用可能な圧縮方式を試しています。

grepコマンドをHadoop Streaming上で実行する

結論

先に結論を書いておきます。EMRで利用可能な圧縮形式の一覧は以下でした。

  • DEFLATE(zlib)
  • gzip
  • bzip2
  • LZO
  • LZ4
  • Snappy

また、Hadoop Streamingジョブの処理結果を圧縮する際は以下の2つのオプションを利用します。

  • mapreduce.output.fileoutputformat.compress
  • mapreduce.output.fileoutputformat.compress.codec

mapreduce.output.fileoutputformat.compressオプションにtrueを指定すると圧縮されるようになります。mapreduce.output.fileoutputformat.compress.codecオプションは圧縮形式に対応したクラス名を指定します。例えばgzipの場合はorg.apache.hadoop.io.compress.GzipCodecを指定します。

実行環境

  • emr-5.6.0 でアプリケーションは Hadoop のみ
    • Hadoop 2.7.3
  • ハードウェア構成は m1.medium を 1 台(検証用なのでマスターノードのみ)
  • 東京リージョン

EMRクラスタの作成

まずはEMRクラスタを作成します。AWS CLIを利用する場合は以下のようなコマンドになります。SubnetId, log-uriを自身の環境に合わせて書き換えて下さい。

aws emr create-cluster --auto-scaling-role EMR_AutoScaling_DefaultRole --applications Name=Hadoop \
  --ec2-attributes '{"InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-XXXX"}' \
  --service-role EMR_DefaultRole --enable-debugging --release-label emr-5.6.0 \
  --log-uri 's3n://aws-logs-XXXX-ap-northeast-1/elasticmapreduce/' --name 'compress' \
  --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"m1.medium","Name":"Master Instance Group"}]' \
  --scale-down-behavior TERMINATE_AT_INSTANCE_HOUR --region ap-northeast-1

実際に実行すると作成したEMRクラスタのClusterIdが表示されます。ClusterIdはジョブを実行する際に利用するので控えておいて下さい。

$ aws emr create-cluster ...
{
    "ClusterId": "j-XXXXXX"
}

gzip形式

まずはgzip形式で出力してみましょう。AWS CLIの引数に指定するSONファイルを作成して下さい。ファイル名はcompress_gzip_step.jsonとし、出力先であるs3://mybucket/outputを自身の環境に合わせて書き換えて下さい。

compress_gzip_step.json

[
  {
     "Name": "compress_gzip",
     "Type": "STREAMING",
     "ActionOnFailure": "CONTINUE",
     "Args": [
         "-D stream.non.zero.exit.is.failure=false",
         "-D mapreduce.job.reduces=0",
         "-D stream.map.output=keyonlytext",
         "-D mapreduce.output.fileoutputformat.compress=true",
         "-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec",
         "-mapper",
         "grep hbase",
         "-input",
         "s3://ap-northeast-1.elasticmapreduce.samples/elb-access-logs/data",
         "-output",
         "s3://mybucket/output"]
  }
]

grepコマンドをHadoop Streaming上で実行する | Developers.IOと比べて以下の2つのオプションを指定しています。

  • -D mapreduce.output.fileoutputformat.compress=true
  • -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec

ではcompress_gzip_step.jsonを利用してジョブを登録したいと思います。j-XXXXXXXを起動したEMRクラスタのClusterIdに書き換えて以下のコマンドを実行して下さい。

aws emr add-steps --cluster-id j-XXXXXXX --steps file://./compress_gzip_step.json --region ap-northeast-1

m1.mediumでマスターノード1台のみ構成なので、10分ほどするとS3に処理結果が出力されると思います。入力ファイルごとに拡張子がgzとなりgzip圧縮されていることが分かります。

compress-output-of-hadoop-streaming-on-emr_gzip

その他の圧縮形式

その他の圧縮形式を指定した際の結果を貼っておきます。クラス名はorg.apache.hadoop.io.compressパッケージに含まれている *1Codecクラスを順番に試してみました。

DEFLATE(zlib)

-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DeflateCodecを指定します。

compress-output-of-hadoop-streaming-on-emr_deflate

bzip2

-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.BZip2Codecを指定します。

compress-output-of-hadoop-streaming-on-emr_bzip2

LZO

-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.LzoCodecを指定します。

compress-output-of-hadoop-streaming-on-emr_lzo

LZ4

-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.Lz4Codecを指定します。

compress-output-of-hadoop-streaming-on-emr_lz4

Snappy

-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodecを指定します。

compress-output-of-hadoop-streaming-on-emr_snappy

EMRクラスタの削除

作成したEMRクラスタは忘れずに削除しましょう。AWS CLIで削除する場合は以下のコマンドで削除して下さい。

aws emr terminate-clusters --cluster-id j-XXXXXXX --region ap-northeast-1

削除できているかの確認は以下のコマンドを利用して下さい。"TERMINATED"と出力されると削除済みという意味です。

aws emr describe-cluster --cluster-id j-XXXXXXX --query Cluster.Status.State --region ap-northeast-1

デフォルトの圧縮形式

mapreduce.output.fileoutputformat.compress.codecオプションを指定しない場合はデフォルトの圧縮形式としてorg.apache.hadoop.io.compress.DefaultCodecが利用されるようになっています。そして、DefaultCodecが適用された場合の圧縮形式はDEFLATE(zlib)となります。この辺りは歴史の話になるのですが、先にDefaultCodecが作成され、DEFLATEを明示的に指定できるようにDeflateCodecが作成された *2という経緯があったりします。そのため、DeflateCodec.javaの実装はDefaultCodecを継承しているだけ *3だったりしますw

最後に

EMRで利用可能な圧縮形式を知りたくて調査してみました。LZ4は知らなかったので勉強になりました。参考になれば幸いです。

参考

脚注