Loading Excel Data into PostgreSQL with Embulk + Digdag

This is Oya from the Data Integration Department.

Writing DB test data in Excel is pretty common.
This time I will try loading data written in Excel into a DB with Embulk.

The environment I used is as follows.

  • Embulk v0.8.13
  • Digdag v0.8.16
  • PostgreSQL 9.5.3
  • Mac OS X El Capitan version 10.11.6

Loading Excel data into PostgreSQL

Working directory

First, here is the directory layout used in this walkthrough, as it looks once everything is in place.

excelload
├── allsheet.dig
└── embulk
    ├── sales.yml
    ├── testdata.xlsx
    └── users.yml

Excel test data

In the Excel workbook holding the data to load, each sheet contains the data for one table, like this.
Give each sheet a clear name, for example the name of its table.

(Screenshot: the users sheet)

Create the destination table in the DB.

create table users(
userid integer not null primary key,
username char(8),
city varchar(30)
);

Installing Embulk

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc

Installing Embulk plugins

Embulk has many plugins for different inputs and outputs; this time I use the following ones.

  • embulk-parser-roo-excel: a plugin that reads Excel files
  • embulk-output-postgresql: a JDBC-based plugin that writes data to PostgreSQL tables

$ embulk gem install embulk-parser-roo-excel
$ embulk gem install embulk-output-postgresql

Embulk configuration file

The roo-excel plugin does not appear to support guess at the moment, so the Embulk configuration file has to be written by hand.
Create the following file and save it as users.yml.

users.yml

in:
  type: file
  path_prefix: /Users/oya.akihisa/work/excelload/embulk/testdata.xlsx
  parser:
    type: roo-excel                  # parser plugin to use
    skip_header_lines: 1             # number of header rows
    sheet: "users"                   # sheet name
    columns:                         # column names and types
    - {name: userid, type: long}
    - {name: username, type: string}
    - {name: city, type: string}
out:
  type: postgresql
  host: localhost
  user: oya.akihisa
  password: ""
  database: postgres
  table: users                       # table name
  mode: truncate_insert              # delete all rows, then insert
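Because roo-excel has no guess support, every sheet needs its own hand-written config, and users.yml and the later sales.yml differ only in the sheet, table, columns, and a few output options. As a minimal sketch, the hypothetical Python helper render_config below (not part of Embulk or any plugin) prints a config in the same layout as users.yml from a column list; the connection settings are copied from users.yml and would need changing for any other environment.

```python
# Hypothetical helper: render an Embulk config in the same layout as users.yml.
# Not part of Embulk or roo-excel; connection settings are copied from users.yml.

def render_config(xlsx_path, sheet, table, columns, extra_out=None):
    """Return YAML text that loads one Excel sheet into one PostgreSQL table."""
    lines = [
        "in:",
        "  type: file",
        f"  path_prefix: {xlsx_path}",
        "  parser:",
        "    type: roo-excel",
        "    skip_header_lines: 1",
        f'    sheet: "{sheet}"',
        "    columns:",
    ]
    # One flow-style mapping per column, e.g. "- {name: userid, type: long}".
    for name, type_name in columns:
        lines.append(f"    - {{name: {name}, type: {type_name}}}")
    lines += [
        "out:",
        "  type: postgresql",
        "  host: localhost",
        "  user: oya.akihisa",
        '  password: ""',
        "  database: postgres",
        f"  table: {table}",
        "  mode: truncate_insert",
    ]
    # Extra output options such as default_timezone are appended verbatim.
    for key, value in (extra_out or {}).items():
        lines.append(f"  {key}: {value}")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_config(
        "/Users/oya.akihisa/work/excelload/embulk/testdata.xlsx",
        "users", "users",
        [("userid", "long"), ("username", "string"), ("city", "string")],
    ))
```

Writing the file with plain string formatting keeps the sketch free of third-party YAML libraries, at the cost of not escaping unusual sheet or column names.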

Types available in columns

The types supported by Embulk that can be used in columns are as follows.

type       Description
boolean    Boolean (true/false)
long       Integer
timestamp  Date and time
double     Floating-point number
string     String
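To make the table concrete, here is a small Python sketch of how a row of cell values could be coerced into these five types. The CONVERTERS table and coerce_row helper are hypothetical illustrations, not how roo-excel converts cells internally; the assumption that numbers arrive as floats such as 1.0 reflects how spreadsheets typically store numbers.

```python
from datetime import datetime

# Hypothetical coercion table: Embulk type name -> Python converter.
# Illustrates the five types only; this is not roo-excel's actual logic.

def to_boolean(value):
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() in ("true", "1", "yes")

def to_timestamp(value):
    if isinstance(value, datetime):
        return value
    return datetime.fromisoformat(str(value))  # e.g. "2016-09-30 10:00:00"

CONVERTERS = {
    "boolean": to_boolean,
    # Spreadsheet numbers often arrive as floats like 1.0; fractions are truncated.
    "long": lambda value: int(float(value)),
    "timestamp": to_timestamp,
    "double": float,
    "string": str,
}

def coerce_row(cells, columns):
    """Convert one row using (name, type) pairs like those in users.yml; empty cells stay None."""
    return {
        name: None if cell is None else CONVERTERS[type_name](cell)
        for cell, (name, type_name) in zip(cells, columns)
    }


if __name__ == "__main__":
    users_columns = [("userid", "long"), ("username", "string"), ("city", "string")]
    print(coerce_row([1.0, "sample", "sample"], users_columns))
```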

mode

Since the test data is meant to be wiped and reloaded, truncate_insert is used here.
There also appear to be insert, insert_direct, replace, and merge modes.

Running Embulk

Run the run command, passing the configuration file you just created as its argument.

$ embulk run users.yml 

2016-09-30 18:21:57.526 +0900: Embulk v0.8.13
2016-09-30 18:21:59.935 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-postgresql (0.6.3)
2016-09-30 18:22:01.182 +0900 [INFO] (0001:transaction): Loaded plugin embulk-parser-roo-excel (0.0.1)
2016-09-30 18:22:01.219 +0900 [INFO] (0001:transaction): Listing local files at directory '/Users/oya.akihisa/work/excelload/embulk' filtering filename by prefix 'testdata.xlsx'
2016-09-30 18:22:01.235 +0900 [INFO] (0001:transaction): Loading files [/Users/oya.akihisa/work/excelload/embulk/testdata.xlsx]
2016-09-30 18:22:01.285 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2016-09-30 18:22:01.319 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-09-30 18:22:01.508 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-09-30 18:22:01.511 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-09-30 18:22:01.511 +0900 [INFO] (0001:transaction): Using truncate_insert mode
2016-09-30 18:22:01.560 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_ee2eb72c770e80_bl_tmp000"
2016-09-30 18:22:01.562 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-09-30 18:22:01.576 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "users_ee2eb72c770e80_bl_tmp000" ("userid" INT4, "username" BPCHAR, "city" VARCHAR(30))
2016-09-30 18:22:01.607 +0900 [INFO] (0001:transaction): > 0.03 seconds
2016-09-30 18:22:01.609 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_ee2eb72c770e80_bl_tmp001"
2016-09-30 18:22:01.610 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-09-30 18:22:01.613 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "users_ee2eb72c770e80_bl_tmp001" ("userid" INT4, "username" BPCHAR, "city" VARCHAR(30))
2016-09-30 18:22:01.617 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-09-30 18:22:01.618 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_ee2eb72c770e80_bl_tmp002"
2016-09-30 18:22:01.619 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-09-30 18:22:01.621 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "users_ee2eb72c770e80_bl_tmp002" ("userid" INT4, "username" BPCHAR, "city" VARCHAR(30))
2016-09-30 18:22:01.626 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-09-30 18:22:01.627 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_ee2eb72c770e80_bl_tmp003"
2016-09-30 18:22:01.627 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-09-30 18:22:01.629 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "users_ee2eb72c770e80_bl_tmp003" ("userid" INT4, "username" BPCHAR, "city" VARCHAR(30))
2016-09-30 18:22:01.633 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-09-30 18:22:01.686 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2016-09-30 18:22:01.737 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-09-30 18:22:01.752 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-09-30 18:22:01.753 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-09-30 18:22:01.754 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "users_ee2eb72c770e80_bl_tmp000" ("userid", "username", "city") FROM STDIN
2016-09-30 18:22:01.759 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-09-30 18:22:01.771 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-09-30 18:22:01.772 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-09-30 18:22:01.772 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "users_ee2eb72c770e80_bl_tmp001" ("userid", "username", "city") FROM STDIN
2016-09-30 18:22:01.775 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-09-30 18:22:01.787 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-09-30 18:22:01.788 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-09-30 18:22:01.788 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "users_ee2eb72c770e80_bl_tmp002" ("userid", "username", "city") FROM STDIN
2016-09-30 18:22:01.789 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-09-30 18:22:01.800 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-09-30 18:22:01.801 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-09-30 18:22:01.801 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "users_ee2eb72c770e80_bl_tmp003" ("userid", "username", "city") FROM STDIN
2016-09-30 18:22:02.698 +0900 [INFO] (0017:task-0000): Loading 3 rows (41 bytes)
2016-09-30 18:22:02.703 +0900 [INFO] (0017:task-0000): > 0.00 seconds (loaded 3 rows in total)
2016-09-30 18:22:02.707 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2016-09-30 18:22:02.708 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2016-09-30 18:22:02.719 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-09-30 18:22:02.720 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-09-30 18:22:02.721 +0900 [INFO] (0001:transaction): SQL: DELETE FROM "users"
2016-09-30 18:22:02.723 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-09-30 18:22:02.723 +0900 [INFO] (0001:transaction): SQL: INSERT INTO "users" ("userid", "username", "city") SELECT "userid", "username", "city" FROM "users_ee2eb72c770e80_bl_tmp000" UNION ALL SELECT "userid", "username", "city" FROM "users_ee2eb72c770e80_bl_tmp001" UNION ALL SELECT "userid", "username", "city" FROM "users_ee2eb72c770e80_bl_tmp002" UNION ALL SELECT "userid", "username", "city" FROM "users_ee2eb72c770e80_bl_tmp003"
2016-09-30 18:22:02.727 +0900 [INFO] (0001:transaction): > 0.00 seconds (3 rows)
2016-09-30 18:22:02.742 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-09-30 18:22:02.752 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-09-30 18:22:02.754 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-09-30 18:22:02.754 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_ee2eb72c770e80_bl_tmp000"
2016-09-30 18:22:02.762 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-09-30 18:22:02.762 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_ee2eb72c770e80_bl_tmp001"
2016-09-30 18:22:02.766 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-09-30 18:22:02.766 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_ee2eb72c770e80_bl_tmp002"
2016-09-30 18:22:02.769 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-09-30 18:22:02.769 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_ee2eb72c770e80_bl_tmp003"
2016-09-30 18:22:02.773 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-09-30 18:22:02.776 +0900 [INFO] (main): Committed.
2016-09-30 18:22:02.776 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"/Users/oya.akihisa/work/excelload/embulk/testdata.xlsx"},"out":{}}

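The log shows that Embulk is doing more than a plain INSERT.
Embulk processing is divided into a transaction stage and a task stage.
The ...(0001:transaction)... lines are the transaction stage, which prepares intermediate work tables to write into (the four *_bl_tmp000 to *_bl_tmp003 tables, one per output task).
The ...(0017:task-0000)... lines that follow are the task stage, which writes the rows into the work tables with COPY ... FROM STDIN.
When the task stage finishes, processing returns to the transaction stage: the target table is emptied with DELETE FROM "users", the rows are copied in with a single INSERT ... SELECT ... UNION ALL over the work tables, and the work tables are dropped.
Finally, the job is committed.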
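The same sequence can be imitated on a small scale to see why the target table ends up with exactly the sheet's rows. This is a minimal sketch under stated assumptions: it uses Python's built-in sqlite3 instead of PostgreSQL, executemany instead of COPY, and spreads rows round-robin over the work tables purely for illustration (the log does not show how rows were distributed). The default of four work tables mirrors "output tasks 4" in the log; truncate_insert here is a hypothetical function name, not Embulk's code.

```python
import sqlite3

# SQLite imitation of the truncate_insert flow in the log (not Embulk's code).
# Assumptions: SQLite instead of PostgreSQL, executemany instead of COPY,
# and round-robin distribution of rows over work tables for illustration.

def truncate_insert(conn, table, columns, rows, output_tasks=4):
    cols = ", ".join(f'"{c}"' for c in columns)
    work_tables = [f"{table}_bl_tmp{i:03d}" for i in range(output_tasks)]
    cur = conn.cursor()

    # Transaction stage: one empty work table per output task,
    # copying the target table's column layout.
    for t in work_tables:
        cur.execute(f'DROP TABLE IF EXISTS "{t}"')
        cur.execute(f'CREATE TABLE "{t}" AS SELECT {cols} FROM "{table}" WHERE 0')

    # Task stage: each task writes its share of rows into its own work table.
    placeholders = ", ".join("?" for _ in columns)
    for i, t in enumerate(work_tables):
        cur.executemany(
            f'INSERT INTO "{t}" ({cols}) VALUES ({placeholders})',
            rows[i::output_tasks],
        )

    # Transaction stage again: empty the target, merge every work table
    # into it with UNION ALL, drop the work tables, then commit.
    cur.execute(f'DELETE FROM "{table}"')
    union = " UNION ALL ".join(f'SELECT {cols} FROM "{t}"' for t in work_tables)
    cur.execute(f'INSERT INTO "{table}" ({cols}) {union}')
    for t in work_tables:
        cur.execute(f'DROP TABLE IF EXISTS "{t}"')
    conn.commit()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (userid INTEGER PRIMARY KEY, username TEXT, city TEXT)")
    conn.execute("INSERT INTO users VALUES (99, 'stale', 'stale')")
    conn.commit()
    truncate_insert(conn, "users", ["userid", "username", "city"],
                    [(1, "a", "x"), (2, "b", "y"), (3, "c", "z")])
    print(conn.execute("SELECT * FROM users ORDER BY userid").fetchall())
```

Writing into separate work tables first means the target table is only touched at the very end, which is why a failure during the task stage leaves the existing test data in place.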

(Screenshot: the users table in PostgreSQL after the load)

The data made it into the DB. Kanji (Japanese characters) look fine too.

Loading multiple sheets

Above, one sheet was loaded into one table. How can data for several tables be loaded?
This is where Digdag comes in.
I use Digdag to run Embulk multiple times.

A second table and its test data

Prepare a second table and its test data.

(Screenshot: the sales sheet)

create table sales(
	salesid integer not null primary key,
	pricepaid integer,
	saletime timestamp
);

Embulk configuration file for the sales table

Create a configuration file for the sales table.
Save it as sales.yml.

sales.yml

in:
  type: file
  path_prefix: /Users/oya.akihisa/work/excelload/embulk/testdata.xlsx
  parser:
    type: roo-excel
    skip_header_lines: 1
    sheet: "sales"
    columns:
    - {name: salesid, type: long}
    - {name: pricepaid, type: long}
    - {name: saletime, type: timestamp}
out:
  type: postgresql
  host: localhost
  user: oya.akihisa
  password: ""
  database: postgres
  table: sales
  mode: truncate_insert
  default_timezone: "Japan"

default_timezone

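This is set to "Japan". default_timezone is the time zone the output plugin uses when writing timestamp values into the table; it defaults to UTC, so without this setting the times are stored as UTC.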
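An Embulk timestamp is an absolute point in time, while the sales.saletime column is a TIMESTAMP without time zone, so the plugin has to pick a zone when turning the value into a local date and time. The arithmetic behind that choice can be sketched in Python as below; the wall_clock helper and the sample instant are illustrative assumptions, and a fixed UTC+9 offset stands in for the "Japan" tz database zone (Japan does not observe DST).

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+9 offset as a stand-in for the "Japan" zone (no DST in Japan).
JST = timezone(timedelta(hours=9), "JST")

def wall_clock(instant, tz):
    """Format an aware datetime as the local date-time text a TIMESTAMP column would hold."""
    return instant.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S")

# Illustrative instant, not taken from the sheet.
instant = datetime(2016, 10, 3, 0, 53, 0, tzinfo=timezone.utc)

print(wall_clock(instant, timezone.utc))  # 2016-10-03 00:53:00 (default: UTC)
print(wall_clock(instant, JST))           # 2016-10-03 09:53:00 (default_timezone: "Japan")
```

The same instant ends up nine hours apart in the column depending on the setting, which is the difference the default_timezone option controls.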

Installing Digdag

$ curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
$ chmod +x ~/bin/digdag
$ echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc

Digdag workflow file

allsheet.dig

+all_sheet:
  for_each>:
    config: [embulk/users.yml, embulk/sales.yml]
  _do:
    embulk>: ${config}

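Digdag workflow files can use a variety of operators; here I use the for_each> operator for iteration.
Passing an array of Embulk configuration file paths as above makes Digdag run the embulk> task under _do once for each element.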
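Conceptually, the for_each> task expands into one embulk> run per array element, in order; the Digdag log shows exactly that, with a for-config=embulk%2Fusers.yml task followed by a for-config=embulk%2Fsales.yml task. The Python sketch below imitates that expansion by building one `embulk run <config>` command per element. The expand_for_each and run_all helpers are hypothetical, this is not Digdag's implementation, and stopping at the first failure is a simplification.

```python
import shlex
import subprocess

# Rough imitation of for_each> + embulk> in allsheet.dig (not Digdag's code):
# one `embulk run <config>` command per array element, run in array order.

def expand_for_each(configs):
    """Return one command line per config file, preserving array order."""
    return [["embulk", "run", config] for config in configs]

def run_all(configs, dry_run=True):
    for cmd in expand_for_each(configs):
        print(("would run: " if dry_run else "running: ") + shlex.join(cmd))
        if not dry_run:
            # check=True raises CalledProcessError on a non-zero exit,
            # so the remaining configs are skipped.
            subprocess.run(cmd, check=True)


if __name__ == "__main__":
    # Dry run by default, so nothing is executed.
    run_all(["embulk/users.yml", "embulk/sales.yml"])
```

Adding a third sheet only means appending another config path to the array, which is the main benefit of driving Embulk from Digdag here.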

Running Digdag

Run the run command, passing the workflow file you just created as its argument.

$ digdag run allsheet.dig 

2016-10-03 09:53:00 +0900: Digdag v0.8.16
2016-10-03 09:53:01 +0900 [WARN] (main): Using a new session time 2016-10-03T00:00:00+00:00.
2016-10-03 09:53:01 +0900 [INFO] (main): Using session /Users/oya.akihisa/work/excelload/.digdag/status/20161003T000000+0000.
2016-10-03 09:53:01 +0900 [INFO] (main): Starting a new session project id=1 workflow name=allsheet session_time=2016-10-03T00:00:00+00:00
2016-10-03 09:53:01 +0900 [INFO] (0017@+allsheet+all_sheet): for_each>: {config=[embulk/users.yml, embulk/sales.yml]}
2016-10-03 09:53:01 +0900 [INFO] (0017@+allsheet+all_sheet^sub+for-config=embulk%2Fusers.yml): embulk>: embulk/users.yml
2016-10-03 09:53:06.041 +0900: Embulk v0.8.13
2016-10-03 09:53:07.963 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-postgresql (0.6.3)
2016-10-03 09:53:09.210 +0900 [INFO] (0001:transaction): Loaded plugin embulk-parser-roo-excel (0.0.1)
2016-10-03 09:53:09.239 +0900 [INFO] (0001:transaction): Listing local files at directory '/Users/oya.akihisa/work/excelload/embulk' filtering filename by prefix 'testdata.xlsx'
2016-10-03 09:53:09.246 +0900 [INFO] (0001:transaction): Loading files [/Users/oya.akihisa/work/excelload/embulk/testdata.xlsx]
2016-10-03 09:53:09.303 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2016-10-03 09:53:09.342 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-10-03 09:53:09.453 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-10-03 09:53:09.456 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:09.456 +0900 [INFO] (0001:transaction): Using truncate_insert mode
2016-10-03 09:53:09.504 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_f1abf330b26e40_bl_tmp000"
2016-10-03 09:53:09.507 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:09.524 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "users_f1abf330b26e40_bl_tmp000" ("userid" INT4, "username" BPCHAR, "city" VARCHAR(30))
2016-10-03 09:53:09.535 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-10-03 09:53:09.537 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_f1abf330b26e40_bl_tmp001"
2016-10-03 09:53:09.538 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:09.540 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "users_f1abf330b26e40_bl_tmp001" ("userid" INT4, "username" BPCHAR, "city" VARCHAR(30))
2016-10-03 09:53:09.551 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-10-03 09:53:09.552 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_f1abf330b26e40_bl_tmp002"
2016-10-03 09:53:09.552 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:09.557 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "users_f1abf330b26e40_bl_tmp002" ("userid" INT4, "username" BPCHAR, "city" VARCHAR(30))
2016-10-03 09:53:09.561 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:09.562 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_f1abf330b26e40_bl_tmp003"
2016-10-03 09:53:09.563 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:09.565 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "users_f1abf330b26e40_bl_tmp003" ("userid" INT4, "username" BPCHAR, "city" VARCHAR(30))
2016-10-03 09:53:09.569 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:09.613 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2016-10-03 09:53:09.656 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-10-03 09:53:09.670 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-10-03 09:53:09.671 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-10-03 09:53:09.673 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "users_f1abf330b26e40_bl_tmp000" ("userid", "username", "city") FROM STDIN
2016-10-03 09:53:09.676 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-10-03 09:53:09.686 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-10-03 09:53:09.688 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-10-03 09:53:09.689 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "users_f1abf330b26e40_bl_tmp001" ("userid", "username", "city") FROM STDIN
2016-10-03 09:53:09.690 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-10-03 09:53:09.700 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-10-03 09:53:09.700 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-10-03 09:53:09.701 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "users_f1abf330b26e40_bl_tmp002" ("userid", "username", "city") FROM STDIN
2016-10-03 09:53:09.701 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-10-03 09:53:09.712 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-10-03 09:53:09.713 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-10-03 09:53:09.713 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "users_f1abf330b26e40_bl_tmp003" ("userid", "username", "city") FROM STDIN
2016-10-03 09:53:10.502 +0900 [INFO] (0017:task-0000): Loading 3 rows (41 bytes)
2016-10-03 09:53:10.522 +0900 [INFO] (0017:task-0000): > 0.01 seconds (loaded 3 rows in total)
2016-10-03 09:53:10.527 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2016-10-03 09:53:10.528 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2016-10-03 09:53:10.537 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-10-03 09:53:10.538 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:10.539 +0900 [INFO] (0001:transaction): SQL: DELETE FROM "users"
2016-10-03 09:53:10.542 +0900 [INFO] (0001:transaction): > 0.00 seconds (3 rows)
2016-10-03 09:53:10.542 +0900 [INFO] (0001:transaction): SQL: INSERT INTO "users" ("userid", "username", "city") SELECT "userid", "username", "city" FROM "users_f1abf330b26e40_bl_tmp000" UNION ALL SELECT "userid", "username", "city" FROM "users_f1abf330b26e40_bl_tmp001" UNION ALL SELECT "userid", "username", "city" FROM "users_f1abf330b26e40_bl_tmp002" UNION ALL SELECT "userid", "username", "city" FROM "users_f1abf330b26e40_bl_tmp003"
2016-10-03 09:53:10.547 +0900 [INFO] (0001:transaction): > 0.00 seconds (3 rows)
2016-10-03 09:53:10.565 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-10-03 09:53:10.575 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-10-03 09:53:10.576 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:10.576 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_f1abf330b26e40_bl_tmp000"
2016-10-03 09:53:10.582 +0900 [INFO] (0001:transaction): > 0.01 seconds
2016-10-03 09:53:10.582 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_f1abf330b26e40_bl_tmp001"
2016-10-03 09:53:10.586 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:10.586 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_f1abf330b26e40_bl_tmp002"
2016-10-03 09:53:10.590 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:10.590 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_f1abf330b26e40_bl_tmp003"
2016-10-03 09:53:10.593 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:10.595 +0900 [INFO] (main): Committed.
2016-10-03 09:53:10.596 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"/Users/oya.akihisa/work/excelload/embulk/testdata.xlsx"},"out":{}}

2016-10-03 09:53:10 +0900 [INFO] (0017@+allsheet+all_sheet^sub+for-config=embulk%2Fsales.yml): embulk>: embulk/sales.yml
2016-10-03 09:53:14.733 +0900: Embulk v0.8.13
2016-10-03 09:53:16.858 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-postgresql (0.6.3)
2016-10-03 09:53:17.996 +0900 [INFO] (0001:transaction): Loaded plugin embulk-parser-roo-excel (0.0.1)
2016-10-03 09:53:18.024 +0900 [INFO] (0001:transaction): Listing local files at directory '/Users/oya.akihisa/work/excelload/embulk' filtering filename by prefix 'testdata.xlsx'
2016-10-03 09:53:18.031 +0900 [INFO] (0001:transaction): Loading files [/Users/oya.akihisa/work/excelload/embulk/testdata.xlsx]
2016-10-03 09:53:18.085 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2016-10-03 09:53:18.122 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-10-03 09:53:18.250 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-10-03 09:53:18.251 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:18.252 +0900 [INFO] (0001:transaction): Using truncate_insert mode
2016-10-03 09:53:18.280 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "sales_f1abfc2adb11c0_bl_tmp000"
2016-10-03 09:53:18.283 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:18.295 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "sales_f1abfc2adb11c0_bl_tmp000" ("salesid" INT4, "pricepaid" INT4, "saletime" TIMESTAMP)
2016-10-03 09:53:18.300 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:18.303 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "sales_f1abfc2adb11c0_bl_tmp001"
2016-10-03 09:53:18.304 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:18.306 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "sales_f1abfc2adb11c0_bl_tmp001" ("salesid" INT4, "pricepaid" INT4, "saletime" TIMESTAMP)
2016-10-03 09:53:18.308 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:18.308 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "sales_f1abfc2adb11c0_bl_tmp002"
2016-10-03 09:53:18.309 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:18.313 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "sales_f1abfc2adb11c0_bl_tmp002" ("salesid" INT4, "pricepaid" INT4, "saletime" TIMESTAMP)
2016-10-03 09:53:18.314 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:18.315 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "sales_f1abfc2adb11c0_bl_tmp003"
2016-10-03 09:53:18.316 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:18.318 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "sales_f1abfc2adb11c0_bl_tmp003" ("salesid" INT4, "pricepaid" INT4, "saletime" TIMESTAMP)
2016-10-03 09:53:18.320 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:18.362 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2016-10-03 09:53:18.400 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-10-03 09:53:18.422 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-10-03 09:53:18.423 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-10-03 09:53:18.424 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "sales_f1abfc2adb11c0_bl_tmp000" ("salesid", "pricepaid", "saletime") FROM STDIN
2016-10-03 09:53:18.428 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-10-03 09:53:18.440 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-10-03 09:53:18.441 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-10-03 09:53:18.441 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "sales_f1abfc2adb11c0_bl_tmp001" ("salesid", "pricepaid", "saletime") FROM STDIN
2016-10-03 09:53:18.443 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-10-03 09:53:18.457 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-10-03 09:53:18.458 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-10-03 09:53:18.458 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "sales_f1abfc2adb11c0_bl_tmp002" ("salesid", "pricepaid", "saletime") FROM STDIN
2016-10-03 09:53:18.459 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-10-03 09:53:18.474 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-10-03 09:53:18.475 +0900 [INFO] (0017:task-0000): > 0.00 seconds
2016-10-03 09:53:18.476 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "sales_f1abfc2adb11c0_bl_tmp003" ("salesid", "pricepaid", "saletime") FROM STDIN
2016-10-03 09:53:19.268 +0900 [INFO] (0017:task-0000): Loading 3 rows (117 bytes)
2016-10-03 09:53:19.273 +0900 [INFO] (0017:task-0000): > 0.01 seconds (loaded 3 rows in total)
2016-10-03 09:53:19.281 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2016-10-03 09:53:19.282 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2016-10-03 09:53:19.292 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-10-03 09:53:19.293 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:19.294 +0900 [INFO] (0001:transaction): SQL: DELETE FROM "sales"
2016-10-03 09:53:19.297 +0900 [INFO] (0001:transaction): > 0.00 seconds (3 rows)
2016-10-03 09:53:19.297 +0900 [INFO] (0001:transaction): SQL: INSERT INTO "sales" ("salesid", "pricepaid", "saletime") SELECT "salesid", "pricepaid", "saletime" FROM "sales_f1abfc2adb11c0_bl_tmp000" UNION ALL SELECT "salesid", "pricepaid", "saletime" FROM "sales_f1abfc2adb11c0_bl_tmp001" UNION ALL SELECT "salesid", "pricepaid", "saletime" FROM "sales_f1abfc2adb11c0_bl_tmp002" UNION ALL SELECT "salesid", "pricepaid", "saletime" FROM "sales_f1abfc2adb11c0_bl_tmp003"
2016-10-03 09:53:19.300 +0900 [INFO] (0001:transaction): > 0.00 seconds (3 rows)
2016-10-03 09:53:19.311 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-10-03 09:53:19.321 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-10-03 09:53:19.323 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:19.323 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "sales_f1abfc2adb11c0_bl_tmp000"
2016-10-03 09:53:19.327 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:19.328 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "sales_f1abfc2adb11c0_bl_tmp001"
2016-10-03 09:53:19.330 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:19.330 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "sales_f1abfc2adb11c0_bl_tmp002"
2016-10-03 09:53:19.332 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:19.332 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "sales_f1abfc2adb11c0_bl_tmp003"
2016-10-03 09:53:19.335 +0900 [INFO] (0001:transaction): > 0.00 seconds
2016-10-03 09:53:19.338 +0900 [INFO] (main): Committed.
2016-10-03 09:53:19.338 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"/Users/oya.akihisa/work/excelload/embulk/testdata.xlsx"},"out":{}}

Success. Task state is saved at /Users/oya.akihisa/work/excelload/.digdag/status/20161003T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

The output ends with "Success.", so it looks like it worked.

(Screenshot: the loaded data in PostgreSQL)

The data is in the DB as expected.

Summary

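I was able to load Excel data into a DB using Embulk and Digdag.
This was my first time using either tool, but everything went smoothly.
Embulk looks handy for loading and transforming data into databases, and Digdag for managing batch jobs. I'd like to get the hang of both.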

  • hiroyuki sato

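    I'm the author of embulk-parser-roo-excel. Thank you for using it.
    It may be odd coming from the author, but for reading Excel data
    I also recommend embulk-parser-poi_excel:
    (1) it has more features, and (2) it is written in Java, so it is fast.
    http://www.ne.jp/asahi/hishidama/home/tech/embulk/parser-poi_excel.html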

    • oya.akihisa

      Thank you for the comment and the information.
      Good point. I'll give that one a try too when I need it.