Sparkの構成 | Hadoop Advent Calendar 2016 #14

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、小澤です。 この記事はHadoop Advent Calendar 14日目のものとなります。

前回はHadoop上で利用かのなカラムナストレージの紹介をさせていただきました
今回はApache Sparkとはどういったものなのかについて解説したいと思います。

Sparkとは

よく「Hadoop/Spark」などのようにHadoopと同じ並びで名前が上がることが多いSparkですが、果たしてSparkとは何者なのでしょうか?Hadoopとどのように違うのでしょうか? ということを今回は解説していければと思います。

Hadoopの技術スタックについてのおさらい

まずはHadoopの技術スタックがどのように成り立っていたかを改めておさらいしておきます。 下の図はSparkを除くシステムを例にした技術スタックの図です。

spark1

データを永続化するHDFSとHadoopクラスタのリソースを管理するYANRの層がまずあり、その上にMapReduceやTezといったYARN上で実際に動くフレームワークがあります。 一番上にはHiveやPigといったフレームワークをラップし書きやすくするためのエコシステムが存在しています。

このようにいくつかのレイヤに分かれて役割分担が成り立っているHadoopですが、Sparkはこの中のどのレイヤに入るのでしょうか?あるいはHadoopとは独立した別なものなのでしょうか?

Sparkのレイヤ

結論から言ってしまうと、Sparkは上記の図においてはMapReduceやTezと同じレイヤにあたります。 しかし、Sparkは必ずしもHadoop上で動かす必要はありません。現状ではMesosというHadoopとは全く別な分散処理システム上でも動きますし、standaloneモードもあります。

また、Sparkの特徴的な要素としてMLlib, Spark SQL, Spark Streaming, GraphXの存在が挙げられます。 これらは上記の図で言うところのHiveやPigに近いレイヤになっており、Sparkはこの2つのレイヤを包含したものとも言えるかもしれません。

Sparkを含めた技術スタックとしては下の図のようになるでしょう。

spark2

この図からもSparkは非常に多くの範囲をカバーするようなものであることが伺えるかと思います。

HadoopとSparkの違い

上記の図からもお分かりいただけるかと思いますが、HadoopとSparkは決して競合でも排他的にしか活用できないものでもありません。実際Hiveなどは実行エンジンとしてSparkを選択することもできます。Spark on TezやTensorFlowをバックエンドとしてSparkといった話もあります。

また、Hadoop自体も一言でHadoopと言ったときにどこまでの範囲を指すのかの判断が難しいこともあり、HadoopとSparkで比較という単位での比較は難しいでしょう。

Sparkの特徴

では、比較の話はこのくらいにしてSparkの特徴を見ていきましょう。

RDD

Sparkの特徴はRDD(Resilient Distributed Dataset)というデータ構造にあります。これは

  • イミュータブルなコレクション
  • Partitionという単位に分割されるため分散可能
  • 耐障害性を持つ
  • 遅延処理によるActionの実行
  • インメモリでデータを保持する

というような特徴を持っています。

Sparkではまず処理のフローを記述します。 これは、入力→処理1→処理2→...→出力 という流れになります。 入力されたデータに対しての変換や演算などの処理を順に行っていき、最終的に求める結果を得られたらそれを出力するというイメージです。

SparkにはTransformationとActionと呼ばれる2種類の処理がます。

Transformation

Transformationとは、上記での処理にあたる部分が該当します。 この処理はmapやfilterなどRDDを操作して、別なRDDを生成するような内容の処理が該当します。 RDDはイミュータブルなので、Transformationの関数を実行する際は必ず入力して利用したRDDとは別なRDDが生成されます。

Action

一番最後の出力にあたる部分がActionと呼ばれるものになります。ここでは便宜的に出力としていますがRDDをScalaのコレクションに変換するような処理はこのActionにあたります。 ActionはRDDを結果として返却するcollectやデータ数を返すcountなどがあります。 Sparkで処理した内容をSparkの外に持ち出して利用したり、画面やファイルに出力をしたりなどする時はこのActionに該当する処理を実行することになります。

spark3

SparkではActionに該当する関数が呼び出されるまで処理が実行されません。 Actionが実行されたタイミングでそこに至るまでのTrasformationからDAGを生成し、順次処理していきます。 この時読み込んだ入力データはPartitionの単位で別々なノードで実行が可能なため、分散処理が実現可能となっています。

また、RDDがイミュータブルであるのはこのDAGから実際の処理が行われる過程でノードの障害などでどこかが失敗した場合にDAGの過程を辿って行って、失敗したPartitionを生成しなおすのに必要な処理のみを再実行すればいいという発想で動きます。 そのため、乱数生成など再現性のない処理にも注意が必要です。

広い依存と狭い依存

Transformationの処理には広い依存が発生するものと狭い依存のみで解決するものがあります。

例えばmap関数のように1行のデータを別な1行のデータに変換するような処理はその行のみで完結します。 このように処理内容が単一のPartition内で完結するようなものは狭い依存と呼ばれます。

一方でReduceByKeyという処理があり、これはMapReduceでのreduceのように同一のkeyごとに値を集約するような処理になります。この場合同一のkeyを持つデータというが複数のPartitionにまたがって存在している可能性があります。 こういった処理の場合shuffleが必要なのでネットワークをまたいでのデータ転送が必要になります。 また、ノードの障害などでデータを再生成する際には多くのPartitionに依存することになり、再実行しなければならないものが増えてしまいます。

Sparkでの処理は広い依存が発生するタイミングでStage呼ばれるものに分割されます。 DAG全体での処理はこのStageの単位でスケジューリングされます。

spark4

データのキャッシュ

SparkにはMLlibという機械学習ライブラリが標準で含まれています。 機械学習の多くのアルゴリズムは値が収束するまで、学習データを利用したループ処理が何度も実行されます。

この時に以下の2点について考えてみます。

  • 学習データはSpark上で前処理もしているため、前処理を終えた後のものをどこかに保持しておきたい
  • 学習のループ回数が多いのでどこかのPartitionの再生成が必要なった時に非常に多くの時間がかかる

そんな時に有効なのがこのキャッシュという機能です。

persistという関数を実行するとこで特定のRDDをキャッシュに残しておくことができます。 そうすることにより、persistしたRDDがチェックポイントのような役割となり再処理がDAG上のその場所から可能となります。

キャッシュの仕方も複数種類があります。 メモリのみをキャッシュに使うかディスクのみを使うか、あるいは両方を使うかとキャッシュする際にシリアライズするかの2つです。 シリアライズしてキャッシュしておけばデータサイズは少なくなりますが、デシリアライズにCPUリソースを使うことになります。 Sparkを実行するクラスタのハードウェア構成や処理内容に応じたキャッシュ使用量などでこれらを判断することになります。

Sparkを構成するコンポーネント

Sparkを構成する主なコンポーネントはマスターであるDriverとスレーブであるWorkerから成り立ちます。

Driver

マスターとなるコンポーネントです。 DriverがDAGを生成し、SparkContextを通してクラスタ上にあるWorkerに実際の処理を依頼します。 また、Actionコマンドによって得られた結果を受け取るのものDriverとなります。

Worker

実際に処理をするスレーブノードで通常は複数台存在します。 Workerはキャッシュ領域と処理を実行するためのプロセスであるExecutorから構成されます。

Sparkを動かす上での大きなポイントとなるのがExecutorに関する設定です。 Executorの数やコア数、メモリ量などの設定をどのようにすれば良いかは下記のスライドなどが参考になるかと思います。

終わりに

今回はSparkがどのようなものなのかを解説しました。
Sparkというワード自体はHadoopと並べて登場したり、比較対象となったりすることも多いですが、Hadoopなどをベースとした分散処理環境でのレイヤや思想の違いなどがわかっていただければ幸いです。

明日はSparkを使ってWord Countを実装する方法を書かせていただく予定です。
ぜひ、お楽しみに!