[Spark][Scala] Spark2.0でEncoderを用いてDatasetを生成してみる

2016.08.22

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

分散環境での大規模データ処理エンジンであるSparkの最新バージョン 2.0 が先月リリースされました。主にSQL周りのAPIとDataFrame/Dataset周りのAPIに改良がなされており、具体的には以下のような変更がありました。

  • DataFrameとDatasetの統合
  • 従来のSQLContext,HiveContextに代わる新しいエントリポイントであるSparkSessionの導入
  • 共有変数であるアキュムレータAPIのリプレース(従来のAPIはDeprecated)
  • DataFrameで動く機械学習API(org.apache.spark.ml.~)の拡充
  • 機械学習APIで構築したモデルの永続化と他言語SDKへの共有
  • R向け分散アルゴリズム

Datasetは1.6からすでに導入されているAPIですが、今回2.0の変更にともなって改めて外部のデータからDatasetに格納するまでの手順を再現してみました。本記事はそのメモになります。

Dataset

DatasetはSpark 1.6から導入されたAPIで、以下の様な特徴を備えています

  • 静的型付け(オブジェクトに対応する型引数Tをもつ)
  • イミュータブル

この性質はSparkのインターフェイスとして用いられる言語であるScalaやJavaと相性がよく、Spark内部におけるメモリ最適化等を行うTungstenの恩恵もコンパイル時に受けられるようになっています。もちろん既存のAPIであるRDDやDataFrameとの相互変換も簡単に行えるようになっており、既存のコードからの移行もスムーズに行えそうです。

Encoder

EncoderはSparkSQLの実行の最適化を図るモジュールCatalyst optimizerでのシリアライズとデシリアライズの責務を担います。JVMのオブジェクトTについて、そのEncoder(Scalaの記法で書くとEncoder[T])はSparkSQLの内部バイナリフォーマットへTを変換する役割を果たします。

SparkSessionを用いたDataSetの生成サンプル

新しいSparkのエントリポイントSparkSessionを用いてEncoderの動作を確認してみます。

環境情報

  • java 1.8.0_92
  • sbt 0.13.8
  • scala 2.11.8
  • spark 2.0.0

実行にあたってはサンプルコードを以下のレポジトリにあげてあります。

UsrNameu1/Spark2Study - Github

クローン後、プロジェクトルートで下記のコマンドを実行して

sbt run

下記のような出力を得ます

class[id[0]: bigint, name[0]: string]Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+---+-------+
| id|   name|
+---+-------+
| 12|   John|
| 24|Gozzila|
+---+-------+

サンプルの内容

まずプロジェクトのディレクトリでsparkが動作するよう、以下の様なbuild.sbtを配置します。

PROJECTPATH/build.sbt

name := "Spark2Study"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.0",
  "org.apache.spark" %% "spark-mllib" % "2.0.0",
  "org.apache.spark" %% "spark-sql" % "2.0.0"
)

サンプルではJson文字列の記された以下のファイルからDatasetを生成します。

PROJECTPATH/persons.json

{"id": 12, "name": "John"}
{"id": 24, "name": "Gozzila"}

次に下記のような実行サンプルコードを配置します。

PROJECTPATH/src/main/scala-2.11/EncoderSample.scala

import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.log4j.Logger
import org.apache.log4j.Level


object EncoderSample extends App {

  case class Person(id: Long, name: String)

  Logger.getLogger("org").setLevel(Level.OFF) 
  Logger.getLogger("akka").setLevel(Level.OFF)

  val encoder = Encoders.product[Person]
  print(encoder)

  val spark = SparkSession.builder().master("local").appName("sample").getOrCreate()
  import spark.implicits._

  val sqlContext = spark.sqlContext
  val persons = sqlContext.read.json("./persons.json").as[Person]
  print(persons.show)
}

l6

Appを継承してEncoderSampleをサンプル実行のエントリポイントとします。

l8

Datasetの型引数Tに対応するPersonオブジェクトをcase classで宣言します。case classがProductトレイトを継承しているため、のちの暗黙的変換がスムーズに行えます。このPersonオブジェクトの各フィールドの名前はpersons.jsonに定義されたjsonフィールドに対応していることに注目してください。Sparkが自動生成するEncoderは実際のオブジェクトのフィールド名にもとづいてjsonをパースするため、対応していないとエラーになります。

l10,11

Log4jのログを切ります

l13,14

実際のEncoderの生成手順を確認するためにPerson向けのEncoderを生成し、下記のような出力をおこないます。この部分のコード自体は以降のスコープのコードに影響を与えません。

class[id[0]: bigint, name[0]: string]

l16

Spark2.0のエントリポイントSparkSessionを生成します。これはローカル向けの設定で、実際のEMR等の分散環境では違った設定が必要になります。

l17

SparkSessionに付随したオブジェクトimplicits以下をインポートすることで後コードがEncoder[T]への暗黙的な変換を様々なオブジェクトに対して行えるようにします。implicitsそのものは抽象クラスSQLImplicitsを継承した単純なオブジェクトで、implicitsそのものに宣言はほとんどされていません。ですが、SQLImplicitsには下記のように多くの暗黙的変換を司る関数が定義されています。

/**
 * A collection of implicit methods for converting common Scala objects into [[Dataset]]s.
 *
 * @since 1.6.0
 */
abstract class SQLImplicits {

  protected def _sqlContext: SQLContext

  /**
   * Converts $"col name" into a [[Column]].
   *
   * @since 2.0.0
   */
  implicit class StringToColumn(val sc: StringContext) {
    def $(args: Any*): ColumnName = {
      new ColumnName(sc.s(args: _*))
    }
  }

  /** @since 1.6.0 */
  implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]

  // Primitives

  /** @since 1.6.0 */
...

このうち実際に今回重要な役割を担っているのが newProductEncoder 関数で、これはProductを継承した型TにたいしてEncoder[T]への暗黙的変換を実施します。先ほども言及したようにcase classはProductトレイトを継承しているために、任意のcase classに対してこのような暗黙的変換が成立し、後のコードを実行するスコープの中にEncoder[Person]が常に存在する前提を持ってコードを書き進められるというわけです。

l19-21

person.jsonファイルからJsonを読み込み、それをDataset[Person]として扱いpersons定数に格納します。read.json("./persons.json")ではJsonからデータを読み込んで実際のJsonフィールドにもとづいてDataFrameを生成しますが、DataFrameはSpark2.0からDataset[Row]のタイプエイリアスでしかなく、Datasetに定義されたメソッドを用いれます。Datasetのメソッドas[Person]で行われている内容を深追いすると、以下の様なDataSetを生成するapply関数にたどり着きます。

def apply[T: Encoder](sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[T] = {
  new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
}

ここでスコープの中にEncoder[Person]があるという前提があるため、Datasetの生成が無事行われるというわけです。

print(persons) で下記のような出力が行われます。

+---+-------+
| id|   name|
+---+-------+
| 12|   John|
| 24|Gozzila|
+---+-------+

まとめ

本記事ではSpark2.0でJson文字列からDataset[T]を生成するまでの手順を取り上げました。去年の夏くらいに1.5がリリースされていたと思ったらあっという間に2.0まできてしまいました。Scalaのような静的な型言語をインターフェイスとして採用している恩恵が得られるようなアップデートが今後も続くことをねがっています。

参考