注目の記事

Treasure Dataの新データ転送ツールEmbulkを触ってみた #dtm_meetup

2015.01.28

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

ども、大瀧です。
本日開催されたデータ転送ミドルウェア勉強会で、開発者の@frsyukiさんによって語られたEmbulkを早速触ってみました。

Embulkとは

EmbulkはOSSのデータ転送ソフトウェアです。もう少し細かく分類するとBulk Loaderと呼ばれる、バッチ処理のためのデータ読み込み、変換処理を行う機能を持ちます。特徴は柔軟なプラグイン構造で、データを読み込むInputPlugin、データを出力するOutputPlugin、さらに読み込み処理自体もExecutorPluginと、機能のほとんどをプラグインとして後から追加、カスタマイズできるよう高度に抽象化されたアーキテクチャになっています。データのハンドリングもプラグインでの実装を想定しているようなので、ここまで来るとミドルウェアというより、データ転送のインターフェースを定義するフレームワークの実装、という方が近い感じかなという印象です。

上の図を見て「fluentdと似てるなぁ」と思われる方もいると思います。EmbulkはfluentdのBulk Loader版、とTDさんの発表でも紹介されていました。ですので、fluentdと同様、プラグインの充実がEmbulkが流行るかどうかのキーポイントになりそうですね。そのほかにもデータ転送ツールに必要なETL的な動きや読み込み時のエラー処理などいろいろ考慮されています。その辺りの概要は、@frsyukiさんの発表資料が詳しいです。

[slideshare id=43934176&doc=embuk-release-150127020151-conversion-gate01&w=480&p=15]

触ってみた

GitHubのREADMEページには、Quick Startとしてチュートリアルが公開されています。これに沿って簡単に動かすことができたので、解説してみます。

EmbulkはコアをJava、プラグインをJRubyで記述されています。動かすだけであれば、JavaのインストールされたPCでBintrayのJarファイルがあれば動きます。まずは、Jarファイルをダウンロードします。

$ wget https://bintray.com/artifact/download/embulk/maven/embulk-0.2.0.jar -O embulk.jar
--2015-01-28 00:05:59--  https://bintray.com/artifact/download/embulk/maven/embulk-0.2.0.jar
  :
HTTP request sent, awaiting response... 200 OK
Length: 50672932 (48M) [application/unknown]
Saving to: 'embulk.jar'

100%[====================================================================================>] 50,672,932  2.77MB/s   in 14s

2015-01-28 00:06:14 (3.39 MB/s) - 'embulk.jar' saved [50672932/50672932]

$

ダウンロードしたJarファイルをjavaコマンドで指定すれば、いきなり動きます。ヘルプで様子を見てみます。

$ java -jar embulk.jar --help
usage: <command> [--options]
commands:
   bundle    [directory]                              # create or update plugin environment.
   run       <config.yml>                             # run a bulk load transaction.
   preview   <config.yml>                             # dry-run the bulk load without output and show preview.
   guess     <partial-config.yml> -o <output.yml>     # guess missing parameters to create a complete configuration file.
   gem       <install | list | help>                  # install a plugin or show installed plugins.
                                                      # plugin path is /Users/ryuta/.embulk/jruby/1.9
   example   [path]                                   # creates an example config file and csv file to try embulk.

Use `<command> --help` to see description of the commands.
$

では、使ってみましょう。本来はロード元データとEmbulkの設定ファイルをそれぞれ用意するところですが、exampleサブコマンドでサンプルデータをさくっと生成できるので、今回はこれで様子を見てみます。

$ java -jar embulk.jar example ./try1
Creating ./try1 directory...
  Creating ./try1/
  Creating ./try1/csv/
  Creating ./try1/csv/sample_01.csv.gz
  Creating ./try1/example.yml

Run following subcommands to try embulk:

   1. guess ./try1/example.yml -o config.yml
   2. preview config.yml
   3. run config.yml

$

チュートリアル的にそのあと実行するべきコマンドラインまで出てくる、親切仕様です(笑
生成されたファイルを確認してみます。

$ tree try1/
try1/
├── csv
│   └── sample_01.csv.gz
└── example.yml

1 directory, 2 files
$

サンプルデータとしてGzip形式で圧縮されたCSVファイルtry1/csv/sample_01.csv.gzがあります。中身はこんな感じ。

$ gzcat try1/csv/sample_01.csv.gz
id,account,time,purchase,comment
1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,embulk core
4,11270,2015-01-29 11:54:36,20150129,"Embulk ""csv"" parser plugin"
$

よくあるCSVファイルですね。それから、Embulkの設定ファイルtry1/example.ymlです。

$ cat try1/example.yml
in:
  type: file
  paths: ["/Users/ryuta/Temp/embulk-test/try1/csv"]
out:
  type: stdout
$

なんとなく読めますね。今回はデータ元として/Users/ryuta/Temp/embulk-test/try1/csvディレクトリ以下のファイル、転送先として標準出力(stdout)が見えますね。まだドキュメントがないので他の書き方は手探りな感じですが、GitHubのexamplesが参考になると思います。

で、本来はこのファイルのin:以下に要素を追加して、Gzipの展開やCSVのスキーマ定義を記述するところなのですが、Embulkには元データの先頭を読んで展開処理やスキーマ定義を自動生成するguessサブコマンドが用意されています。賢い!

$ java -jar embulk.jar guess ./try1/example.yml -o config.yml
2015-01-28 00:18:55,277 [INFO]: main:org.embulk.standards.LocalFileInputPlugin: Listing local files with prefix '/Users/ryuta/Temp/embulk-test/try1/csv'
exec: {}
in:
  type: file
  paths: [/Users/ryuta/Temp/embulk-test/try1/csv]
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}
$

ぺろっとできたconfig.ymlファイルが、スキーマ定義が追加されたEmbulkの設定ファイルです。では、この設定ファイルを利用して、CSVファイルを読み込んでみます。

膨大なデータの場合、実際にデータロードを行う前にちゃんとパースされているのかDRY-RUNしたいことが多いと思います。Embulkではpreviewサブコマンドでできます。

$ java -jar embulk.jar preview config.yml
2015-01-28 00:20:31,991 [INFO]: main:org.embulk.standards.LocalFileInputPlugin: Listing local files with prefix '/Users/ryuta/Temp/embulk-test/try1/csv'
+---------+--------------+-------------------------+-------------------------+----------------------------+
| id:long | account:long |          time:timestamp |      purchase:timestamp |             comment:string |
+---------+--------------+-------------------------+-------------------------+----------------------------+
|       1 |       32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC |                     embulk |
|       2 |       14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC |               embulk jruby |
|       3 |       27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC |                embulk core |
|       4 |       11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC | Embulk "csv" parser plugin |
+---------+--------------+-------------------------+-------------------------+----------------------------+
$

表形式の出力結果から、ちゃんとパースできていることがわかりますね。@frsyukiさんが発表の中で、「タイムスタンプの解析がいろいろできます」と言っていた通り、purchase列のようなちょっと変則的なタイムスタンプ(例 : 20150127)もちゃんとtimestampとしてパースされています。これまた賢い!

では、本実行してみます。

$ java -jar embulk.jar run config.yml
2015-01-28 00:21:16,941 [INFO]: main:org.embulk.standards.LocalFileInputPlugin: Listing local files with prefix '/Users/ryuta/Temp/embulk-test/try1/csv'
2015-01-28 00:21:17,027 [INFO]: main:org.embulk.exec.LocalExecutor: Running 1 tasks using 8 local threads
2015-01-28 00:21:17,027 [INFO]: main:org.embulk.exec.LocalExecutor: {done:  0 / 1, running: 0}
1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,embulk core
4,11270,2015-01-29 11:54:36,20150129,Embulk "csv" parser plugin
2015-01-28 00:21:17,164 [INFO]: main:org.embulk.exec.LocalExecutor: {done:  1 / 1, running: 0}
2015-01-28 00:21:17,165 [INFO]: main:org.embulk.command.Runner: next config: {}
$

今回は標準出力ということだったのでコマンドの実行結果にデータが含まれる形ですが、これがデータベースなどなんらかのデータストアへのデータ格納として処理されるわけですね。現時点ではPostgreSQL JSONのOutputPluginRedisのInput/OutputPluginなどが公開されています。

まとめ

チュートリアルの紹介でしたが、Embulkを実行するイメージは持っていただけたんじゃないかと思います。

Embulkのキモはプラグインだと思うので、fluentdのようにサクッとプロトタイプとして書いてみたり、公開されたプラグインを自分なりにカスタマイズして使えるとかなり便利そうですね。プラグイン部分はJRubyなので、Rubyのシンタックスで書けるのはかなり良さそうな印象です。皆さんもEmbulkを触って、あわよくばContributeしていきましょう!