[Embulk]データを加工するプラグインの開発とデバッグ

2015.05.04

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

様々なファイルを取り込むのに便利なツールであるEmbulkですが、業務要件に合わせてデータを加工したいことはあるかと思います。今回はデータを加工するスクリプトをJRubyにて開発しデバッグしてみたので、その手順についてまとめてみたいと思います。

尚、今回はプラグインをgemとして公開する方法については取り上げておりません。予めご了承ください。

今回作成するプラグインについて

今回行う処理ですが

  1. 〜.csv.gzのファイルを読み込み、解凍する。中身はShift-JISのCSVファイルである。
  2. 解凍したファイルを読み込み、定義ファイルに記述されたカラムと、タイムスタンプを追記する。
  3. 上記で編集したデータを再び〜.csv.gzに圧縮して出力する。この時にUTF-8に変換し、タブ区切りにする。

という流れとなります。

このうち1.と3.は既に用意されているプラグインにて実現できるので、独自に開発するのは2.の処理となります。

作成手順

では作成手順です。先にも書いた通り、既存のプラグインの読み込みと、独自の処理をプラグインとして開発する手順の混在となります。

尚、Embulkとプラグイン開発に必要なJRubyのインストールは終っているものとします。

1.読み込むファイルの準備

以下のコマンドにてCSVファイルを.csv.gzに圧縮したサンプルが作成されるので、実行してください。

$ java -jar embulk.jar example ./mydata

出力される mydata/csv/sample_01.csv.gzを解凍し、以下のように編集して再び圧縮します。

id,account,time,purchase,comment
1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,"Embulk ""csv"" parser plugin"
4,11270,2015-01-29 11:54:36,20150129,NULL
5,56513,2015-01-29 11:54:36,20150129,This is a comment.

2.configの作成

Embulkはyml形式のconfigファイルにて、実行するプラグインや読み込むファイル等を記述します。まずは独自の処理なしで、.csv.gzを解凍して再び圧縮するconfigファイルを作成し、実行してみましょう。今回はconfigファイルは「config_dev.yml」という名前で作成しました。

config_dev.yml
in:
  type: file
  path_prefix: ./mydata/csv/sample_01.csv.gz
  decoders:
  - {type: gzip}
  parser:
    charset: Shift-JIS
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    skip_header_lines: 1
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: string}
    - {name: purchase, type: string}
    - {name: comment, type: string}
exec: {}
out:
  type: file
  path_prefix: ./mydata/csv/sample_01_out
  file_ext: .csv.gz
  formatter:
    type: csv
    delimiter: "\t"
    newline: CRLF
    charset: UTF-8
  encoders:
  - type: gzip
    level: 1

以下のコマンドを実行すると、outで指定した「sample_01_out」というプレフィックスで.csv.gzが出力される筈です。中身がUTF-8であることや、タブ区切りになっていることも確認してください。

$ java -jar embulk.jar run config_dev.yml

3.bundleの実行

次にプラグインの実装を行うbundle用フォルダを作成します。今回は作成するプラグインはgemとして公開はしないため、bundle用のフォルダでソースを編集して実行したいと思います。何故bundle用のフォルダかというと、必要なgemのインストールを行うためです。以下のコマンドを実行してください。

$ java -jar embulk.jar bundle ./embulk_bundle

「embulk_bundle」というフォルダが作成され、その中の「embulk」フォルダ内に「filter」「input」「output」フォルダがあるかと思います。それぞれに「example.rb」という処理例があるかと思いますが、今回はfilterプラグインであるため「filter」フォルダ内のものを参考にします。

4.プログラム

先に作ったembulk_bundle/embulk/filterフォルダ内に、今回の処理を実行するファイルを作成します。プラグイン名は「add_columns」としたいので、「add_columns.rb」を作成して以下のように編集します。

embulk_bundle/embulk/filter/add_columns.rb
require 'pry'
require 'pry-nav'

module Embulk
  module Filter

    class AddColumnsFilterPlugin < FilterPlugin
      # configで指定するfilter名
      Plugin.register_filter('add_columns', self)

      def self.transaction(config, in_schema, &control)
        # configの定義値を読み込む
        task = {
          'key' => config.param('key', :string, default: "filter_key"),
          'value' => config.param('value', :string, default: "filter_value")
        }

        # 出力するカラムを設定
        idx = in_schema.size
        out_columns = in_schema + 
                      [Column.new(idx, task['key'], :string)] + 
                      [Column.new(idx + 1, "add-timestamp", :string)]

        puts "Example filter started."
        yield(task, out_columns)
        puts "Example filter finished."
      end

      def initialize(task, in_schema, out_schema, page_builder)
        super
        @value = task['value']
      end

      def close
      end

      def add(page)
        # 出力する値を設定
        page.each do |record|
          @page_builder.add(record + [@value] + [Time.now.strftime("%Y/%m/%d %H:%M")])
        end
      end

      def finish
        @page_builder.finish
      end
    end

  end
end

bundleコマンド実行時に出力された「example.rb」に、処理とコメントを追加しました。コメントとソースを見れば、何をやっているかは分かるかと思います。一番上で「pry」「pry-nav」をrequireしているのは、デバッグ実行を行うためです。

またデバッグを行うgemを読み込むため、Gemfileに以下を追加します。

Gemfile
gem 'pry'
gem 'pry-nav'

5.configの編集

上記で作成した「add_columns」プラグインをデバッグするため、configファイルを以下のように編集します。

config_dev.yml
in:
  (中略)
exec: {}
filters:
  - type: add_columns
    key: config-column
    value: golgo13
out: {type: stdout}

「filters」の「type」に作成した「add_columns」プラグインを指定します。またプラグイン内で追加するカラム名と値を、それぞれ「key」「value」で指定しています。出力方法を指定する「out」では標準出力をするよう設定しています。

6.bundleの再実行

Gemfileに記述したモジュールをインストールするため、bundleを再び実行します。

$ java -jar embulk.jar bundle ./embulk_bundle

7.preveiwでの実行とデバッグ

作成したソースが正しく動くかどうかを検証する場合、Embulkでは「preview」を指定して実行することができます。コマンドは以下の通りです。

$ java -jar embulk.jar preview config_dev.yml -b ./embulk_bundle

embulk.jarの引数として「preveiw」を指定しています。またconfigファイル名、bundleフォルダ名も合わせて指定しています。

デバッグ実行するためには、ブレークポイントとしたい箇所に「binding.pry」を記述して実行すると、以下のように処理を止めることができます。

    11:       def self.transaction(config, in_schema, &control)
    12:         # configの定義値を読み込む
    13:         task = {
    14:           'key' => config.param('key', :string, default: "filter_key"),
    15:           'value' => config.param('value', :string, default: "filter_value")
    16:         }
 => 17: binding.pry
    18:         # 出力するカラムを設定
    19:         idx = in_schema.size
    20:         out_columns = in_schema + 
    21:                       [Column.new(idx, task['key'], :string)] + 
    22:                       [Column.new(idx + 1, "add-timestamp", :string)]
    23: 
    24:         puts "Example filter started."
    25:         yield(task, out_columns)
    26:         puts "Example filter finished."
    27:       end

処理が正常に終了すると、結果が以下のようにターミナルに表示されます。

Example filter started.
+---------+--------------+---------------------+-----------------+----------------------------+----------------------+----------------------+
| id:long | account:long |         time:string | purchase:string |             comment:string | config-column:string | add-timestamp:string |
+---------+--------------+---------------------+-----------------+----------------------------+----------------------+----------------------+
|       1 |       32,864 | 2015-01-27 19:23:49 |        20150127 |                     embulk |              golgo13 |     2015/05/03 19:19 |
|       2 |       14,824 | 2015-01-27 19:01:23 |        20150127 |               embulk jruby |              golgo13 |     2015/05/03 19:19 |
|       3 |       27,559 | 2015-01-28 02:20:02 |        20150128 | Embulk "csv" parser plugin |              golgo13 |     2015/05/03 19:19 |
|       4 |       11,270 | 2015-01-29 11:54:36 |        20150129 |                       NULL |              golgo13 |     2015/05/03 19:19 |
|       5 |       56,513 | 2015-01-29 11:54:36 |        20150129 |         This is a comment. |              golgo13 |     2015/05/03 19:19 |
+---------+--------------+---------------------+-----------------+----------------------------+----------------------+----------------------+

8.本番実行

最後に本番の実行です。configファイルの「out」を最初のようにファイルに出力するように戻し、Embulkに「run」を指定して実行します。

$ java -jar embulk.jar run config_dev.yml -b ./embulk_bundle

出力された.csv.gzファイルの中身にカラムが追加されていたら、処理完了です。

まとめ

以上のように「example」コマンドで出力されるサンプルを加工して、処理を実装することができました。はじめに書いたように今回はgemとしては公開はしませんが、ローカルでデータを加工するには十分かと思います。Embulkを使う際に何かの役に立てば幸いです。

参考サイト

以下のサイトを参考にさせて頂きました。ありがとうございました。
公式ドキュメント
Scheduled bulk data loading to Elasticsearch + Kibana 4 from CSV files
embulk-filter-eval というフィルタープラグイン書いた
Embulk-plugin-inputの作り方
複数行からなるログを解析するために、EmbulkのparserプラグインをRubyで開発する話(準備編)