[Apache Spark]RDDについて簡単にまとめてみた

Apache_Spark

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

t.hondaです。前回の最後に書いたように、今回はRDDについて書いてみたいと思います。

RDD(Resilient Distributed Dataset)

RDDとは、以前にも書きましたが「不変(イミュータブル)で並列実行可能な(分割された)コレクション」です。Apache Sparkのプログラミングでは、このRDDにデータを保持して操作することがメインとなります。RDDの操作には用意されているメソッドを使うことで、Sparkは自動的に分散処理を行い、開発者は分散処理を意識することなくプログラミングできます。

RDDのメソッドの種類

RDDに保持したデータを操作するメソッドは大きく分けて2つに分類されます。「Transformations」と「Actions」です。「Transformations」はRDDを操作し、結果を新しいRDDとして返します。「Actions」はRDDのデータを操作し、結果をRDD以外の形式で返すか保存を行います。大雑把に言えば、RDDを返すのが「Transformations」、そうではないものが「Actions」であると言えると思います。

以下、「Transformations」と「Actions」についてのサンプルを見て行きたいと思います。

サンプルソース

サンプルでは「KEN_All_ROME.CSV」という住所の郵便番号データを読み込み、RDDのメソッドを試してみました。読み込んだデータはCSV形式で、以下のようになっています。

"0600000","北海道","札幌市 中央区","以下に掲載がない場合","HOKKAIDO","SAPPORO SHI CHUO KU","IKANIKEISAIGANAIBAAI"
"0640941","北海道","札幌市 中央区","旭ケ丘","HOKKAIDO","SAPPORO SHI CHUO KU","ASAHIGAOKA"
"0600041","北海道","札幌市 中央区","大通東","HOKKAIDO","SAPPORO SHI CHUO KU","ODORIHIGASHI"
(以降略)

以下、サンプルソースです。

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object Startup{
  def printRDD(filterName: String, rdd: org.apache.spark.rdd.RDD[_]) = {
    println(filterName)

    rdd.foreach {r => {
        println(r)
      }
    }
  }

  def main(args: Array[String]) :Unit = {
    val conf = new SparkConf().setAppName("RddSample").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val inputRDD = sc.textFile("KEN_All_ROME.CSV")

    //mapの例
    val addresses = inputRDD.map{line =>
      val splited = line.replace("\"", "").split(",")

      var result: Array[String] = null
      if (splited(6) == "IKANIKEISAIGANAIBAAI")
        result = Array(splited(0), splited(4), splited(5))
      else
        result = Array(splited(0), splited(4), splited(5), splited(6))

      result.mkString(" ")
    }

    printRDD("mappedRDD", addresses)
    
    //filterとunionの例
    val filtered1 = addresses.filter(line => line.contains("OSAKA")).filter(line => line.contains("AOBADAI"))
    val filtered2 = addresses.filter(line => line.contains("KANAGAWA")).filter(line => line.contains("WAKABADAI"))
    val unioned = filtered1.union(filtered2)

    printRDD("filtered RDD 1", filtered1)
    printRDD("filtered RDD 2", filtered2)
    printRDD("unioned RDD", unioned)

    //flatMapの例
    val flatmapped = unioned.flatMap(line => line.split(" "))
    printRDD("flatmapped", flatmapped)

    //reduceの例
    val reduced = flatmapped.reduce((x, y) => x + " " + y)
    println("reduced")
    println(reduced)

    //countの例
    val count = inputRDD.count
    println("count")
    println(count)

  }
}

Transformations

「Transformations」の代表的なメソッドは、mapやfilterです。mapやfilterを含め、いくつかのメソッド使ったサンプルを作ってみました。以下、各メソッドの説明です。

map

mapはRDD内のデータの一つ一つに対して記述した処理を行い、結果を返します。サンプルでは21〜31行目でmapを行っています。やっていることは

  1. 住所の郵便番号データを一行ずつ読み込む(21行目)
  2. ダブルクォートを消し、カンマで分割する(22行目)
  3. 郵便番号・住所のローマ字を取得して配列に格納する。この時7カラム目が「IKANIKEISAIGANAIBAAI」(以下に掲載がない場合)かどうかを判定し、左記の場合は配列に格納しないようにする。(24〜29行目)
  4. 取得した郵便番号・住所のローマ字をスペース区切りで連結する(30行目)


となります。実行結果は以下のようになります。

5031316 GIFU KEN YORO GUN YORO CHO OSHIKOSHI
0600000 HOKKAIDO SAPPORO SHI CHUO KU
0640941 HOKKAIDO SAPPORO SHI CHUO KU ASAHIGAOKA
(以降略)

mapの中で条件分岐できるなど、ビジネスロジックを記述し易いと言えると思います。

filter

filterは文字通りフィルタリングするメソッドです。上記のmapで取得したRDDに対して、以下のフィルタリングを行っています。

  1. 「OSAKA」(大阪)、「AOBADAI」(青葉台)を含む行を抽出(36行目)
  2. 「KANAGAWA」(神奈川)、「WAKABADAI」(若葉台)を含む行を抽出(37行目)


を行っております。実行結果は以下のようになります。

5860068 OSAKA FU KAWACHINAGANO SHI KITAAOBADAI
5860067 OSAKA FU KAWACHINAGANO SHI MINAMIAOBADAI
5941153 OSAKA FU IZUMI SHI AOBADAI
5900407 OSAKA FU SENNAN GUN KUMATORI CHO AOBADAI
(中略)
2410801 KANAGAWA KEN YOKOHAMA SHI ASAHI KU WAKABADAI
2520112 KANAGAWA KEN SAGAMIHARA SHI MIDORI KU WAKABADAI

union

unionはRDD同士を連結するものです。filterで取得したRDDをunionしています(38行目)。実行結果は以下のようになります。

5860068 OSAKA FU KAWACHINAGANO SHI KITAAOBADAI
5860067 OSAKA FU KAWACHINAGANO SHI MINAMIAOBADAI
5941153 OSAKA FU IZUMI SHI AOBADAI
5900407 OSAKA FU SENNAN GUN KUMATORI CHO AOBADAI
2410801 KANAGAWA KEN YOKOHAMA SHI ASAHI KU WAKABADAI
2520112 KANAGAWA KEN SAGAMIHARA SHI MIDORI KU WAKABADAI

flatMap

flatMapでは各カラムを分割し、一つのカラムにしています。サンプルではunionした結果をスペースで分割し、一つのカラムにしています(45行目)。上手く説明できないのですが(汗)、実行結果を見れば分かるかと思います。

5860068
OSAKA
FU
KAWACHINAGANO
SHI
(以降略)

Actions

「Actions」の代表的なメソッドは、reduceやcountです。以下、これらのメソッドの説明となります。

reduce

reduceはRDD内の2つの要素に対して操作を行い、結果を返します。サンプルではflatMapで一つのカラムにしたデータに対して、再びスペース区切りで連結しています(49行目)。実行結果は以下の通りです。

2410801 KANAGAWA KEN YOKOHAMA SHI ASAHI KU WAKABADAI 2520112 KANAGAWA KEN SAGAMIHARA SHI MIDORI KU WAKABADAI 5860068 OSAKA FU KAWACHINAGANO SHI KITAAOBADAI (以降略)

並び順がflatMapの結果と異なっていますが、スペース区切りで連結されています。

count

countは文字通りRDDのデータの件数を返すメソッドです。サンプルでは54行目で行っており、実行結果は以下のようになります。

123699

まとめ

RDDにあるいくつかのメソッドを見てきました。mapで行ったように通常の高級言語のプログラムを書く感覚で、条件分岐などを行うことができます。またこれらのデータの操作がSparkによって自動的に分散処理されることも、ビッグデータを扱う際などには重要なことかと思います。
RDDには上記で紹介したメソッド以外にも、様々なメソッドがあります。以下の公式サイトなどを参考にしてみてください。
Spark Programming Guide

参考文献

Learning Spark