[Embulk] Trying Out the guess Feature (Addendum Included)


This article was published more than a year ago. Please note that the information may be out of date.

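[Addendum]
I received a comment on this article.
When loading data into PostgreSQL, it is a good idea to set default_timezone.
Apparently, without it, the dates in the loaded data end up shifted by nine hours.
out:
  type: postgresql
  default_timezone: "Japan" # <-- this

For details, see http://qiita.com/hiroysato/items/5bacf7916a6731c9fa36

Thank you for pointing this out!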
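
To make the nine-hour shift concrete, here is a minimal Python sketch. It assumes the date string is interpreted as midnight UTC and then displayed in Japan time; that is my reading of the comment, not something I verified in Embulk's source. The fixed UTC+09:00 offset and the helper name parse_date are illustrative assumptions.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+09:00 offset standing in for Japan time (assumption: historical
# offset changes do not matter for this illustration).
JST = timezone(timedelta(hours=9), "JST")


def parse_date(text: str, tz: timezone) -> datetime:
    """Parse a '%Y-%m-%d' string as midnight in the given time zone."""
    return datetime.strptime(text, "%Y-%m-%d").replace(tzinfo=tz)


# Interpreted as midnight UTC, then displayed in JST: the clock reads 09:00.
as_utc = parse_date("1902-12-10", timezone.utc)
shown_in_jst = as_utc.astimezone(JST)

# Interpreted as midnight JST from the start: the clock reads 00:00 as intended.
as_jst = parse_date("1902-12-10", JST)

print(shown_in_jst.isoformat())
print(as_jst.isoformat())
```

The two printed values differ by exactly nine hours, which matches the symptom described in the comment.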


Hello, this is Kawasaki. I have been looking into Embulk recently.

For the previous article, see "[Embulk] A Personal Summary of Embulk".

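On Friday, April 22, I promptly got hold of WEB+DB PRESS Vol.92, which carries a feature on Embulk. Using that article as a reference, I will start with installing Embulk.

Incidentally, a Treasure Data event is being held today (Monday, April 25), but Embulk does not seem to be on the agenda this time. Too bad.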

Treasure Data Tech Talk
http://eventdots.jp/event/584571

Installation

For how to install Embulk, see https://github.com/embulk/embulk#quick-start

curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc

Running the sample

Let's run the example command that ships with Embulk. This step gives you a demo run of Embulk.

embulk example ./try1
embulk guess ./try1/seed.yml -o config.yml
embulk preview config.yml
embulk run config.yml

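Loading an existing dataset with the guess feature

To start, I will take the IRIS dataset, which is well known in statistics, and have it loaded into a database automatically using the guess feature. The data looks like this.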

$ cat IRIS.csv
Sepal_Length,Sepal_Width,Petal_Length,Petal_Width,Species
5.1,3.5,1.4,0.2,setosa
4.9,3.0,1.4,0.2,setosa
4.7,3.2,1.3,0.2,setosa
4.6,3.1,1.5,0.2,setosa
5.0,3.6,1.4,0.2,setosa

Write the settings for the guess feature in a guess.yml file as follows. (Create it with a text editor.)

$ cat guess.yml 
in:
  type: file
  path_prefix: "./IRIS.csv"
out:
  type: postgresql
  host: localhost
  user: postgres
  password: xxxx
  database: postgres
  table: iris
  mode: insert

When you run the guess command, the inferred result is saved to the config.yml file.

$ embulk guess ./guess.yml -o config.yml
2016-04-25 16:10:35.393 +0900: Embulk v0.8.8
2016-04-25 16:10:36.574 +0900 [INFO] (0001:guess): Listing local files at directory '.' filtering filename by prefix 'IRIS.csv'
2016-04-25 16:10:36.583 +0900 [INFO] (0001:guess): Loading files [IRIS.csv]
2016-04-25 16:10:36.637 +0900 [INFO] (0001:guess): Loaded plugin embulk/guess/gzip from a load path
2016-04-25 16:10:36.652 +0900 [INFO] (0001:guess): Loaded plugin embulk/guess/bzip2 from a load path
2016-04-25 16:10:36.667 +0900 [INFO] (0001:guess): Loaded plugin embulk/guess/json from a load path
2016-04-25 16:10:36.676 +0900 [INFO] (0001:guess): Loaded plugin embulk/guess/csv from a load path
in:
  type: file
  path_prefix: ./IRIS.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: Sepal_Length, type: double}
    - {name: Sepal_Width, type: double}
    - {name: Petal_Length, type: double}
    - {name: Petal_Width, type: double}
    - {name: Species, type: string}
out: {type: postgresql, host: localhost, user: postgres, password: xxxx, database: postgres,
  table: iris, mode: insert}
Created 'config.yml' file.
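
The contents of config.yml are as follows. The settings written in guess.yml appear to have been carried over as they were.
In addition, the file now records that the load target is CSV, along with the CSV option settings and the column definitions.
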
$ cat config.yml 
in:
  type: file
  path_prefix: ./IRIS.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: Sepal_Length, type: double}
    - {name: Sepal_Width, type: double}
    - {name: Petal_Length, type: double}
    - {name: Petal_Width, type: double}
    - {name: Species, type: string}
out: {type: postgresql, host: localhost, user: postgres, password: xxxx, database: postgres,
  table: iris, mode: insert}
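
To give a feel for what inferring column types from a sample involves, here is a small Python sketch. It is not Embulk's actual guess algorithm, only a simplified illustration: the functions guess_type and guess_columns are hypothetical names, and the rule (try long, then double, otherwise string) is my own assumption.

```python
import csv
import io

# A few rows of IRIS.csv, copied from the sample shown earlier.
SAMPLE = """\
Sepal_Length,Sepal_Width,Petal_Length,Petal_Width,Species
5.1,3.5,1.4,0.2,setosa
4.9,3.0,1.4,0.2,setosa
4.7,3.2,1.3,0.2,setosa
"""


def guess_type(values):
    """Return 'long', 'double', or 'string' for a list of cell strings."""
    non_empty = [v for v in values if v != ""]
    if not non_empty:
        return "string"
    # Try the narrowest type first; fall through when any cell fails to parse.
    for name, caster in (("long", int), ("double", float)):
        try:
            for v in non_empty:
                caster(v)
            return name
        except ValueError:
            continue
    return "string"


def guess_columns(text):
    """Treat the first row as the header and guess one type per column."""
    rows = list(csv.reader(io.StringIO(text)))
    header, body = rows[0], rows[1:]
    return [
        {"name": name, "type": guess_type([row[i] for row in body])}
        for i, name in enumerate(header)
    ]


columns = guess_columns(SAMPLE)
for column in columns:
    print(column)
```

For this sample the sketch arrives at the same double and string types that appear in config.yml, but the real guess plugin also infers things such as the charset, newline, and delimiter, which this sketch ignores.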

Now run the data load.

$ embulk run config.yml
2016-04-25 16:19:01.866 +0900: Embulk v0.8.8
2016-04-25 16:19:03.537 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-postgresql (0.5.1)
2016-04-25 16:19:03.580 +0900 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix 'IRIS.csv'
2016-04-25 16:19:03.587 +0900 [INFO] (0001:transaction): Loading files [IRIS.csv]
2016-04-25 16:19:03.649 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2016-04-25 16:19:03.674 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:19:03.764 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-04-25 16:19:03.766 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:03.766 +0900 [INFO] (0001:transaction): Using insert mode
2016-04-25 16:19:03.785 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "iris_71dc4e71a1b5980_bl_tmp000"
2016-04-25 16:19:03.786 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:03.796 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "iris_71dc4e71a1b5980_bl_tmp000" ("Sepal_Length" DOUBLE PRECISION, "Sepal_Width" DOUBLE PRECISION, "Petal_Length" DOUBLE PRECISION, "Petal_Width" DOUBLE PRECISION, "Species" TEXT)
2016-04-25 16:19:03.812 +0900 [INFO] (0001:transaction): > 0.02 seconds
2016-04-25 16:19:03.814 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "iris_71dc4e71a1b5980_bl_tmp001"
2016-04-25 16:19:03.815 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:03.817 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "iris_71dc4e71a1b5980_bl_tmp001" ("Sepal_Length" DOUBLE PRECISION, "Sepal_Width" DOUBLE PRECISION, "Petal_Length" DOUBLE PRECISION, "Petal_Width" DOUBLE PRECISION, "Species" TEXT)
2016-04-25 16:19:03.824 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:19:03.824 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "iris_71dc4e71a1b5980_bl_tmp002"
2016-04-25 16:19:03.825 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:03.828 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "iris_71dc4e71a1b5980_bl_tmp002" ("Sepal_Length" DOUBLE PRECISION, "Sepal_Width" DOUBLE PRECISION, "Petal_Length" DOUBLE PRECISION, "Petal_Width" DOUBLE PRECISION, "Species" TEXT)
2016-04-25 16:19:03.833 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:19:03.834 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "iris_71dc4e71a1b5980_bl_tmp003"
2016-04-25 16:19:03.835 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:03.837 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "iris_71dc4e71a1b5980_bl_tmp003" ("Sepal_Length" DOUBLE PRECISION, "Sepal_Width" DOUBLE PRECISION, "Petal_Length" DOUBLE PRECISION, "Petal_Width" DOUBLE PRECISION, "Species" TEXT)
2016-04-25 16:19:03.841 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:03.895 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
2016-04-25 16:19:03.931 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:19:03.948 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-04-25 16:19:03.949 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-04-25 16:19:03.950 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "iris_71dc4e71a1b5980_bl_tmp000" ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species") FROM STDIN
2016-04-25 16:19:03.955 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:19:03.967 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-04-25 16:19:03.968 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-04-25 16:19:03.968 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "iris_71dc4e71a1b5980_bl_tmp001" ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species") FROM STDIN
2016-04-25 16:19:03.970 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:19:03.983 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-04-25 16:19:03.984 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-04-25 16:19:03.984 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "iris_71dc4e71a1b5980_bl_tmp002" ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species") FROM STDIN
2016-04-25 16:19:03.986 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:19:03.998 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-04-25 16:19:04.000 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-04-25 16:19:04.000 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "iris_71dc4e71a1b5980_bl_tmp003" ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species") FROM STDIN
2016-04-25 16:19:04.061 +0900 [INFO] (0017:task-0000): Loading 150 rows (3,800 bytes)
2016-04-25 16:19:04.073 +0900 [INFO] (0017:task-0000): > 0.01 seconds (loaded 150 rows in total)
2016-04-25 16:19:04.076 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2016-04-25 16:19:04.077 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2016-04-25 16:19:04.088 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-04-25 16:19:04.089 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:04.091 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "iris" ("Sepal_Length" DOUBLE PRECISION, "Sepal_Width" DOUBLE PRECISION, "Petal_Length" DOUBLE PRECISION, "Petal_Width" DOUBLE PRECISION, "Species" TEXT)
2016-04-25 16:19:04.104 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:19:04.106 +0900 [INFO] (0001:transaction): SQL: INSERT INTO "iris" ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species") SELECT "Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species" FROM "iris_71dc4e71a1b5980_bl_tmp000" UNION ALL SELECT "Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species" FROM "iris_71dc4e71a1b5980_bl_tmp001" UNION ALL SELECT "Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species" FROM "iris_71dc4e71a1b5980_bl_tmp002" UNION ALL SELECT "Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species" FROM "iris_71dc4e71a1b5980_bl_tmp003"
2016-04-25 16:19:04.113 +0900 [INFO] (0001:transaction): > 0.01 seconds (150 rows)
2016-04-25 16:19:04.125 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:19:04.136 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-04-25 16:19:04.137 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:04.137 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "iris_71dc4e71a1b5980_bl_tmp000"
2016-04-25 16:19:04.144 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:19:04.144 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "iris_71dc4e71a1b5980_bl_tmp001"
2016-04-25 16:19:04.147 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:04.147 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "iris_71dc4e71a1b5980_bl_tmp002"
2016-04-25 16:19:04.150 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:19:04.151 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "iris_71dc4e71a1b5980_bl_tmp003"
2016-04-25 16:19:04.156 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:19:04.160 +0900 [INFO] (main): Committed.
2016-04-25 16:19:04.160 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"IRIS.csv"},"out":{}}

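The data was loaded into the database without any problems. Let's check the contents of the table.

[Screenshot: pgadmin_20160425_1]

Next, check the data type of each column.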
CREATE TABLE public.iris
(
"Sepal_Length" double precision,
"Sepal_Width" double precision,
"Petal_Length" double precision,
"Petal_Width" double precision,
"Species" text
)

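The data load succeeded.

Trying a Japanese dataset

Next, I will try data that contains Japanese text. As open data, I decided to use a CSV file that can be downloaded from the site below.

http://www.ekidata.jp/

[Screenshot: ekijp_20160425_1]

Several kinds of data are available for download; this time I used the "station data" (駅データ). The data looks like this.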

$ cat station20160401free.csv
station_cd,station_g_cd,station_name,station_name_k,station_name_r,line_cd,pref_cd,post,add,lon,lat,open_ymd,close_ymd,e_status,e_sort
1110101,1110101,函館,,,11101,1,040-0063,北海道函館市若松町12-13,140.726413,41.773709,1902-12-10,,0,1110101
1110102,1110102,五稜郭,,,11101,1,041-0813,函館市亀田本町,140.733539,41.803557,,,0,1110102
1110103,1110103,桔梗,,,11101,1,041-1210,北海道函館市桔梗3丁目41-36,140.722952,41.846457,1902-12-10,,0,1110103
1110104,1110104,大中山,,,11101,1,041-1121,亀田郡七飯町大字大中山,140.71358,41.864641,,,0,1110104
1110105,1110105,七飯,,,11101,1,041-1111,亀田郡七飯町字本町,140.688556,41.886971,,,0,1110105

As before, write the settings for the guess feature in a guess.yml file. (Create it with a text editor.)

in:
  type: file
  path_prefix: "./station20160401free.csv"
out:
   type: postgresql
   host: localhost
   user: postgres
   password: xxxx
   database: postgres
   table: station
   mode: insert

When you run the guess command, the inferred result is saved to the config.yml file.

$ embulk guess ./guess.yml -o config.yml
2016-04-25 16:55:09.781 +0900: Embulk v0.8.8
2016-04-25 16:55:10.655 +0900 [INFO] (0001:guess): Listing local files at directory '.' filtering filename by prefix 'station20160401free.csv'
2016-04-25 16:55:10.659 +0900 [INFO] (0001:guess): Loading files [station20160401free.csv]
2016-04-25 16:55:10.707 +0900 [INFO] (0001:guess): Loaded plugin embulk/guess/gzip from a load path
2016-04-25 16:55:10.720 +0900 [INFO] (0001:guess): Loaded plugin embulk/guess/bzip2 from a load path
2016-04-25 16:55:10.734 +0900 [INFO] (0001:guess): Loaded plugin embulk/guess/json from a load path
2016-04-25 16:55:10.738 +0900 [INFO] (0001:guess): Loaded plugin embulk/guess/csv from a load path
in:
  type: file
  path_prefix: ./station20160401free.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: station_cd, type: long}
    - {name: station_g_cd, type: long}
    - {name: station_name, type: string}
    - {name: station_name_k, type: string}
    - {name: station_name_r, type: string}
    - {name: line_cd, type: long}
    - {name: pref_cd, type: long}
    - {name: post, type: string}
    - {name: add, type: string}
    - {name: lon, type: string}
    - {name: lat, type: string}
    - {name: open_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: close_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: e_status, type: long}
    - {name: e_sort, type: long}
out: {type: postgresql, host: localhost, user: postgres, password: xxxx, database: postgres,
  table: station, mode: insert}
Created 'config.yml' file.

The contents of config.yml are as follows.

$ cat config.yml 
in:
  type: file
  path_prefix: ./station20160401free.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: station_cd, type: long}
    - {name: station_g_cd, type: long}
    - {name: station_name, type: string}
    - {name: station_name_k, type: string}
    - {name: station_name_r, type: string}
    - {name: line_cd, type: long}
    - {name: pref_cd, type: long}
    - {name: post, type: string}
    - {name: add, type: string}
    - {name: lon, type: string}
    - {name: lat, type: string}
    - {name: open_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: close_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: e_status, type: long}
    - {name: e_sort, type: long}
out: {type: postgresql, host: localhost, user: postgres, password: xxxx, database: postgres,
  table: station, mode: insert}

Now run the data load.

$ embulk run config.yml 
2016-04-25 16:58:42.047 +0900: Embulk v0.8.8
2016-04-25 16:58:43.584 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-postgresql (0.5.1)
2016-04-25 16:58:43.655 +0900 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix 'station20160401free.csv'
2016-04-25 16:58:43.660 +0900 [INFO] (0001:transaction): Loading files [station20160401free.csv]
2016-04-25 16:58:43.742 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2016-04-25 16:58:43.759 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:58:43.861 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-04-25 16:58:43.863 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:58:43.864 +0900 [INFO] (0001:transaction): Using insert mode
2016-04-25 16:58:43.880 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_ce331f972880_bl_tmp000"
2016-04-25 16:58:43.882 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:58:43.894 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "station_ce331f972880_bl_tmp000" ("station_cd" BIGINT, "station_g_cd" BIGINT, "station_name" TEXT, "station_name_k" TEXT, "station_name_r" TEXT, "line_cd" BIGINT, "pref_cd" BIGINT, "post" TEXT, "add" TEXT, "lon" TEXT, "lat" TEXT, "open_ymd" TIMESTAMP WITH TIME ZONE, "close_ymd" TIMESTAMP WITH TIME ZONE, "e_status" BIGINT, "e_sort" BIGINT)
2016-04-25 16:58:43.909 +0900 [INFO] (0001:transaction): > 0.02 seconds
2016-04-25 16:58:43.910 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_ce331f972880_bl_tmp001"
2016-04-25 16:58:43.911 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:58:43.916 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "station_ce331f972880_bl_tmp001" ("station_cd" BIGINT, "station_g_cd" BIGINT, "station_name" TEXT, "station_name_k" TEXT, "station_name_r" TEXT, "line_cd" BIGINT, "pref_cd" BIGINT, "post" TEXT, "add" TEXT, "lon" TEXT, "lat" TEXT, "open_ymd" TIMESTAMP WITH TIME ZONE, "close_ymd" TIMESTAMP WITH TIME ZONE, "e_status" BIGINT, "e_sort" BIGINT)
2016-04-25 16:58:43.923 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:58:43.924 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_ce331f972880_bl_tmp002"
2016-04-25 16:58:43.924 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:58:43.929 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "station_ce331f972880_bl_tmp002" ("station_cd" BIGINT, "station_g_cd" BIGINT, "station_name" TEXT, "station_name_k" TEXT, "station_name_r" TEXT, "line_cd" BIGINT, "pref_cd" BIGINT, "post" TEXT, "add" TEXT, "lon" TEXT, "lat" TEXT, "open_ymd" TIMESTAMP WITH TIME ZONE, "close_ymd" TIMESTAMP WITH TIME ZONE, "e_status" BIGINT, "e_sort" BIGINT)
2016-04-25 16:58:43.933 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:58:43.933 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_ce331f972880_bl_tmp003"
2016-04-25 16:58:43.934 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:58:43.939 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "station_ce331f972880_bl_tmp003" ("station_cd" BIGINT, "station_g_cd" BIGINT, "station_name" TEXT, "station_name_k" TEXT, "station_name_r" TEXT, "line_cd" BIGINT, "pref_cd" BIGINT, "post" TEXT, "add" TEXT, "lon" TEXT, "lat" TEXT, "open_ymd" TIMESTAMP WITH TIME ZONE, "close_ymd" TIMESTAMP WITH TIME ZONE, "e_status" BIGINT, "e_sort" BIGINT)
2016-04-25 16:58:43.948 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:58:44.020 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2016-04-25 16:58:44.047 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:58:44.067 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-04-25 16:58:44.068 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-04-25 16:58:44.069 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "station_ce331f972880_bl_tmp000" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort") FROM STDIN
2016-04-25 16:58:44.075 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:58:44.090 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-04-25 16:58:44.091 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-04-25 16:58:44.091 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "station_ce331f972880_bl_tmp001" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort") FROM STDIN
2016-04-25 16:58:44.093 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:58:44.104 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-04-25 16:58:44.105 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-04-25 16:58:44.105 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "station_ce331f972880_bl_tmp002" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort") FROM STDIN
2016-04-25 16:58:44.107 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:58:44.123 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-04-25 16:58:44.124 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-04-25 16:58:44.124 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "station_ce331f972880_bl_tmp003" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort") FROM STDIN
2016-04-25 16:58:45.008 +0900 [INFO] (0017:task-0000): Loading 2,723 rows (320,762 bytes)
2016-04-25 16:58:45.053 +0900 [INFO] (0017:task-0000): > 0.05 seconds (loaded 2,723 rows in total)
2016-04-25 16:58:45.056 +0900 [INFO] (0017:task-0000): Loading 2,720 rows (320,300 bytes)
2016-04-25 16:58:45.084 +0900 [INFO] (0017:task-0000): > 0.03 seconds (loaded 2,720 rows in total)
2016-04-25 16:58:45.086 +0900 [INFO] (0017:task-0000): Loading 2,757 rows (322,460 bytes)
2016-04-25 16:58:45.112 +0900 [INFO] (0017:task-0000): > 0.03 seconds (loaded 2,757 rows in total)
2016-04-25 16:58:45.113 +0900 [INFO] (0017:task-0000): Loading 2,634 rows (310,929 bytes)
2016-04-25 16:58:45.141 +0900 [INFO] (0017:task-0000): > 0.03 seconds (loaded 2,634 rows in total)
2016-04-25 16:58:45.146 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2016-04-25 16:58:45.148 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2016-04-25 16:58:45.161 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-04-25 16:58:45.162 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:58:45.167 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "station" ("station_cd" BIGINT, "station_g_cd" BIGINT, "station_name" TEXT, "station_name_k" TEXT, "station_name_r" TEXT, "line_cd" BIGINT, "pref_cd" BIGINT, "post" TEXT, "add" TEXT, "lon" TEXT, "lat" TEXT, "open_ymd" TIMESTAMP WITH TIME ZONE, "close_ymd" TIMESTAMP WITH TIME ZONE, "e_status" BIGINT, "e_sort" BIGINT)
2016-04-25 16:58:45.179 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:58:45.181 +0900 [INFO] (0001:transaction): SQL: INSERT INTO "station" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort") SELECT "station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort" FROM "station_ce331f972880_bl_tmp000" UNION ALL SELECT "station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort" FROM "station_ce331f972880_bl_tmp001" UNION ALL SELECT "station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort" FROM "station_ce331f972880_bl_tmp002" UNION ALL SELECT "station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort" FROM "station_ce331f972880_bl_tmp003"
2016-04-25 16:58:45.227 +0900 [INFO] (0001:transaction): > 0.05 seconds (10,834 rows)
2016-04-25 16:58:45.247 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=postgres, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-04-25 16:58:45.257 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-04-25 16:58:45.258 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-04-25 16:58:45.258 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_ce331f972880_bl_tmp000"
2016-04-25 16:58:45.271 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:58:45.271 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_ce331f972880_bl_tmp001"
2016-04-25 16:58:45.277 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:58:45.277 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_ce331f972880_bl_tmp002"
2016-04-25 16:58:45.282 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:58:45.282 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_ce331f972880_bl_tmp003"
2016-04-25 16:58:45.289 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-04-25 16:58:45.292 +0900 [INFO] (main): Committed.
2016-04-25 16:58:45.292 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"station20160401free.csv"},"out":{}}
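
As a quick sanity check on the log above, the following Python sketch adds up the row counts reported per COPY and compares the sum with the row count reported for the final INSERT. The log lines are copied from the output above; the regular expressions and the helper to_int are my own, and I am assuming that each "loaded ... rows in total" line is a per-temporary-table count, which is consistent with the numbers here.

```python
import re

# Row-count lines copied from the embulk run output above.
LOG = """\
2016-04-25 16:58:45.053 +0900 [INFO] (0017:task-0000): > 0.05 seconds (loaded 2,723 rows in total)
2016-04-25 16:58:45.084 +0900 [INFO] (0017:task-0000): > 0.03 seconds (loaded 2,720 rows in total)
2016-04-25 16:58:45.112 +0900 [INFO] (0017:task-0000): > 0.03 seconds (loaded 2,757 rows in total)
2016-04-25 16:58:45.141 +0900 [INFO] (0017:task-0000): > 0.03 seconds (loaded 2,634 rows in total)
2016-04-25 16:58:45.227 +0900 [INFO] (0001:transaction): > 0.05 seconds (10,834 rows)
"""


def to_int(text):
    """Convert a count such as '2,723' to an int."""
    return int(text.replace(",", ""))


# Counts reported after each COPY into a temporary table.
loaded = [to_int(m) for m in re.findall(r"\(loaded ([\d,]+) rows in total\)", LOG)]
# Count reported for the final INSERT ... SELECT into the target table.
inserted = [to_int(m) for m in re.findall(r"seconds \(([\d,]+) rows\)", LOG)]

print(sum(loaded), inserted)
```

The four per-table counts add up to the 10,834 rows reported for the final INSERT.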

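The data was loaded into the database without any problems. Let's check the contents of the table.

[Screenshot: ekijp_station_20160425_1]

Next, check the data type of each column.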
CREATE TABLE public.station
(
 station_cd bigint,
 station_g_cd bigint,
 station_name text,
 station_name_k text,
 station_name_r text,
 line_cd bigint,
 pref_cd bigint,
 post text,
 add text,
 lon text,
 lat text,
 open_ymd timestamp with time zone,
 close_ymd timestamp with time zone,
 e_status bigint,
 e_sort bigint
)

The data load succeeded. Note that the latitude (lat) and longitude (lon) values were read in as the text type.
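
I did not look into why lon and lat were guessed as string. One way to investigate would be to scan a column for cells that do not parse as a number, as in the Python sketch below. The function non_numeric_cells is a hypothetical helper of mine, and the sample is only the first few lines of the CSV shown earlier, so it says nothing about the rest of the file.

```python
import csv
import io

# Header and first two data rows of station20160401free.csv, as shown earlier.
SAMPLE = """\
station_cd,station_g_cd,station_name,station_name_k,station_name_r,line_cd,pref_cd,post,add,lon,lat,open_ymd,close_ymd,e_status,e_sort
1110101,1110101,函館,,,11101,1,040-0063,北海道函館市若松町12-13,140.726413,41.773709,1902-12-10,,0,1110101
1110102,1110102,五稜郭,,,11101,1,041-0813,函館市亀田本町,140.733539,41.803557,,,0,1110102
"""


def non_numeric_cells(text, column):
    """Return (line_number, value) pairs for non-empty cells that float() rejects."""
    reader = csv.DictReader(io.StringIO(text))
    bad = []
    # The header is line 1, so data rows start at line 2
    # (assuming no field spans multiple lines).
    for line_no, row in enumerate(reader, start=2):
        value = row[column]
        if value == "":
            continue
        try:
            float(value)
        except ValueError:
            bad.append((line_no, value))
    return bad


print(non_numeric_cells(SAMPLE, "lon"))
print(non_numeric_cells(SAMPLE, "post"))
```

In these sample rows every lon value parses as a number, while the postal codes in post do not, so any cause of the string guess would have to lie elsewhere in the file or in how guess samples it.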

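Summary

I tried out Embulk's guess feature, which infers the layout of the data being read. Because it infers the contents of a data file automatically, it looks like it can make data handling work more efficient.

Next time

Next time, I plan to experiment with Redshift as the output database.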

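  • hiroyuki sato

    Thank you for the useful information.
    This is unrelated to the main topic, but when loading data into PostgreSQL,
    it is a good idea to set default_timezone.
    The dates in the loaded data appear to be shifted by nine hours.
    (In the screenshot used to check the loaded data, the values show +9:00.
    My understanding is that the intent was 1902-12-10 0:0:0.)

    out:
      type: postgresql
      default_timezone: "Japan" # <-- this

    See here for details.
    http://qiita.com/hiroysato/items/5bacf7916a6731c9fa36

    • 川崎照夫

      Thank you for pointing this out.
      I see, I had not noticed that. I learned something!
      I will add a note to the blog post.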