[Embulk]データを加工するプラグインの開発とデバッグ
はじめに
様々なファイルを取り込むのに便利なツールであるEmbulkですが、業務要件に合わせてデータを加工したいことはあるかと思います。今回はデータを加工するスクリプトをJRubyにて開発しデバッグしてみたので、その手順についてまとめてみたいと思います。
尚、今回はプラグインをgemとして公開する方法については取り上げておりません。予めご了承ください。
今回作成するプラグインについて
今回行う処理ですが
- 〜.csv.gzのファイルを読み込み、解凍する。中身はShift-JISのCSVファイルである。
- 解凍したファイルを読み込み、定義ファイルに記述されたカラムと、タイムスタンプを追記する。
- 上記で編集したデータを再び〜.csv.gzに圧縮して出力する。この時にUTF-8に変換し、タブ区切りにする。
という流れとなります。
このうち1.と3.は既に用意されているプラグインにて実現できるので、独自に開発するのは2.の処理となります。
作成手順
では作成手順です。先にも書いた通り、既存のプラグインの読み込みと、独自の処理をプラグインとして開発する手順の混在となります。
尚、Embulkとプラグイン開発に必要なJRubyのインストールは終っているものとします。
1.読み込むファイルの準備
以下のコマンドにてCSVファイルを.csv.gzに圧縮したサンプルが作成されるので、実行してください。
$ java -jar embulk.jar example ./mydata
出力される mydata/csv/sample_01.csv.gzを解凍し、以下のように編集して再び圧縮します。
id,account,time,purchase,comment 1,32864,2015-01-27 19:23:49,20150127,embulk 2,14824,2015-01-27 19:01:23,20150127,embulk jruby 3,27559,2015-01-28 02:20:02,20150128,"Embulk ""csv"" parser plugin" 4,11270,2015-01-29 11:54:36,20150129,NULL 5,56513,2015-01-29 11:54:36,20150129,This is a comment.
2.configの作成
Embulkはyml形式のconfigファイルにて、実行するプラグインや読み込むファイル等を記述します。まずは独自の処理なしで、.csv.gzを解凍して再び圧縮するconfigファイルを作成し、実行してみましょう。今回はconfigファイルは「config_dev.yml」という名前で作成しました。
config_dev.yml
in: type: file path_prefix: ./mydata/csv/sample_01.csv.gz decoders: - {type: gzip} parser: charset: Shift-JIS newline: CRLF type: csv delimiter: ',' quote: '"' escape: '' skip_header_lines: 1 columns: - {name: id, type: long} - {name: account, type: long} - {name: time, type: string} - {name: purchase, type: string} - {name: comment, type: string} exec: {} out: type: file path_prefix: ./mydata/csv/sample_01_out file_ext: .csv.gz formatter: type: csv delimiter: "\t" newline: CRLF charset: UTF-8 encoders: - type: gzip level: 1
以下のコマンドを実行すると、outで指定した「sample_01_out」というプレフィックスで.csv.gzが出力される筈です。中身がUTF-8であることや、タブ区切りになっていることも確認してください。
$ java -jar embulk.jar run config_dev.yml
3.bundleの実行
次にプラグインの実装を行うbundle用フォルダを作成します。今回は作成するプラグインはgemとして公開はしないため、bundle用のフォルダでソースを編集して実行したいと思います。何故bundle用のフォルダかというと、必要なgemのインストールを行うためです。以下のコマンドを実行してください。
$ java -jar embulk.jar bundle ./embulk_bundle
「embulk_bundle」というフォルダが作成され、その中の「embulk」フォルダ内に「filter」「input」「output」フォルダがあるかと思います。それぞれに「example.rb」という処理例があるかと思いますが、今回はfilterプラグインであるため「filter」フォルダ内のものを参考にします。
4.プログラム
先に作ったembulk_bundle/embulk/filterフォルダ内に、今回の処理を実行するファイルを作成します。プラグイン名は「add_columns」としたいので、「add_columns.rb」を作成して以下のように編集します。
embulk_bundle/embulk/filter/add_columns.rb
require 'pry' require 'pry-nav' module Embulk module Filter class AddColumnsFilterPlugin < FilterPlugin # configで指定するfilter名 Plugin.register_filter('add_columns', self) def self.transaction(config, in_schema, &control) # configの定義値を読み込む task = { 'key' => config.param('key', :string, default: "filter_key"), 'value' => config.param('value', :string, default: "filter_value") } # 出力するカラムを設定 idx = in_schema.size out_columns = in_schema + [Column.new(idx, task['key'], :string)] + [Column.new(idx + 1, "add-timestamp", :string)] puts "Example filter started." yield(task, out_columns) puts "Example filter finished." end def initialize(task, in_schema, out_schema, page_builder) super @value = task['value'] end def close end def add(page) # 出力する値を設定 page.each do |record| @page_builder.add(record + [@value] + [Time.now.strftime("%Y/%m/%d %H:%M")]) end end def finish @page_builder.finish end end end end
bundleコマンド実行時に出力された「example.rb」に、処理とコメントを追加しました。コメントとソースを見れば、何をやっているかは分かるかと思います。一番上で「pry」「pry-nav」をrequireしているのは、デバッグ実行を行うためです。
またデバッグを行うgemを読み込むため、Gemfileに以下を追加します。
Gemfile
gem 'pry' gem 'pry-nav'
5.configの編集
上記で作成した「add_columns」プラグインをデバッグするため、configファイルを以下のように編集します。
config_dev.yml
in: (中略) exec: {} filters: - type: add_columns key: config-column value: golgo13 out: {type: stdout}
「filters」の「type」に作成した「add_columns」プラグインを指定します。またプラグイン内で追加するカラム名と値を、それぞれ「key」「value」で指定しています。出力方法を指定する「out」では標準出力をするよう設定しています。
6.bundleの再実行
Gemfileに記述したモジュールをインストールするため、bundleを再び実行します。
$ java -jar embulk.jar bundle ./embulk_bundle
7.preveiwでの実行とデバッグ
作成したソースが正しく動くかどうかを検証する場合、Embulkでは「preview」を指定して実行することができます。コマンドは以下の通りです。
$ java -jar embulk.jar preview config_dev.yml -b ./embulk_bundle
embulk.jarの引数として「preveiw」を指定しています。またconfigファイル名、bundleフォルダ名も合わせて指定しています。
デバッグ実行するためには、ブレークポイントとしたい箇所に「binding.pry」を記述して実行すると、以下のように処理を止めることができます。
11: def self.transaction(config, in_schema, &control) 12: # configの定義値を読み込む 13: task = { 14: 'key' => config.param('key', :string, default: "filter_key"), 15: 'value' => config.param('value', :string, default: "filter_value") 16: } => 17: binding.pry 18: # 出力するカラムを設定 19: idx = in_schema.size 20: out_columns = in_schema + 21: [Column.new(idx, task['key'], :string)] + 22: [Column.new(idx + 1, "add-timestamp", :string)] 23: 24: puts "Example filter started." 25: yield(task, out_columns) 26: puts "Example filter finished." 27: end
処理が正常に終了すると、結果が以下のようにターミナルに表示されます。
Example filter started. +---------+--------------+---------------------+-----------------+----------------------------+----------------------+----------------------+ | id:long | account:long | time:string | purchase:string | comment:string | config-column:string | add-timestamp:string | +---------+--------------+---------------------+-----------------+----------------------------+----------------------+----------------------+ | 1 | 32,864 | 2015-01-27 19:23:49 | 20150127 | embulk | golgo13 | 2015/05/03 19:19 | | 2 | 14,824 | 2015-01-27 19:01:23 | 20150127 | embulk jruby | golgo13 | 2015/05/03 19:19 | | 3 | 27,559 | 2015-01-28 02:20:02 | 20150128 | Embulk "csv" parser plugin | golgo13 | 2015/05/03 19:19 | | 4 | 11,270 | 2015-01-29 11:54:36 | 20150129 | NULL | golgo13 | 2015/05/03 19:19 | | 5 | 56,513 | 2015-01-29 11:54:36 | 20150129 | This is a comment. | golgo13 | 2015/05/03 19:19 | +---------+--------------+---------------------+-----------------+----------------------------+----------------------+----------------------+
8.本番実行
最後に本番の実行です。configファイルの「out」を最初のようにファイルに出力するように戻し、Embulkに「run」を指定して実行します。
$ java -jar embulk.jar run config_dev.yml -b ./embulk_bundle
出力された.csv.gzファイルの中身にカラムが追加されていたら、処理完了です。
まとめ
以上のように「example」コマンドで出力されるサンプルを加工して、処理を実装することができました。はじめに書いたように今回はgemとしては公開はしませんが、ローカルでデータを加工するには十分かと思います。Embulkを使う際に何かの役に立てば幸いです。
参考サイト
以下のサイトを参考にさせて頂きました。ありがとうございました。
公式ドキュメント
Scheduled bulk data loading to Elasticsearch + Kibana 4 from CSV files
embulk-filter-eval というフィルタープラグイン書いた
Embulk-plugin-inputの作り方
複数行からなるログを解析するために、EmbulkのparserプラグインをRubyで開発する話(準備編)