MapReduce応用編 | Hadoop Advent Calendar 2016 #02

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、小澤です。 この記事はHadoop Advent Calendar 2日目のものとなります。

前回はMapReduce入門として、Hadoop界でのHello World!であるWord CountをMapReduceで実装する方法を紹介しました。
今回はMapReduceでのMapとReduce以外の部分について書かせていただきたいと思います。

はじめに

HadoopのMapReduceといえばMapとReduceのみを記述すればそれ以外の部分はフレームワーク側で吸収してくれるものになっています。 しかし、実際の処理内容によってはそれ以外の部分の挙動も把握したり必要に応じた工夫が必要になったりする場合があります。 そこで今回はそれらの挙動について解説します

Combiner

CombinerとはMapperごとのReducerのようなものです。 どういうことかと言いますと、例えばWord Countの場合各Mapperは同じ単語に対して複数の(単語, 1)という出力を行う場合があります。

これには1つの文章単位で同一の単語が複数回登場する場合の他に、同一のMapperでの処理対象となった複数の文章間に同一の単語が含まれる場合も含んでいます。

このCombinerの処理を挟むことによって、各MapperごとにReudcerと同等の処理を行うことになり、Mapperでの出力が(単語, 1)から(単語, 出現回数)となります。ただし、各Mapperで処理を行ったデータ内での集約なので、Reducerと違って複数のMapperで同じkeyとなる単語が分散しており、最後に全体でのReudce処理が必要になることに変わりはありません。

では、同じような処理をCombinerとReducerで2回行う必要があるのでしょうか? これはMapとReduceを繋ぐ部分でネットワークをまたいでのデータ転送が発生する部分が存在するため、Combinerの処理を挟むことによってネットワークでのデータ転送量を削減できるからとなっております。 集約の部分を2段階に分けても結果が変わらない場合は積極的に使っていく方がいいでしょう。

Combinerを利用するには、mainメソッドでの処理に

job.setCombinerClass(WordCountReducer.class);

のように指定します。 この時はReducerクラスを継承し、reduceメソッドを実装したクラスが指定できます(通常のReducerと全く同じものが使えます)。ただし、制約としてCombinerの出力はReducerへ渡されるため、入力のkey-valueと出力key-valueが同一の型になっている必要があります。

ShuffleとSort

ShuffleとSortはMapperからReducerへデータを受け渡すときに行われる処理になります。

Shuffle

ShuffleではどのデータをどのReducerへ渡すかの処理を記載します。 これはMapperの出力においてkeyのハッシュ値(のようなもの)を返すことで、どのMapperでも同一keyには同一のハッシュ値が返され、結果的に同じReducerへと渡されることになります。

Partitionerと呼ばれるクラスを継承したクラスを実装することでこの挙動も変更することができます。 0からReducer数-1の乱数を与えることで完全にバラバラに散らすことも可能ですが、そんなことをしてもあまり意味がありません。 例えばソートをするときにn分位ごとに分けることで分散させてのソートが可能になるなどが考えられます。

Partitionerの実装は、org.apache.hadoop.mapreduce.Partitioner<KEY, VALUE>を継承し、getPartition(KEY key, VALUE value, int numPartitions)メソッドを実装することで実現できます。 実装したPartitionerはmainメソッドで

job.setPartitionerClass(<実装したクラス>.class);

とすれば利用されます。

Sort

Reducerに渡ってきたデータは実はkeyごとにソートされています。 そのため、Reducerでの処理は上から順に実行されるというルールに則った処理も実は可能となっています。

ただし、どのReducerにどのkeyの値が来るかを想定する必要がある場合にはPartitionerと合わせて使う必要があります。 例えばPartitionerでkeyの先頭一文字だけを基準にハッシュ値を生成した場合、同一の文字から始まる値は全て同一のReducerにあり、かつ辞書順にソートされている、といった状態を作ることが可能です。

DistributedCache(分散キャッシュ)

分散キャッシュとは実行する全ノードに対して同じデータを配布する手段です。

例えばマスターデータと実績データのような、小さいデータと大きいデータを結合したい場合があります。 この場合、Mapperで両方のデータを読んで結合対象の値を同じkeyとして出力、Reducerで結合した結果を最終出力にするという方法が考えられます。 ここでマスターデータは小さいものであることを利用し、分散キャッシュとして各Mapperのメモリ上に保持しておくことでMapperのみでこの処理を実現することが可能となります。

MapReduceではReduceのみの処理はできませんが、Map処理のみのjobというのはありえますので、余計な処理を省くことができます。 後続の処理がある場合はもう1段階のMapReduceを実行せずに手の空いたReducer側で実行するといった方法も考えられるでしょう。

分散キャッシュの利用はmainメソッドに

job.addCacheFile(<ファイルのパスを記載したURIクラス>);

と記述します。やorg.apache.hadoop.fs.Path#toUriメソッドが利用できます。

分散キャッシュからデータを取り出すにはMapperで

URI[] uris = context.getCacheFiles();

とするとで分散キャッシュのリストを取得したり、パスを直接指定してnewするなどで可能となります。

全体像

MapReduerの処理フローに今回のないようのものを赤字記載した処理フロー全体像はこのようになります。

advanced-hadoop

ここで挙げた要素は普段意識しなくても、MapReduceをラップしたエコシステム内部ではMapReduceの処理のチューニングとして処理時間向上の使われていたりするものをあります。

終わりに

今回は、HadoopのMapReduceフレームワークにおいて、MapとReduceの間を受け渡しするコンポーネントの挙動を変更する方法などを紹介しました。

明日はMapReduceから離れて、Hadoopを構成する重要なコンポーネントの1つであるHDFSがどのようなものなのかを書かせていただく予定です。
ぜひ、お楽しみに!