[Embulk] タイムスタンプのカラムを追加する

2016.05.29

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、最近Embulk(エンバルク)を調べている川崎です。

今回は、Embulkでプラグインとして提供されているフィルター機能を組み合わせて、元のデータファイルに無い、タイムスタンプのカラム(テーブルにロードされた日時)を追加する方法を試してみたいと思います。

ざっとfilterプラグインのリストを調べてみましたが、単独のfilterプラグインでは上記の機能は実現できないようなので、プラグインを組み合わせて実現する方針とします。 *1

  • embulk-filter-column
  • embulk-filter-eval

この2つのフィルターを組み合わせてみます。フィルターについては、Embulkでは複数のフィルターを設定ファイルに記述しておくと、上から順番に実行されるとのことです。

フィルター1:embulk-filter-column

この「embulk-filter-column」は、いくつかの機能があります。

  • 不要なカラムを削除する(カラムを落とす)
    • column 残すカラムを指定
    • drop_columns 落とすカラムを指定
  • デフォルト値(値がNULLだった時のデフォルト値を指定)(default)
  • カラムのコピー(src)
  • カラムを追加(add_columns)

今回は、カラムを追加する機能としてadd_columnsを使い、timestamp型のcreated_at列を追加します。このフィルター自身は、値をレコード毎に変更するような機能は無く、defaultで指定する固定値が入りますので、ひとまずダミーの値(20160101)を入れておきます。

embulk-filter-column
 https://github.com/sonots/embulk-filter-column
embulk-filter-column および embulk-filter-row プラグインのご紹介
 http://qiita.com/sonots/items/1acb9c53f0566bf78a9e

フィルター2:embulk-filter-eval

  • eval_columns Rubyのコードで値の変換をする
  • out_columns 残すカラムを指定する

このフィルターでは、新規の列を追加する機能は無いようなので「embulk-filter-column」と組み合わせて利用します。eval_columnsの機能で値に「Time.now」を指定し、レコード毎に現在日時を入れるようにします。

embulk-filter-eval
 https://github.com/mgi166/embulk-filter-eval
embulk-filter-eval というフィルタープラグイン書いた
http://mgi.hatenablog.com/entry/2015/03/19/084430

プラグインをインストールする

まずは下記の手順で、embulk-filter-columnプラグインをインストールします。

$ embulk gem install embulk-filter-column
2016-05-29 13:23:47.431 +0000: Embulk v0.8.9
Fetching: embulk-filter-column-0.4.0.gem (100%)
Successfully installed embulk-filter-column-0.4.0
1 gem installe

インストールされたことを確認します。

$ embulk gem list embulk-filter-column
2016-05-29 13:24:30.613 +0000: Embulk v0.8.9

*** LOCAL GEMS ***

embulk-filter-column (0.4.0)

続いて、embulk-filter-evalプラグインをインストールします。

$ embulk gem install embulk-filter-eval
2016-05-29 13:24:41.558 +0000: Embulk v0.8.9
Fetching: embulk-filter-eval-0.1.0.gem (100%)
Successfully installed embulk-filter-eval-0.1.0
1 gem installed

インストールされたことを確認します。

$ embulk gem list embulk-filter-eval
2016-05-29 13:25:14.221 +0000: Embulk v0.8.9

*** LOCAL GEMS ***

embulk-filter-eval (0.1.0)

試してみる

今回も、データファイルの中身を推測する、guess機能を使います。guess機能に対する設定を、以下のようにguess.ymlファイルに記述しておきます。(テキストエディタで作成します)データファイルは、前回も利用した「駅データ」を使います。

in:
  type: file
  path_prefix: "./station20160401free.csv"
filters:
  - type: column
    add_columns:
      - {name: created_at, type: timestamp, default: '20160101', format: "%Y%m%d" }
  - type: eval
    eval_columns:
     - created_at: Time.now
out:
  type: redshift
  host: xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com
  user: demomaster
  password: xxxx
  database: demo
  table: station
  access_key_id: XXXXXXXX 
  secret_access_key:XXXXXXXXXXXXXXXX
  iam_user_name: xxxxx
  s3_bucket: my-redshift-transfer-bucket
  s3_key_prefix: temp/redshift
  mode: insert

このguess.ymlのうち、filtersの設定は次のようになっています。defaultで指定している日付はダミーの値で、evalで現在日時に上書きされます。

filters:
  - type: column
    add_columns:
      - {name: created_at, type: timestamp, default: '20160101', format: "%Y%m%d" }
  - type: eval
    eval_columns:
     - created_at: Time.now

guessコマンドを実行すると、推定した結果がconfig.ymlファイルに保存されます。

$ embulk guess ./guess.yml -o config.yml
2016-05-29 10:12:05.547 +0000: Embulk v0.8.9
2016-05-29 10:12:06.928 +0000 [INFO] (0001:guess): Listing local files at directory '.' filtering filename by prefix 'station20160401free.csv'
2016-05-29 10:12:06.933 +0000 [INFO] (0001:guess): Loading files [station20160401free.csv]
2016-05-29 10:12:07.092 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/gzip from a load path
2016-05-29 10:12:07.106 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/bzip2 from a load path
2016-05-29 10:12:07.128 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/json from a load path
2016-05-29 10:12:07.135 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/csv from a load path
in:
  type: file
  path_prefix: ./station20160401free.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: station_cd, type: long}
    - {name: station_g_cd, type: long}
    - {name: station_name, type: string}
    - {name: station_name_k, type: string}
    - {name: station_name_r, type: string}
    - {name: line_cd, type: long}
    - {name: pref_cd, type: long}
    - {name: post, type: string}
    - {name: add, type: string}
    - {name: lon, type: string}
    - {name: lat, type: string}
    - {name: open_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: close_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: e_status, type: long}
    - {name: e_sort, type: long}
filters:
- type: column
  add_columns:
  - {name: created_at, type: timestamp, default: '20160101', format: '%Y%m%d'}
- type: eval
  eval_columns:
  - {created_at: Time.now}
out: {type: redshift, host: xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com,
  user: demomaster, password: xxxx, database: demo, table: station,
  access_key_id: XXXXXXXX, secret_access_key: XXXXXXXXXXXXXXXX,
  iam_user_name: xxxxx, s3_bucket: my-redshift-transfer-bucket, s3_key_prefix: temp/redshift,
  mode: insert}
Created 'config.yml¨' file.

config.ymlの内容は、次のようになっています。

$ cat config.yml
in:
  type: file
  path_prefix: ./station20160401free.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: station_cd, type: long}
    - {name: station_g_cd, type: long}
    - {name: station_name, type: string}
    - {name: station_name_k, type: string}
    - {name: station_name_r, type: string}
    - {name: line_cd, type: long}
    - {name: pref_cd, type: long}
    - {name: post, type: string}
    - {name: add, type: string}
    - {name: lon, type: string}
    - {name: lat, type: string}
    - {name: open_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: close_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: e_status, type: long}
    - {name: e_sort, type: long}
filters:
- type: column
  add_columns:
  - {name: created_at, type: timestamp, default: '20160101', format: '%Y%m%d'}
- type: eval
  eval_columns:
  - {created_at: Time.now}
out: {type: redshift, host: xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com,
  user: demomaster, password: xxxx, database: demo, table: station,
  access_key_id: XXXXXXXX, secret_access_key: XXXXXXXXXXXXXXXX,
  iam_user_name: xxxxx, s3_bucket: my-redshift-transfer-bucket, s3_key_prefix: temp/redshift,
  mode: insert}

データのロードを実行します。

$ embulk run config.yml
2016-05-29 12:57:24.355 +0000: Embulk v0.8.9
2016-05-29 12:57:27.635 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-redshift (0.6.0)
2016-05-29 12:57:27.712 +0000 [INFO] (0001:transaction): Loaded plugin embulk-filter-column (0.4.0)
2016-05-29 12:57:27.760 +0000 [INFO] (0001:transaction): Loaded plugin embulk-filter-eval (0.1.0)
2016-05-29 12:57:27.808 +0000 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix 'station20160401free.csv'
2016-05-29 12:57:27.813 +0000 [INFO] (0001:transaction): Loading files [station20160401free.csv]
2016-05-29 12:57:28.061 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=2 / tasks=1
2016-05-29 12:57:28.111 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com:5439/demo options {user=demomaster, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-05-29 12:57:28.256 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-05-29 12:57:28.266 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-05-29 12:57:28.266 +0000 [INFO] (0001:transaction): Using insert mode
2016-05-29 12:57:28.301 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_e7371c40aa80_bl_tmp000"
2016-05-29 12:57:28.304 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-05-29 12:57:28.447 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "station_e7371c40aa80_bl_tmp000" ("station_cd" BIGINT, "station_g_cd" BIGINT, "station_name" VARCHAR(65535), "station_name_k" VARCHAR(65535), "station_name_r" VARCHAR(65535), "line_cd" BIGINT, "pref_cd" BIGINT, "post" VARCHAR(65535), "add" VARCHAR(65535), "lon" VARCHAR(65535), "lat" VARCHAR(65535), "open_ymd" TIMESTAMP, "close_ymd" TIMESTAMP, "e_status" BIGINT, "e_sort" BIGINT, "created_at" TIMESTAMP)
2016-05-29 12:57:28.464 +0000 [INFO] (0001:transaction): > 0.01 seconds
2016-05-29 12:57:28.708 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2016-05-29 12:57:28.772 +0000 [INFO] (0016:task-0000): Connecting to jdbc:postgresql://xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com:5439/demo options {user=demomaster, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-05-29 12:57:29.388 +0000 [INFO] (0016:task-0000): SQL: SET search_path TO "public"
2016-05-29 12:57:29.391 +0000 [INFO] (0016:task-0000): > 0.00 seconds
2016-05-29 12:57:29.391 +0000 [INFO] (0016:task-0000): Copy SQL: COPY "station_e7371c40aa80_bl_tmp000" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort", "created_at") ? GZIP DELIMITER '\t' NULL '\\N' ESCAPE TRUNCATECOLUMNS ACCEPTINVCHARS STATUPDATE OFF COMPUPDATE OFF
2016-05-29 12:57:36.910 +0000 [INFO] (pool-2-thread-1): Uploading file id temp/redshift/d08be029-abd7-4663-bafd-4f646294800a to S3 (398,304 bytes 10,834 rows)
2016-05-29 12:57:37.398 +0000 [INFO] (pool-2-thread-1): Uploaded file temp/redshift/d08be029-abd7-4663-bafd-4f646294800a (0.49 seconds)
2016-05-29 12:57:37.409 +0000 [INFO] (pool-2-thread-2): SQL: SET search_path TO "public"
2016-05-29 12:57:37.411 +0000 [INFO] (pool-2-thread-2): > 0.00 seconds
2016-05-29 12:57:37.412 +0000 [INFO] (pool-2-thread-2): Running COPY from file temp/redshift/d08be029-abd7-4663-bafd-4f646294800a
2016-05-29 12:57:38.680 +0000 [INFO] (pool-2-thread-2): Loaded file temp/redshift/d08be029-abd7-4663-bafd-4f646294800a (0.34 seconds for COPY)
2016-05-29 12:57:38.709 +0000 [INFO] (0016:task-0000): Loaded 1 files.
2016-05-29 12:57:38.718 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2016-05-29 12:57:38.718 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com:5439/demo options {user=demomaster, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2016-05-29 12:57:38.727 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-05-29 12:57:38.730 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-05-29 12:57:38.745 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "station" ("station_cd" BIGINT, "station_g_cd" BIGINT, "station_name" VARCHAR(65535), "station_name_k" VARCHAR(65535), "station_name_r" VARCHAR(65535), "line_cd" BIGINT, "pref_cd" BIGINT, "post" VARCHAR(65535), "add" VARCHAR(65535), "lon" VARCHAR(65535), "lat" VARCHAR(65535), "open_ymd" TIMESTAMP, "close_ymd" TIMESTAMP, "e_status" BIGINT, "e_sort" BIGINT, "created_at" TIMESTAMP)
2016-05-29 12:57:38.755 +0000 [INFO] (0001:transaction): > 0.01 seconds
2016-05-29 12:57:38.856 +0000 [INFO] (0001:transaction): SQL: INSERT INTO "station" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort", "created_at") SELECT "station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort", "created_at" FROM "station_e7371c40aa80_bl_tmp000"
2016-05-29 12:57:38.885 +0000 [INFO] (0001:transaction): > 0.03 seconds (10,834 rows)
2016-05-29 12:57:39.090 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com:5439/demo options {user=demomaster, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-05-29 12:57:39.100 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-05-29 12:57:39.102 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-05-29 12:57:39.102 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_e7371c40aa80_bl_tmp000"
2016-05-29 12:57:39.324 +0000 [INFO] (0001:transaction): > 0.22 seconds
2016-05-29 12:57:39.327 +0000 [INFO] (main): Committed.
2016-05-29 12:57:39.327 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"station20160401free.csv"},"out":{}}

無事にデータがデータベースにロードされました。各列のデータ型を確認します。

CREATE TABLE station
(
   station_cd      bigint,
   station_g_cd    bigint,
   station_name    varchar(65535),
   station_name_k  varchar(65535),
   station_name_r  varchar(65535),
   line_cd         bigint,
   pref_cd         bigint,
   post            varchar(65535),
   add             varchar(65535),
   lon             varchar(65535),
   lat             varchar(65535),
   open_ymd        timestamp,
   close_ymd       timestamp,
   e_status        bigint,
   e_sort          bigint,
   created_at      timestamp
);

created_at列の追加に成功しました。カラムの中身を確認します。created_at列には、レコード毎のタイムスタンプの値が入っています。

created_at

まとめ

Embulkで複数のfilterプラグインを利用して、レコード毎のタイムスタンプを保持するカラムを追加してみました。

複数のfilterプラグインを組み合わせる、という手法は、シンプルな機能を組み合わせて、より複雑な問題を解決するというEmbulkの思想に合致している気がしました。

次回

次回も、Embulkの様々なオプションについて実験してみたいと思います。

今回は、パフォーマンスの検証はできておりません。機会があったら、パフォーマンスの検証を実施したいのと、それから、今回の機能を単独で実行できるfilterプラグインを作成する、といったことにも取り組んでみたいと考えています。

脚注

  1. もし単独のプラグインで実現できる方法をご存知の方がいらっしゃいましたら、お知らせください。