Akka StreamsでMySQLからデータを取得してCSVに書き出す

こんにちは。山崎です。

MySQLのデータをCSVに書き出すバッチ処理をAkka Streamsで実装する方法を紹介します。

Akka Streamsとは

ノンブロッキングなバックプレッシャ付きの非同期ストリーム処理の標準を定めるReactive StreamsのAkkaを使った実装です。 Akka Streamsに関しましては、こちらの記事に詳しくまとまっていますのでご覧ください。

実際にコードを書いてみる

今回は「商品の情報をDBから取得しCSVファイルに書き出す」という例で実装していきます。

処理は3つの部品から構成されます

  • DBから商品の情報を取得するSource
  • 流れてきた商品情報をCSVの1行分に変換するFlow
  • ファイルに流し込むSink

これらの部品を実装してつなげることで処理を実装します。

実際にコードを見ていきましょう。

依存関係

今回使用するライブラリをbuild.sbtに記述します。

libraryDependencies ++= Seq(
  "org.scalikejdbc" %% "scalikejdbc" % "3.0.2",
  "org.scalikejdbc" %% "scalikejdbc-streams" % "3.0.2",
  "com.typesafe.akka" %% "akka-stream" % "2.5.4",
  "mysql" % "mysql-connector-java" % "5.1.44",
  "com.lightbend.akka" %% "akka-stream-alpakka-csv" % "0.8"
)

DBから商品の情報を取得するSource

ScalikeJDBC Streamsで実装します。ScalikeJDBC Streamsは、デフォルトでは1000個ずつDBからフェッチしてくれるようです。

case class Goods(id: Long, name: String, price: Int)

object Goods extends SQLSyntaxSupport[Goods] {

/**
GoodsクラスとGoodsテーブルを関連付けるScalike JDBCのコード(省略)
**/

def streamAll(): Publisher[Goods] = DB readOnlyStream {
  withSQL{ select.from(Goods as g) }
    .map(rs => Goods(g)(rs)).iterator
  }
}

ScalikeJDBC Streamsでは、DBからデータを取得するストリームを作成するDB.readOnlyStream()の戻り値として、Reactive StreamsのAPIであるPublisherが返ってきます。今回はAkka Streamsで処理を構築するため、Sourceに変換して使用します。

val source: Source[Goods, NotUsed] = Source.fromPublisher(Goods.streamAll)

流れてきた商品情報をCSVの1行分に変換するFlow

S3やSQS等様々な対象をAkka Streamsで扱う機能を提供しているAlpakkaのCSVモジュールを使用します。流れてきた要素をCSVの1行分のByteStringに変換する際、",といったCSV内でエスケープを行う必要がある文字についてエスケープを行ってくれます。また、CSVファイルの文字コードとしてShift-JISを指定することもでき便利です。

//GoodsをCSVの1行を表すIterable[String]に変換する
def convertToCSVLine(g: Goods): immutable.Iterable[String] = immutable.Seq(g.id.toString, g.name, g.price.toString)

val flow = Flow[Goods].map(convertToCSVLine).via(CsvFormatting.format(charset = Charset.forName("Shift-JIS")))

ファイルに流し込むSink

FileIO.toPath()でローカルファイルに書き込むSinkを作ることができます。

val sink: Sink[Goods, Future[IOResult]] = FileIO.toPath(csvOutputPath)

つなげて実行!

では今までに作ったSource、Flow、Sinkをつなげて実行してみましょう。

for {
  ioResult <- source.via(flow).runWith(sink)
  _ <- Future.fromTry(ioResult.status)
} yield ()

DBに要素が登録されている状態で実行すると、以下のような内容のCSVファイルが作成されます。

1,apple,100
2,orange,1000
3,"big,apple",1000
4,"small""orange""",1000

しっかりエスケープも行われています。

最後に

Akka Streamsはパイプライン処理ができるため大量のデータを扱うバッチ処理と相性が良さそうです。また、今回CSVファイルの作成に使用したAlpakkaにはS3やSQSなどをAkka Streamsで扱うためのモジュールもあり、周辺ライブラリもかなり充実してきた印象です。今後ブログ内でそちらの周辺ライブラリについても紹介していけたらなと思っています。

今回の記事のコード

https://github.com/cm-yamasaki-michihiro/akka-streams-db-to-csv

使用したライブラリ

参考資料