[Embulk] Adding a Timestamp Column: The add_time Plugin

2016.05.30

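Hello, this is Kawasaki. I have been looking into Embulk lately.

In my previous article, "Adding a timestamp column," I showed one way to do this. Right after it was published, Furuhashi-san, the author of Embulk, kindly let me know that embulk-filter-add_time can be used for the same purpose.

In this article, I verify how embulk-filter-add_time behaves.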

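Filter: embulk-filter-add_time

In outline, embulk-filter-add_time does the following:

  • Adds a new time-based column
  • Fills that column either by copying the value of an existing column or with a value you specify

The configuration options are:

  • to_column (required)
  • from_value
  • from_column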
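
As a rough illustration of the from_column option, the sketch below copies an existing column into the new one. Using open_ymd as the source column is only an assumption for illustration (the station data used later in this article has a timestamp column with that name), and this variant is not tested in this article. The key names follow my understanding of the plugin README, so please check them against the README before use.

```yaml
filters:
  - type: add_time
    to_column:
      name: created_at      # name of the column to add
      type: timestamp
    from_column:
      name: open_ymd        # existing column to copy from (illustrative choice)
```

Only one of from_column and from_value is expected per filter entry, since each describes where the new column's value comes from.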

Installing the plugin

First, install the embulk-filter-add_time plugin as follows.

$ embulk gem install embulk-filter-add_time
2016-05-30 08:21:36.940 +0000: Embulk v0.8.9
Successfully installed embulk-filter-add_time-0.2.0
1 gem installed

Confirm that the plugin has been installed.

$ embulk gem list embulk-filter-add_time
2016-05-30 08:21:58.350 +0000: Embulk v0.8.9

*** LOCAL GEMS ***

embulk-filter-add_time (0.2.0)

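Trying it out

As in the previous article, I use the guess feature, which infers the contents of the data file. Write the settings for guess in a guess.yml file as shown below (create it with a text editor). The data file is again the "station data" (駅データ).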

in:
  type: file
  path_prefix: "./station20160401free.csv"
filters:
  - type: add_time
    to_column:
     name: created_at
     type: timestamp
    from_value:
     mode: upload_time
out:
  type: redshift
  host: xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com
  user: demomaster
  password: xxxx
  database: demo
  table: station
  access_key_id: XXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXX
  iam_user_name: xxxxx
  s3_bucket: my-redshift-transfer-bucket
  s3_key_prefix: temp/redshift
  mode: insert

Within this guess.yml, the filters section is configured as follows.

filters:
  - type: add_time
    to_column:
     name: created_at
     type: timestamp
    from_value:
     mode: upload_time
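
For comparison, from_value can apparently also take a fixed value instead of the upload time. The following is an untested sketch: the fixed_time mode and the value and timestamp_format keys are written from my recollection of the plugin README, and the date itself is an arbitrary example. Please confirm the exact key names and the default format in the README.

```yaml
filters:
  - type: add_time
    to_column:
      name: created_at
      type: timestamp
    from_value:
      mode: fixed_time                           # assumed mode name; see the README
      value: "2016-05-30 00:00:00 +0000"         # arbitrary example value
      timestamp_format: "%Y-%m-%d %H:%M:%S %z"   # format used to parse the value above
```

With upload_time, by contrast, no value needs to be written in the config file.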

Running the guess command saves the inferred result to config.yml.

$ embulk guess ./guess.yml -o config.yml
2016-05-30 08:56:35.657 +0000: Embulk v0.8.9
2016-05-30 08:56:37.101 +0000 [INFO] (0001:guess): Listing local files at directory '.' filtering filename by prefix 'station20160401free.csv'
2016-05-30 08:56:37.106 +0000 [INFO] (0001:guess): Loading files [station20160401free.csv]
2016-05-30 08:56:37.269 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/gzip from a load path
2016-05-30 08:56:37.283 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/bzip2 from a load path
2016-05-30 08:56:37.305 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/json from a load path
2016-05-30 08:56:37.313 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/csv from a load path
in:
  type: file
  path_prefix: ./station20160401free.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: station_cd, type: long}
    - {name: station_g_cd, type: long}
    - {name: station_name, type: string}
    - {name: station_name_k, type: string}
    - {name: station_name_r, type: string}
    - {name: line_cd, type: long}
    - {name: pref_cd, type: long}
    - {name: post, type: string}
    - {name: add, type: string}
    - {name: lon, type: string}
    - {name: lat, type: string}
    - {name: open_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: close_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: e_status, type: long}
    - {name: e_sort, type: long}
filters:
- type: add_time
  to_column: {name: created_at, type: timestamp}
  from_value: {mode: upload_time}
out: {type: redshift, host: xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com,
  user: demomaster, password: xxxx, database: demo, table: station, access_key_id: XXXXXXXX,
  secret_access_key: XXXXXXXXXXXXXXXX, iam_user_name: xxxxx,
  s3_bucket: my-redshift-transfer-bucket, s3_key_prefix: temp/redshift, mode: insert}
Created 'config.yml' file.

The contents of config.yml are as follows.

$ cat config.yml
in:
  type: file
  path_prefix: ./station20160401free.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: station_cd, type: long}
    - {name: station_g_cd, type: long}
    - {name: station_name, type: string}
    - {name: station_name_k, type: string}
    - {name: station_name_r, type: string}
    - {name: line_cd, type: long}
    - {name: pref_cd, type: long}
    - {name: post, type: string}
    - {name: add, type: string}
    - {name: lon, type: string}
    - {name: lat, type: string}
    - {name: open_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: close_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: e_status, type: long}
    - {name: e_sort, type: long}
filters:
- type: add_time
  to_column: {name: created_at, type: timestamp}
  from_value: {mode: upload_time}
out: {type: redshift, host: xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com,
  user: demomaster, password: xxxx, database: demo, table: station, access_key_id: XXXXXXXX,
  secret_access_key: XXXXXXXXXXXXXXXX, iam_user_name: xxxxx,
  s3_bucket: my-redshift-transfer-bucket, s3_key_prefix: temp/redshift, mode: insert}

Run the data load.

$ embulk run config.yml
2016-05-30 09:02:27.717 +0000: Embulk v0.8.9
2016-05-30 09:02:30.933 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-redshift (0.6.0)
2016-05-30 09:02:31.016 +0000 [INFO] (0001:transaction): Loaded plugin embulk-filter-add_time (0.2.0)
2016-05-30 09:02:31.085 +0000 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix 'station20160401free.csv'
2016-05-30 09:02:31.094 +0000 [INFO] (0001:transaction): Loading files [station20160401free.csv]
2016-05-30 09:02:31.295 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=2 / tasks=1
2016-05-30 09:02:31.340 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com:5439/demo options {user=demomaster, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-05-30 09:02:31.473 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-05-30 09:02:31.484 +0000 [INFO] (0001:transaction): > 0.01 seconds
2016-05-30 09:02:31.485 +0000 [INFO] (0001:transaction): Using insert mode
2016-05-30 09:02:31.559 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_01a62d5bf040_bl_tmp000"
2016-05-30 09:02:31.563 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-05-30 09:02:31.690 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "station_01a62d5bf040_bl_tmp000" ("station_cd" INT8, "station_g_cd" INT8, "station_name" VARCHAR(65535), "station_name_k" VARCHAR(65535), "station_name_r" VARCHAR(65535), "line_cd" INT8, "pref_cd" INT8, "post" VARCHAR(65535), "add" VARCHAR(65535), "lon" VARCHAR(65535), "lat" VARCHAR(65535), "open_ymd" TIMESTAMP, "close_ymd" TIMESTAMP, "e_status" INT8, "e_sort" INT8, "created_at2" TIMESTAMP)
2016-05-30 09:02:31.699 +0000 [INFO] (0001:transaction): > 0.01 seconds
2016-05-30 09:02:31.869 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2016-05-30 09:02:31.923 +0000 [INFO] (0016:task-0000): Connecting to jdbc:postgresql://xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com:5439/demo options {user=demomaster, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-05-30 09:02:32.494 +0000 [INFO] (0016:task-0000): SQL: SET search_path TO "public"
2016-05-30 09:02:32.496 +0000 [INFO] (0016:task-0000): > 0.00 seconds
2016-05-30 09:02:32.496 +0000 [INFO] (0016:task-0000): Copy SQL: COPY "station_01a62d5bf040_bl_tmp000" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort") ? GZIP DELIMITER '\t' NULL '\\N' ESCAPE TRUNCATECOLUMNS ACCEPTINVCHARS STATUPDATE OFF COMPUPDATE OFF
2016-05-30 09:02:34.868 +0000 [INFO] (pool-2-thread-1): Uploading file id temp/redshift/eee88877-7797-4f6d-8084-ab45503a8101 to S3 (371,203 bytes 10,834 rows)
2016-05-30 09:02:35.501 +0000 [INFO] (pool-2-thread-1): Uploaded file temp/redshift/eee88877-7797-4f6d-8084-ab45503a8101 (0.63 seconds)
2016-05-30 09:02:35.514 +0000 [INFO] (pool-2-thread-2): SQL: SET search_path TO "public"
2016-05-30 09:02:35.516 +0000 [INFO] (pool-2-thread-2): > 0.00 seconds
2016-05-30 09:02:35.516 +0000 [INFO] (pool-2-thread-2): Running COPY from file temp/redshift/eee88877-7797-4f6d-8084-ab45503a8101
2016-05-30 09:02:37.804 +0000 [INFO] (pool-2-thread-2): Loaded file temp/redshift/eee88877-7797-4f6d-8084-ab45503a8101 (1.42 seconds for COPY)
2016-05-30 09:02:37.839 +0000 [INFO] (0016:task-0000): Loaded 1 files.
2016-05-30 09:02:37.842 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2016-05-30 09:02:37.843 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com:5439/demo options {user=demomaster, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2016-05-30 09:02:37.852 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-05-30 09:02:37.854 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-05-30 09:02:37.855 +0000 [INFO] (0001:transaction): SQL: INSERT INTO "station" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort") SELECT "station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort" FROM "station_01a62d5bf040_bl_tmp000"
2016-05-30 09:02:38.900 +0000 [INFO] (0001:transaction): > 1.05 seconds (10,834 rows)
2016-05-30 09:02:39.131 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com:5439/demo options {user=demomaster, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-05-30 09:02:39.144 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-05-30 09:02:39.147 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-05-30 09:02:39.147 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_01a62d5bf040_bl_tmp000"
2016-05-30 09:02:39.231 +0000 [INFO] (0001:transaction): > 0.08 seconds
2016-05-30 09:02:39.233 +0000 [INFO] (main): Committed.
2016-05-30 09:02:39.234 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"station20160401free.csv"},"out":{}}

The data was loaded into the database successfully. Check the data type of each column.

CREATE TABLE station
(
   station_cd      bigint,
   station_g_cd    bigint,
   station_name    varchar(65535),
   station_name_k  varchar(65535),
   station_name_r  varchar(65535),
   line_cd         bigint,
   pref_cd         bigint,
   post            varchar(65535),
   add             varchar(65535),
   lon             varchar(65535),
   lat             varchar(65535),
   open_ymd        timestamp,
   close_ymd       timestamp,
   e_status        bigint,
   e_sort          bigint,
   created_at      timestamp
);

The created_at column was added successfully. Checking its contents shows that every record holds the same timestamp value in created_at. (Last time, each record had its own timestamp value.)

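Summary

I added a timestamp column in Embulk using the embulk-filter-add_time plugin.

In terms of behavior:

  • The same timestamp value for all records (embulk-filter-add_time plugin with upload_time specified)
  • A separate timestamp value for each record (embulk-filter-column plugin combined with the embulk-filter-eval plugin)

The results differ between this article and the previous one, so please use whichever plugin fits your requirements.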