Spark Dataset APIについて | Hadoop Advent Calendar 2016 #17

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、小澤です。 この記事はHadoop Advent Calendar 17日目のものとなります。

前回はSpark SQLとDataFrame APIについて紹介させていただきました。
今回はSparkのDataset APIというものについて書かせていただきます。

Dataset APIについて

Datasetは現在の最新版である2.0の一つ前の1.6から導入された機能であるため、Sparkの他のコンポーネントと比較してご存じない方も多いのではないでしょうか?

Dataset APIはRDDを使用した際の柔軟性とDataFrameを利用した際の利便性を兼ね備えた仕組みとして作られました。 Dataset APIの要件として挙げられているものをざっくりと説明すると、

  • RDDと同等かそれ以上に高速に動作する
  • 型の不整合をコンパイル時に気づけるタイプセーフなものである
  • 様々なオブジェクトをエンコードできる
  • JavaとScalaで共通のAPIを利用可能
  • DataFrameとの間の変換はシームレスに行える
  • といったものになります。より詳細な情報はJiraのチケットをご確認ください(英語です)。

    Dataset APIの利用

    Dataset APIは現在のところ、ScalaとJavaのみAPIを提供しています。 PythonとRに関しては言語の性質上Dataset APIによって得られるメリットのほとんどがDataFrameですでに達成しているという見解のようです(参考: 公式ドキュメント)
    今回はScalaから利用したいと思います。

    SparkSessionの作成

    DataFrame API同様、Dataset APIも2.0以降であればSparkSession、1.6であればSQLContextをまずは作成します。 一番最後のimportを忘れないように注意しましょう。

    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession
      .builder()
      .appName("Dataset API")
      .getOrCreate()
    
    import spark.implicits._

    Datasetの作成

    ここから先はspark-shellでの実行例となります。

    ScalaのSeqから

    まずは一番簡単な例としてScalaのSeqからDatasetを作成してみます

    scala> val ds = Seq(1, 2, 3, 4, 5).toDS()
    scala> ds
    res0: org.apache.spark.sql.Dataset[Int] = [value: int]
    scala> ds.show()
    +-----+
    |value|
    +-----+
    |    1|
    |    2|
    |    3|
    |    4|
    |    5|
    +-----+

    Int型のDatasetになっていることと、DataFrameのようにshowメソッドでデータが確認できることがわかるかと思います。

    RDDから

    RDDからも同様にtoDS()が使えまが、こちらはSparkSessionを利用しても作成できます。

    scala> val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
    
    scala> rdd.toDS()
    res3: org.apache.spark.sql.Dataset[Int] = [value: int]
    
    scala> spark.createDataset(rdd)
    res4: org.apache.spark.sql.Dataset[Int] = [value: int]

    どちらも特に問題ないかと思います。

    DataFrameから

    DataFrameからDatasetを作るには型の情報が必要なのでまずクラスを作成ます。 作成したクラスを型として指定してDatasetを作成ます。
    今回はcase classを利用してRDDからDataFrame作成し、Dataset作成時の型指定には利用したcase classを指定します。

    scala> case class Iris(sepal_length: Double, sepal_width: Double, petal_length: Double, petal_width: Double, species: String)
    defined class Iris
    
    scala> val df = sc.textFile("iris").map {line =>
         | val item = line.split(",")
         | Iris(item(0).toDouble, item(1).toDouble, item(2).toDouble, item(3).toDouble, item(4))
         | }.toDF()
    df: org.apache.spark.sql.DataFrame = [sepal_length: double, sepal_width: double ... 3 more fields]
    
    scala> df.show(3)
    +------------+-----------+------------+-----------+-----------+
    |sepal_length|sepal_width|petal_length|petal_width|    species|
    +------------+-----------+------------+-----------+-----------+
    |         5.1|        3.5|         1.4|        0.2|Iris-setosa|
    |         4.9|        3.0|         1.4|        0.2|Iris-setosa|
    |         4.7|        3.2|         1.3|        0.2|Iris-setosa|
    +------------+-----------+------------+-----------+-----------+
    only showing top 3 rows
    
    scala> val ds = df.as[Iris]
    ds: org.apache.spark.sql.Dataset[Iris] = [sepal_length: double, sepal_width: double ... 3 more fields]
    
    scala> ds.show(3)
    +------------+-----------+------------+-----------+-----------+
    |sepal_length|sepal_width|petal_length|petal_width|    species|
    +------------+-----------+------------+-----------+-----------+
    |         5.1|        3.5|         1.4|        0.2|Iris-setosa|
    |         4.9|        3.0|         1.4|        0.2|Iris-setosa|
    |         4.7|        3.2|         1.3|        0.2|Iris-setosa|
    +------------+-----------+------------+-----------+-----------+
    only showing top 3 rows

    Datasetの利用

    Datasetを使う際のAPIは基本的にDataFrameと一緒です。以下はfilterの例です。

    scala> ds.filter(_.species == "Iris-setosa").show(3)
    +------------+-----------+------------+-----------+-----------+
    |sepal_length|sepal_width|petal_length|petal_width|    species|
    +------------+-----------+------------+-----------+-----------+
    |         5.1|        3.5|         1.4|        0.2|Iris-setosa|
    |         4.9|        3.0|         1.4|        0.2|Iris-setosa|
    |         4.7|        3.2|         1.3|        0.2|Iris-setosa|
    +------------+-----------+------------+-----------+-----------+
    only showing top 3 rows

    DataFrameのとの違いとして、selectなどをする際の型の指定があります。 これをつけないとDataFrameに変換されるようです。

    scala> ds.select('species)
    res22: org.apache.spark.sql.DataFrame = [species: string]
    
    scala> ds.select('species.as[String])
    res23: org.apache.spark.sql.Dataset[String] = [species: string]

    DataFrameやRDDへの変換

    これも簡単にできます。シームレスに変換できるようがわかりやすいかと思います。

    scala> ds.rdd
    res25: org.apache.spark.rdd.RDD[Iris] = MapPartitionsRDD[22] at rdd at <console>:35
    
    scala> ds.toDF()
    res26: org.apache.spark.sql.DataFrame = [sepal_length: double, sepal_width: double ... 3 more fields]

    注意点としては、先ほどのDatasetの利用であげた通りかなり柔軟に対応してくれるのでDatasetを扱っていたつもりがいつの間にかDataFrameになっていたということがあるかと思います。
    特に、タイプセーフを期待して使っている場合は、いつの間にかDataFrameになっており途中で間違った型を指定していたのにコンパイル時に気付けなかったということもあり得るかと思います。

    終わりに

    今回はDataset APIについて解説しました。 今すぐに必要になるという場面はまだ少ないかと思いますが、このような新しい機能も存在していることを知っておくといずれ役立つかもしれません。

    明日はSparkで機械学習をするためのMLlibについて書かせていただく予定です。
    ぜひ、お楽しみに!