Hadoop MapReduce入門 | Hadoop Advent Calendar 2016 #01
こんにちは、小澤です。この記事はHadoop Advent Calendar 1日目のものとなります。
今回から始まります、Hadoop Advent Calendar 2106 初日ということでまずはHadoopの始まりとも言えるMapReduceについて解説したいと思います。
MapReudce自体を直接書くという場面は現在ではほとんどないかもしれませんが、Hadoop上で動く他のエコシステムにも多くの影響を与えているものです。 Hadoopは成熟してきてはいますが、まだまだ内部を動きを意識しておかないとうまくいかない場面も多いので知っておくと役立つこともあるかと思います。
MapReduceと分散処理
MapReduceとは、分散処理のフレームワークの一つです。 もともとはGoogleが発表した「MapReduce: Simplified Data Processing on Large Clusters」という論文に書かれてる内容に基づいた実装となっています。
具体的にどういうものかというと、利用者はMapと呼ばれる処理とReduceと呼ばれる処理のみ実装すれば分散処理環境でそれを簡単に実行できるというものです。
MapReduceが具体的にどのような処理を行うかをWord Countと呼ばれる処理のフローとともに追っていきたいと思います。 Word Countは文章中の各単語の出現回数を求めるというHadoop界隈ではHello World!のような存在ですが、実際に利用していく中でもこのWord Countに似た処理を実装する場面は多いことから、実用的な例とも言えます。 今回は、入力は複数の英語の短文が改行区切りで入っているテキストファイルを入力とします。
Map
HadoopにおけるMapとは入力を受け取り、それを変換しkey-valueの形式で出力を行う処理を実装するためのものになります。
Word Countでは1つ1つの短文が入力となります。 入力された短文は英語なのでスペースで分割することで個々の単語に分解できます。これに対して(単語, 1)というkey-vakueのペアを生成することで、1つの文章中に"単語"が"1"回出てきたという情報を生成することができます。 文章中に同じ単語が2回出てきた場合は、全く同じ単語で(単語, 1)という組み合わせを2つ、3回の場合はこの組み合わせを3つ生成します。 (単語, 2)や(単語, 3)としてしまった方がいいように見えるかもしれません。1つの短文に対しては通常のプログラムが記載できるので、そうすることも可能ですがここではあえてそうしていません。
個々の短文は他の短文への処理に影響していないためMap処理は並列して複数同時実行が可能となっています。 このため、データの規模が大きくなっても複数のコンピュータで分散させて実行することが可能となっています。
Reduce
ReduceではMapで出力したkey-valueのペアのうち同じkeyを持つものを集約して処理を行います。 この処理では別々なコンピュータで処理されたMapでもkeyが同じであれば同一のReduceに集約された状態で実行されます。
Word Countの例ではMapで(単語, 1)というペアが生成されているため、 (単語1, (1, 1, ..., 1)), (単語2, (1, 1, ..., 1))のようにMapで同一のkeyとして出力されたkeyごとにvalueの値が配列のような形式で与えられます。 この全ての"1"を足し合わせることで、全体で個々単語の出現回数をカウントできるという仕組みになっています。
Reduceも別なkeyを持つものは個々の処理には影響しないので分散させて並列で処理可能な仕組みになっているわけです。
その他の仕組み
MapReduceのフレームワークに乗せることで、MapやReduceを何台のコンピュータで分散処理させるかや、Mapの出力をReduceに渡すための間の処理はHadoopがやってくれるため利用者が意識しなくてよくなっています。 しかし、それらを制御するための仕組みや設定も存在していますが今回は入門ということでそれらは割愛さていただきます。
Word Countの実装
最後にWord Countを実装して動かしてみます。
Mapperクラス
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private Text outputKey = new Text(); private IntWritable outputValue = new IntWritable(1); @Override public void map(LongWritable inputKey, Text inputValue, Context context) throws IOException, InterruptedException { // 入力から文章を取り出してスペース区切りにする for (String word : inputValue.toString().split(" ")) { outputKey.set(word.trim().toLowerCase()); // 個々の単語ごとの(単語, 1)を出力 context.write(outputKey, outputValue); } } }
Reducerクラス
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable outputValue = new IntWritable(); @Override public void reduce(Text inputKey, Iterable<IntWritable> inputValue, Context context) throws IOException, InterruptedException { int result = 0; for (IntWritable i : inputValue) { // 同一keyのすべてのvalueを足し合わせる result += i.get(); } outputValue.set(result); context.write(inputKey, outputValue); } }
Mainクラス
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class WordCountMain extends Configured implements Tool { @Override public int run(String[] args) throws Exception { // hadoopの設定に関する処理 Configuration conf = this.getConf(); Job job = Job.getInstance(conf, "Word Count"); // Map, Reduceに使うクラスの指定 job.setJarByClass(WordCountMain.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 出力の型に関する情報 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 入出力先の指定 FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // jobの実行 return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { System.exit(ToolRunner.run(new WordCountMain(), args)); } }
その他
Hadoop Clientへの依存を記述します。Mavenの場合
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.3</version> </dependency>
のような依存を追加します。他のビルドツールを利用してる場合は適宜置き換えてください。
処理を実行するにはhadoopコマンドを利用します。 動作を確認するだけであればクラスタの構築の必要ないので、例えばMacであれば
$ brew install hadoop
のようにhomebrewでインストールしてしまうのが簡単です。
準備が整いましたので実行してみます。
$ hadoop jar <jarファイルのパス> <mainメソッドのあるクラス名> <入力ファイル> <出力ディレクトリ>
入力に関してはディレクトリを指定した場合はそれ以下に含まれるすべてのファイルが対象となります。 出力はディレクトリが作成されその中に"part-r-00000"のような形式でreduceの数に応じた連番で出力されます。
おわりに
以上で今回の内容は終了となります。 MapReduceを直接触る機会は昔に比べてほとんどなくなってしまいましたが、この動作はHadoop上で動作する他のフレームワークやエコシステムを利用する際にも知っておくと役にたつかと思います。 動かすだけであれば分散処理環境を構築せずとも可能なので今後Hadoopに触る予定のある方や今一度基礎の確認をしておきたい方は一度実装し、動かしてみることをお勧めします。
明日は「MapReduce応用編」として、MapReduceに関連する処理の動作を制御する方法について書かせていただく予定です。
ぜひ、お楽しみに!