Loading Excel Data into PostgreSQL with Embulk + Digdag
This is Oya from the Data Integration Division.
Writing database test data in Excel is a common practice.
In this post, I will try loading data written in Excel into a database with Embulk.
The environment I used is as follows.
- Embulk v0.8.13
- Digdag v0.8.16
- PostgreSQL 9.5.3
- Mac OS X El Capitan version 10.11.6
Loading Excel Data into PostgreSQL
Working Directory
First, here is the directory layout used in this walkthrough; it will end up looking like this.
excelload
├── allsheet.dig
└── embulk
    ├── sales.yml
    ├── testdata.xlsx
    └── users.yml
Excel Test Data
Data to load: in the Excel test data workbook, write one table's worth of data per sheet, as in the example below.
Keep things easy to follow by, for example, naming each sheet after its table.
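For instance (this is hypothetical sample data, not the actual sheet contents), the users sheet might hold a header row followed by a few data rows:

userid | username | city |
---|---|---|
1 | user1 | 東京 |
2 | user2 | 大阪 |
3 | user3 | 名古屋 |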
Create the destination table in the database.
create table users(
    userid integer not null primary key,
    username char(8),
    city varchar(30)
);
Installing Embulk
$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc
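If you want to make sure the installation worked, printing the version is a simple check (an extra step I am adding here, not part of the original instructions):

$ embulk --version   # should print the installed Embulk version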
Installing Embulk Plugins
Embulk has plenty of plugins covering whatever input and output you want to handle; this time the following two are used.
Plugin | Description |
---|---|
embulk-parser-roo-excel | Parser plugin that reads Excel files |
embulk-output-postgresql | JDBC output plugin that writes data to PostgreSQL tables |
$ embulk gem install embulk-parser-roo-excel
$ embulk gem install embulk-output-postgresql
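To double-check which plugins ended up installed, the embulk gem subcommand can list them (another extra step added for illustration; it simply delegates to RubyGems):

$ embulk gem list   # list the gems installed for Embulk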
Embulk Configuration File
The roo-excel plugin does not appear to support the guess feature at the moment, so the Embulk configuration file has to be written by hand.
Create a file like the following and save it as users.yml.
users.yml
in:
  type: file
  path_prefix: /Users/oya.akihisa/work/excelload/embulk/testdata.xlsx
  parser:
    type: roo-excel       # parser plugin to use
    skip_header_lines: 1  # number of header rows
    sheet: "users"        # sheet name
    columns:              # column names and types
      - {name: userid, type: long}
      - {name: username, type: string}
      - {name: city, type: string}
out:
  type: postgresql
  host: localhost
  user: oya.akihisa
  password: ""
  database: postgres
  table: users            # table name
  mode: truncate_insert   # delete all rows, then insert
Types usable in columns
The Embulk-supported types that can be used in columns are as follows.
Type | Description |
---|---|
boolean | Boolean value |
long | Integer |
timestamp | Date and time |
double | Floating point number |
string | String |
mode
Since the test data is meant to be wiped and reloaded each time, truncate_insert is used here.
Other modes such as insert, insert_direct, replace, and merge also appear to be available.
Running Embulk
Run the run command, passing the configuration file you created as its argument.
$ embulk run users.yml 2016-09-30 18:21:57.526 +0900: Embulk v0.8.13 2016-09-30 18:21:59.935 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-postgresql (0.6.3) 2016-09-30 18:22:01.182 +0900 [INFO] (0001:transaction): Loaded plugin embulk-parser-roo-excel (0.0.1) 2016-09-30 18:22:01.219 +0900 [INFO] (0001:transaction): Listing local files at directory '/Users/oya.akihisa/work/excelload/embulk' filtering filename by prefix 'testdata.xlsx' 2016-09-30 18:22:01.235 +0900 [INFO] (0001:transaction): Loading files [/Users/oya.akihisa/work/excelload/embulk/testdata.xlsx] 2016-09-30 18:22:01.285 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4 2016-09-30 18:22:01.319 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-09-30 18:22:01.508 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public" 2016-09-30 18:22:01.511 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-09-30 18:22:01.511 +0900 [INFO] (0001:transaction): Using truncate_insert mode 2016-09-30 18:22:01.560 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_ee2eb72c770e80_bl_tmp000" 2016-09-30 18:22:01.562 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-09-30 18:22:01.576 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "users_ee2eb72c770e80_bl_tmp000" ("userid" INT4, "username" BPCHAR, "city" VARCHAR(30)) 2016-09-30 18:22:01.607 +0900 [INFO] (0001:transaction): > 0.03 seconds 2016-09-30 18:22:01.609 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_ee2eb72c770e80_bl_tmp001" 2016-09-30 18:22:01.610 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-09-30 18:22:01.613 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "users_ee2eb72c770e80_bl_tmp001" ("userid" INT4, "username" BPCHAR, "city" VARCHAR(30)) 2016-09-30 18:22:01.617 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-09-30 18:22:01.618 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_ee2eb72c770e80_bl_tmp002" 2016-09-30 18:22:01.619 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-09-30 18:22:01.621 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "users_ee2eb72c770e80_bl_tmp002" ("userid" INT4, "username" BPCHAR, "city" VARCHAR(30)) 2016-09-30 18:22:01.626 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-09-30 18:22:01.627 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_ee2eb72c770e80_bl_tmp003" 2016-09-30 18:22:01.627 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-09-30 18:22:01.629 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "users_ee2eb72c770e80_bl_tmp003" ("userid" INT4, "username" BPCHAR, "city" VARCHAR(30)) 2016-09-30 18:22:01.633 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-09-30 18:22:01.686 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0} 2016-09-30 18:22:01.737 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-09-30 18:22:01.752 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public" 2016-09-30 18:22:01.753 +0900 [INFO] (0017:task-0000): > 0.00 seconds 2016-09-30 18:22:01.754 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "users_ee2eb72c770e80_bl_tmp000" ("userid", "username", "city") FROM STDIN 2016-09-30 18:22:01.759 +0900 [INFO] (0017:task-0000): 
Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-09-30 18:22:01.771 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public" 2016-09-30 18:22:01.772 +0900 [INFO] (0017:task-0000): > 0.00 seconds 2016-09-30 18:22:01.772 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "users_ee2eb72c770e80_bl_tmp001" ("userid", "username", "city") FROM STDIN 2016-09-30 18:22:01.775 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-09-30 18:22:01.787 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public" 2016-09-30 18:22:01.788 +0900 [INFO] (0017:task-0000): > 0.00 seconds 2016-09-30 18:22:01.788 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "users_ee2eb72c770e80_bl_tmp002" ("userid", "username", "city") FROM STDIN 2016-09-30 18:22:01.789 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-09-30 18:22:01.800 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public" 2016-09-30 18:22:01.801 +0900 [INFO] (0017:task-0000): > 0.00 seconds 2016-09-30 18:22:01.801 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "users_ee2eb72c770e80_bl_tmp003" ("userid", "username", "city") FROM STDIN 2016-09-30 18:22:02.698 +0900 [INFO] (0017:task-0000): Loading 3 rows (41 bytes) 2016-09-30 18:22:02.703 +0900 [INFO] (0017:task-0000): > 0.00 seconds (loaded 3 rows in total) 2016-09-30 18:22:02.707 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0} 2016-09-30 18:22:02.708 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800} 2016-09-30 18:22:02.719 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public" 2016-09-30 18:22:02.720 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-09-30 18:22:02.721 +0900 [INFO] (0001:transaction): SQL: DELETE FROM "users" 2016-09-30 18:22:02.723 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-09-30 18:22:02.723 +0900 [INFO] (0001:transaction): SQL: INSERT INTO "users" ("userid", "username", "city") SELECT "userid", "username", "city" FROM "users_ee2eb72c770e80_bl_tmp000" UNION ALL SELECT "userid", "username", "city" FROM "users_ee2eb72c770e80_bl_tmp001" UNION ALL SELECT "userid", "username", "city" FROM "users_ee2eb72c770e80_bl_tmp002" UNION ALL SELECT "userid", "username", "city" FROM "users_ee2eb72c770e80_bl_tmp003" 2016-09-30 18:22:02.727 +0900 [INFO] (0001:transaction): > 0.00 seconds (3 rows) 2016-09-30 18:22:02.742 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-09-30 18:22:02.752 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public" 2016-09-30 18:22:02.754 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-09-30 18:22:02.754 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_ee2eb72c770e80_bl_tmp000" 2016-09-30 18:22:02.762 +0900 [INFO] (0001:transaction): > 0.01 seconds 2016-09-30 18:22:02.762 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_ee2eb72c770e80_bl_tmp001" 2016-09-30 18:22:02.766 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-09-30 18:22:02.766 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS 
"users_ee2eb72c770e80_bl_tmp002" 2016-09-30 18:22:02.769 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-09-30 18:22:02.769 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_ee2eb72c770e80_bl_tmp003" 2016-09-30 18:22:02.773 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-09-30 18:22:02.776 +0900 [INFO] (main): Committed. 2016-09-30 18:22:02.776 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"/Users/oya.akihisa/work/excelload/embulk/testdata.xlsx"},"out":{}}
Looking at the log, this is clearly more than a plain INSERT.
Embulk processing consists of a transaction stage and a task stage.
The ...(0001:transaction)... lines are the transaction stage, which prepares the intermediate (work) tables used for writing.
The ...(0017:task-0000)... lines that follow are the task stage, which writes the data into those work tables.
When the task stage finishes, processing returns to the transaction stage, which performs the INSERT into the target table, drops the work tables, and so on.
Finally, the whole thing is committed.
The data made it into the database without problems, and kanji characters came through fine as well.
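Although the original check is not reproduced here, one quick way to confirm the load is a psql query; a minimal sketch, assuming psql can reach the local postgres database as the same user configured in users.yml:

$ psql -d postgres -c 'select * from users order by userid;'   # spot-check the loaded rows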
Loading Multiple Sheets
We have just loaded one table's worth of data from a single sheet, but what about loading data for multiple tables?
This is where Digdag comes in.
We use Digdag to run Embulk multiple times.
Second Table and Test Data
Prepare a second table and its test data.
create table sales(
    salesid integer not null primary key,
    pricepaid integer,
    saletime timestamp
);
Embulk Configuration File for the sales Table
Create a configuration file for the sales table and save it as sales.yml.
sales.yml
in:
  type: file
  path_prefix: /Users/oya.akihisa/work/excelload/embulk/testdata.xlsx
  parser:
    type: roo-excel
    skip_header_lines: 1
    sheet: "sales"
    columns:
      - {name: salesid, type: long}
      - {name: pricepaid, type: long}
      - {name: saletime, type: timestamp}
out:
  type: postgresql
  host: localhost
  user: oya.akihisa
  password: ""
  database: postgres
  table: sales
  mode: truncate_insert
  default_timezone: "Japan"
default_timezone
"Japan"としています。これを設定しないと時刻データがUTCとなります。
Installing Digdag
$ curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
$ chmod +x ~/bin/digdag
$ echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc
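As with Embulk, printing the version is an easy sanity check (again an extra step I am adding, not part of the original instructions):

$ digdag --version   # should print the installed Digdag version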
Digdag Configuration File
allsheet.dig
+all_sheet:
  for_each>:
    config: [embulk/users.yml, embulk/sales.yml]
  _do:
    embulk>: ${config}
Digdag workflow files offer a variety of operators; here we use the for_each operator for iteration.
Passing the Embulk configuration files (one per sheet) to the embulk operator as an array, as shown above, makes Digdag repeat the processing once per element.
Running Digdag
Run the run command, passing the workflow file you created as its argument.
$ digdag run allsheet.dig 2016-10-03 09:53:00 +0900: Digdag v0.8.16 2016-10-03 09:53:01 +0900 [WARN] (main): Using a new session time 2016-10-03T00:00:00+00:00. 2016-10-03 09:53:01 +0900 [INFO] (main): Using session /Users/oya.akihisa/work/excelload/.digdag/status/20161003T000000+0000. 2016-10-03 09:53:01 +0900 [INFO] (main): Starting a new session project id=1 workflow name=allsheet session_time=2016-10-03T00:00:00+00:00 2016-10-03 09:53:01 +0900 [INFO] (0017@+allsheet+all_sheet): for_each>: {config=[embulk/users.yml, embulk/sales.yml]} 2016-10-03 09:53:01 +0900 [INFO] (0017@+allsheet+all_sheet^sub+for-config=embulk%2Fusers.yml): embulk>: embulk/users.yml 2016-10-03 09:53:06.041 +0900: Embulk v0.8.13 2016-10-03 09:53:07.963 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-postgresql (0.6.3) 2016-10-03 09:53:09.210 +0900 [INFO] (0001:transaction): Loaded plugin embulk-parser-roo-excel (0.0.1) 2016-10-03 09:53:09.239 +0900 [INFO] (0001:transaction): Listing local files at directory '/Users/oya.akihisa/work/excelload/embulk' filtering filename by prefix 'testdata.xlsx' 2016-10-03 09:53:09.246 +0900 [INFO] (0001:transaction): Loading files [/Users/oya.akihisa/work/excelload/embulk/testdata.xlsx] 2016-10-03 09:53:09.303 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4 2016-10-03 09:53:09.342 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-10-03 09:53:09.453 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public" 2016-10-03 09:53:09.456 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:09.456 +0900 [INFO] (0001:transaction): Using truncate_insert mode 2016-10-03 09:53:09.504 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_f1abf330b26e40_bl_tmp000" 2016-10-03 09:53:09.507 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:09.524 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "users_f1abf330b26e40_bl_tmp000" ("userid" INT4, "username" BPCHAR, "city" VARCHAR(30)) 2016-10-03 09:53:09.535 +0900 [INFO] (0001:transaction): > 0.01 seconds 2016-10-03 09:53:09.537 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_f1abf330b26e40_bl_tmp001" 2016-10-03 09:53:09.538 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:09.540 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "users_f1abf330b26e40_bl_tmp001" ("userid" INT4, "username" BPCHAR, "city" VARCHAR(30)) 2016-10-03 09:53:09.551 +0900 [INFO] (0001:transaction): > 0.01 seconds 2016-10-03 09:53:09.552 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_f1abf330b26e40_bl_tmp002" 2016-10-03 09:53:09.552 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:09.557 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "users_f1abf330b26e40_bl_tmp002" ("userid" INT4, "username" BPCHAR, "city" VARCHAR(30)) 2016-10-03 09:53:09.561 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:09.562 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_f1abf330b26e40_bl_tmp003" 2016-10-03 09:53:09.563 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:09.565 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "users_f1abf330b26e40_bl_tmp003" ("userid" INT4, "username" BPCHAR, "city" VARCHAR(30)) 2016-10-03 09:53:09.569 +0900 [INFO] (0001:transaction): > 0.00 
seconds 2016-10-03 09:53:09.613 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0} 2016-10-03 09:53:09.656 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-10-03 09:53:09.670 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public" 2016-10-03 09:53:09.671 +0900 [INFO] (0017:task-0000): > 0.00 seconds 2016-10-03 09:53:09.673 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "users_f1abf330b26e40_bl_tmp000" ("userid", "username", "city") FROM STDIN 2016-10-03 09:53:09.676 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-10-03 09:53:09.686 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public" 2016-10-03 09:53:09.688 +0900 [INFO] (0017:task-0000): > 0.00 seconds 2016-10-03 09:53:09.689 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "users_f1abf330b26e40_bl_tmp001" ("userid", "username", "city") FROM STDIN 2016-10-03 09:53:09.690 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-10-03 09:53:09.700 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public" 2016-10-03 09:53:09.700 +0900 [INFO] (0017:task-0000): > 0.00 seconds 2016-10-03 09:53:09.701 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "users_f1abf330b26e40_bl_tmp002" ("userid", "username", "city") FROM STDIN 2016-10-03 09:53:09.701 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-10-03 09:53:09.712 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public" 2016-10-03 09:53:09.713 +0900 [INFO] (0017:task-0000): > 0.00 seconds 2016-10-03 09:53:09.713 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "users_f1abf330b26e40_bl_tmp003" ("userid", "username", "city") FROM STDIN 2016-10-03 09:53:10.502 +0900 [INFO] (0017:task-0000): Loading 3 rows (41 bytes) 2016-10-03 09:53:10.522 +0900 [INFO] (0017:task-0000): > 0.01 seconds (loaded 3 rows in total) 2016-10-03 09:53:10.527 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0} 2016-10-03 09:53:10.528 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800} 2016-10-03 09:53:10.537 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public" 2016-10-03 09:53:10.538 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:10.539 +0900 [INFO] (0001:transaction): SQL: DELETE FROM "users" 2016-10-03 09:53:10.542 +0900 [INFO] (0001:transaction): > 0.00 seconds (3 rows) 2016-10-03 09:53:10.542 +0900 [INFO] (0001:transaction): SQL: INSERT INTO "users" ("userid", "username", "city") SELECT "userid", "username", "city" FROM "users_f1abf330b26e40_bl_tmp000" UNION ALL SELECT "userid", "username", "city" FROM "users_f1abf330b26e40_bl_tmp001" UNION ALL SELECT "userid", "username", "city" FROM "users_f1abf330b26e40_bl_tmp002" UNION ALL SELECT "userid", "username", "city" FROM "users_f1abf330b26e40_bl_tmp003" 2016-10-03 09:53:10.547 +0900 [INFO] (0001:transaction): > 0.00 seconds (3 rows) 2016-10-03 09:53:10.565 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, 
loginTimeout=300, socketTimeout=1800} 2016-10-03 09:53:10.575 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public" 2016-10-03 09:53:10.576 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:10.576 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_f1abf330b26e40_bl_tmp000" 2016-10-03 09:53:10.582 +0900 [INFO] (0001:transaction): > 0.01 seconds 2016-10-03 09:53:10.582 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_f1abf330b26e40_bl_tmp001" 2016-10-03 09:53:10.586 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:10.586 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_f1abf330b26e40_bl_tmp002" 2016-10-03 09:53:10.590 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:10.590 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "users_f1abf330b26e40_bl_tmp003" 2016-10-03 09:53:10.593 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:10.595 +0900 [INFO] (main): Committed. 2016-10-03 09:53:10.596 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"/Users/oya.akihisa/work/excelload/embulk/testdata.xlsx"},"out":{}} 2016-10-03 09:53:10 +0900 [INFO] (0017@+allsheet+all_sheet^sub+for-config=embulk%2Fsales.yml): embulk>: embulk/sales.yml 2016-10-03 09:53:14.733 +0900: Embulk v0.8.13 2016-10-03 09:53:16.858 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-postgresql (0.6.3) 2016-10-03 09:53:17.996 +0900 [INFO] (0001:transaction): Loaded plugin embulk-parser-roo-excel (0.0.1) 2016-10-03 09:53:18.024 +0900 [INFO] (0001:transaction): Listing local files at directory '/Users/oya.akihisa/work/excelload/embulk' filtering filename by prefix 'testdata.xlsx' 2016-10-03 09:53:18.031 +0900 [INFO] (0001:transaction): Loading files [/Users/oya.akihisa/work/excelload/embulk/testdata.xlsx] 2016-10-03 09:53:18.085 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4 2016-10-03 09:53:18.122 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-10-03 09:53:18.250 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public" 2016-10-03 09:53:18.251 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:18.252 +0900 [INFO] (0001:transaction): Using truncate_insert mode 2016-10-03 09:53:18.280 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "sales_f1abfc2adb11c0_bl_tmp000" 2016-10-03 09:53:18.283 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:18.295 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "sales_f1abfc2adb11c0_bl_tmp000" ("salesid" INT4, "pricepaid" INT4, "saletime" TIMESTAMP) 2016-10-03 09:53:18.300 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:18.303 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "sales_f1abfc2adb11c0_bl_tmp001" 2016-10-03 09:53:18.304 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:18.306 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "sales_f1abfc2adb11c0_bl_tmp001" ("salesid" INT4, "pricepaid" INT4, "saletime" TIMESTAMP) 2016-10-03 09:53:18.308 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:18.308 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "sales_f1abfc2adb11c0_bl_tmp002" 2016-10-03 09:53:18.309 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:18.313 +0900 [INFO] (0001:transaction): SQL: 
CREATE TABLE IF NOT EXISTS "sales_f1abfc2adb11c0_bl_tmp002" ("salesid" INT4, "pricepaid" INT4, "saletime" TIMESTAMP) 2016-10-03 09:53:18.314 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:18.315 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "sales_f1abfc2adb11c0_bl_tmp003" 2016-10-03 09:53:18.316 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:18.318 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "sales_f1abfc2adb11c0_bl_tmp003" ("salesid" INT4, "pricepaid" INT4, "saletime" TIMESTAMP) 2016-10-03 09:53:18.320 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:18.362 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0} 2016-10-03 09:53:18.400 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-10-03 09:53:18.422 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public" 2016-10-03 09:53:18.423 +0900 [INFO] (0017:task-0000): > 0.00 seconds 2016-10-03 09:53:18.424 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "sales_f1abfc2adb11c0_bl_tmp000" ("salesid", "pricepaid", "saletime") FROM STDIN 2016-10-03 09:53:18.428 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-10-03 09:53:18.440 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public" 2016-10-03 09:53:18.441 +0900 [INFO] (0017:task-0000): > 0.00 seconds 2016-10-03 09:53:18.441 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "sales_f1abfc2adb11c0_bl_tmp001" ("salesid", "pricepaid", "saletime") FROM STDIN 2016-10-03 09:53:18.443 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-10-03 09:53:18.457 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public" 2016-10-03 09:53:18.458 +0900 [INFO] (0017:task-0000): > 0.00 seconds 2016-10-03 09:53:18.458 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "sales_f1abfc2adb11c0_bl_tmp002" ("salesid", "pricepaid", "saletime") FROM STDIN 2016-10-03 09:53:18.459 +0900 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-10-03 09:53:18.474 +0900 [INFO] (0017:task-0000): SQL: SET search_path TO "public" 2016-10-03 09:53:18.475 +0900 [INFO] (0017:task-0000): > 0.00 seconds 2016-10-03 09:53:18.476 +0900 [INFO] (0017:task-0000): Copy SQL: COPY "sales_f1abfc2adb11c0_bl_tmp003" ("salesid", "pricepaid", "saletime") FROM STDIN 2016-10-03 09:53:19.268 +0900 [INFO] (0017:task-0000): Loading 3 rows (117 bytes) 2016-10-03 09:53:19.273 +0900 [INFO] (0017:task-0000): > 0.01 seconds (loaded 3 rows in total) 2016-10-03 09:53:19.281 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0} 2016-10-03 09:53:19.282 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800} 2016-10-03 09:53:19.292 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public" 2016-10-03 09:53:19.293 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:19.294 +0900 [INFO] (0001:transaction): SQL: DELETE FROM "sales" 2016-10-03 09:53:19.297 +0900 [INFO] (0001:transaction): > 0.00 seconds (3 rows) 2016-10-03 09:53:19.297 +0900 [INFO] 
(0001:transaction): SQL: INSERT INTO "sales" ("salesid", "pricepaid", "saletime") SELECT "salesid", "pricepaid", "saletime" FROM "sales_f1abfc2adb11c0_bl_tmp000" UNION ALL SELECT "salesid", "pricepaid", "saletime" FROM "sales_f1abfc2adb11c0_bl_tmp001" UNION ALL SELECT "salesid", "pricepaid", "saletime" FROM "sales_f1abfc2adb11c0_bl_tmp002" UNION ALL SELECT "salesid", "pricepaid", "saletime" FROM "sales_f1abfc2adb11c0_bl_tmp003" 2016-10-03 09:53:19.300 +0900 [INFO] (0001:transaction): > 0.00 seconds (3 rows) 2016-10-03 09:53:19.311 +0900 [INFO] (0001:transaction): Connecting to jdbc:postgresql://localhost:5432/postgres options {user=oya.akihisa, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800} 2016-10-03 09:53:19.321 +0900 [INFO] (0001:transaction): SQL: SET search_path TO "public" 2016-10-03 09:53:19.323 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:19.323 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "sales_f1abfc2adb11c0_bl_tmp000" 2016-10-03 09:53:19.327 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:19.328 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "sales_f1abfc2adb11c0_bl_tmp001" 2016-10-03 09:53:19.330 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:19.330 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "sales_f1abfc2adb11c0_bl_tmp002" 2016-10-03 09:53:19.332 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:19.332 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "sales_f1abfc2adb11c0_bl_tmp003" 2016-10-03 09:53:19.335 +0900 [INFO] (0001:transaction): > 0.00 seconds 2016-10-03 09:53:19.338 +0900 [INFO] (main): Committed. 2016-10-03 09:53:19.338 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"/Users/oya.akihisa/work/excelload/embulk/testdata.xlsx"},"out":{}} Success. Task state is saved at /Users/oya.akihisa/work/excelload/.digdag/status/20161003T000000+0000 directory. * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time. * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
It printed "Success.", so it appears to have worked.
The data has also been loaded into the database correctly.
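The same kind of spot check as before works here too; again a minimal sketch, assuming psql connects with the same defaults as in the Embulk configuration:

$ psql -d postgres -c 'select * from sales order by salesid;'   # spot-check the second table

Also note the tail of the Digdag output: task state is saved under the .digdag/status directory, so running the same workflow again in the same session would skip the already-completed tasks. As the output itself suggests, they can be forced to run again with --rerun:

$ digdag run allsheet.dig --rerun   # rerun tasks that would otherwise be skipped because of saved state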
Summary
Using Embulk and Digdag, we were able to load Excel data into a database.
This was my first time using Embulk and Digdag, and everything went smoothly.
Embulk looks handy for loading and transforming data into databases, and Digdag for managing batch jobs; I am looking forward to using both more.