[Embulk] guess機能を試してみた (テーブルが存在する場合)

2016.05.30

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、最近Embulk(エンバルク)を調べている川崎です。

今回も、読み込んだデータファイルのレイアウトを推測する「guess」という機能を使います。 これまでは、データのロード時に、新規のテーブルを自動作成していましたが、今回は既存のテーブルが存在する場合を試してみます。

データファイルの推測結果と、既存のテーブルの列構成が同じ場合は、検証してもあまり意味がありませんので、

  • テーブルの列のデータ型が、推測したデータ型と違う場合
  • 既存のテーブルの列が少ない場合

を試してみます。

試してみる

これまで、自動生成された新規のテーブルの列構成は、下記の通りでした。

CREATE TABLE station
(
station_cd bigint,
station_g_cd bigint,
station_name varchar(65535),
station_name_k varchar(65535),
station_name_r varchar(65535),
line_cd bigint,
pref_cd bigint,
post varchar(65535),
add varchar(65535),
lon varchar(65535),
lat varchar(65535),
open_ymd timestamp,
close_ymd timestamp,
e_status bigint,
e_sort bigint
);

lon列とlat列(経度と緯度)は、文字型と推測されていますが、データファイルの中身を確認すると、本来は浮動小数点型の値になっています。そこで、本来の浮動小数点型である「double precision」型に変更します。また、最後のe_sort列を削除してみます。

このような小規模な変更を行っても、これまで同様、スムーズにデータのロードができるのか、検証してみます。

一旦、テーブルをdropしてから、再度下記のCREATE TABLE文を実行します。

CREATE TABLE station
(
station_cd bigint,
station_g_cd bigint,
station_name varchar(65535),
station_name_k varchar(65535),
station_name_r varchar(65535),
line_cd bigint,
pref_cd bigint,
post varchar(65535),
add varchar(65535),
lon double precision,
lat double precision,
open_ymd timestamp,
close_ymd timestamp,
e_status bigint
);

それでは、guessコマンドに引き続いて、runコマンドを実行してみます。

guess機能に対する設定を、以下のようにguess.ymlファイルに記述しておきます。(テキストエディタで作成します)データファイルは、前回同様「駅データ」を使います。

$ cat guess.yml
in:
  type: file
  path_prefix: "./station20160401free.csv"
out:
  type: redshift
  host: xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com
  user: demomaster
  password: xxxx
  database: demo
  table: station
  access_key_id: XXXXXXXX 
  secret_access_key:XXXXXXXXXXXXXXXX
  iam_user_name: xxxxx
  s3_bucket: my-redshift-transfer-bucket
  s3_key_prefix: temp/redshift
  mode: insert

guessコマンドを実行すると、推定した結果がconfig.ymlファイルに保存されます。

$ embulk guess ./guess.yml -o config.yml
2016-05-30 11:31:56.792 +0000: Embulk v0.8.9
2016-05-30 11:31:58.189 +0000 [INFO] (0001:guess): Listing local files at directory '.' filtering filename by prefix 'station20160401free.csv'
2016-05-30 11:31:58.194 +0000 [INFO] (0001:guess): Loading files [station20160401free.csv]
2016-05-30 11:31:58.348 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/gzip from a load path
2016-05-30 11:31:58.362 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/bzip2 from a load path
2016-05-30 11:31:58.383 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/json from a load path
2016-05-30 11:31:58.391 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/csv from a load path
in:
  type: file
  path_prefix: ./station20160401free.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: station_cd, type: long}
    - {name: station_g_cd, type: long}
    - {name: station_name, type: string}
    - {name: station_name_k, type: string}
    - {name: station_name_r, type: string}
    - {name: line_cd, type: long}
    - {name: pref_cd, type: long}
    - {name: post, type: string}
    - {name: add, type: string}
    - {name: lon, type: string}
    - {name: lat, type: string}
    - {name: open_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: close_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: e_status, type: long}
    - {name: e_sort, type: long}
out: {type: redshift, host: xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com,
  user: demomaster, password: xxxx, database: demo, table: station, access_key_id: XXXXXXXX,
  secret_access_key: XXXXXXXXXXXXXXXX, iam_user_name: xxxxx,
  s3_bucket: my-redshift-transfer-bucket, s3_key_prefix: temp/redshift, mode: insert}
Created 'config.yml' file.

config.ymlの内容は、次のようになっています。lon列、lat列は文字型として推測されています。

$ cat config.yml
in:
  type: file
  path_prefix: ./station20160401free.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: station_cd, type: long}
    - {name: station_g_cd, type: long}
    - {name: station_name, type: string}
    - {name: station_name_k, type: string}
    - {name: station_name_r, type: string}
    - {name: line_cd, type: long}
    - {name: pref_cd, type: long}
    - {name: post, type: string}
    - {name: add, type: string}
    - {name: lon, type: string}
    - {name: lat, type: string}
    - {name: open_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: close_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: e_status, type: long}
    - {name: e_sort, type: long}
out: {type: redshift, host: xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com,
  user: demomaster, password: xxxx, database: demo, table: station, access_key_id: XXXXXXXX,
  secret_access_key: XXXXXXXXXXXXXXXX, iam_user_name: xxxxx,
  s3_bucket: my-redshift-transfer-bucket, s3_key_prefix: temp/redshift, mode: insert}

データのロードを実行します。

$ embulk run config.yml
2016-05-30 11:36:32.849 +0000: Embulk v0.8.9
2016-05-30 11:36:36.107 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-redshift (0.6.0)
2016-05-30 11:36:36.184 +0000 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix 'station20160401free.csv'
2016-05-30 11:36:36.192 +0000 [INFO] (0001:transaction): Loading files [station20160401free.csv]
2016-05-30 11:36:36.334 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=2 / tasks=1
2016-05-30 11:36:36.386 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com:5439/demo options {user=demomaster, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-05-30 11:36:36.525 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-05-30 11:36:36.531 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-05-30 11:36:36.531 +0000 [INFO] (0001:transaction): Using insert mode
2016-05-30 11:36:36.599 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_25c337d97c40_bl_tmp000"
2016-05-30 11:36:36.603 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-05-30 11:36:36.753 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "station_25c337d97c40_bl_tmp000" ("station_cd" INT8, "station_g_cd" INT8, "station_name" VARCHAR(65535), "station_name_k" VARCHAR(65535), "station_name_r" VARCHAR(65535), "line_cd" INT8, "pref_cd" INT8, "post" VARCHAR(65535), "add" VARCHAR(65535), "lon" FLOAT8, "lat" FLOAT8, "open_ymd" TIMESTAMP, "close_ymd" TIMESTAMP, "e_status" INT8)
2016-05-30 11:36:36.766 +0000 [INFO] (0001:transaction): > 0.01 seconds
2016-05-30 11:36:37.132 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2016-05-30 11:36:37.188 +0000 [INFO] (0016:task-0000): Connecting to jdbc:postgresql://xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com:5439/demo options {user=demomaster, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-05-30 11:36:37.837 +0000 [INFO] (0016:task-0000): SQL: SET search_path TO "public"
2016-05-30 11:36:37.839 +0000 [INFO] (0016:task-0000): > 0.00 seconds
2016-05-30 11:36:37.839 +0000 [INFO] (0016:task-0000): Copy SQL: COPY "station_25c337d97c40_bl_tmp000" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status") ? GZIP DELIMITER '\t' NULL '\\N' ESCAPE TRUNCATECOLUMNS ACCEPTINVCHARS STATUPDATE OFF COMPUPDATE OFF
2016-05-30 11:36:40.286 +0000 [INFO] (pool-2-thread-1): Uploading file id temp/redshift/fcedd2a6-f831-4146-95a6-3f759045e56a to S3 (340,651 bytes 10,834 rows)
2016-05-30 11:36:40.926 +0000 [INFO] (pool-2-thread-1): Uploaded file temp/redshift/fcedd2a6-f831-4146-95a6-3f759045e56a (0.64 seconds)
2016-05-30 11:36:40.939 +0000 [INFO] (pool-2-thread-2): SQL: SET search_path TO "public"
2016-05-30 11:36:40.941 +0000 [INFO] (pool-2-thread-2): > 0.00 seconds
2016-05-30 11:36:40.941 +0000 [INFO] (pool-2-thread-2): Running COPY from file temp/redshift/fcedd2a6-f831-4146-95a6-3f759045e56a
2016-05-30 11:36:42.127 +0000 [INFO] (pool-2-thread-2): Loaded file temp/redshift/fcedd2a6-f831-4146-95a6-3f759045e56a (0.33 seconds for COPY)
2016-05-30 11:36:42.156 +0000 [INFO] (0016:task-0000): Loaded 1 files.
2016-05-30 11:36:42.158 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2016-05-30 11:36:42.159 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com:5439/demo options {user=demomaster, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2016-05-30 11:36:42.169 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-05-30 11:36:42.172 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-05-30 11:36:42.172 +0000 [INFO] (0001:transaction): SQL: INSERT INTO "station" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status") SELECT "station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status" FROM "station_25c337d97c40_bl_tmp000"
2016-05-30 11:36:42.205 +0000 [INFO] (0001:transaction): > 0.03 seconds (10,834 rows)
2016-05-30 11:36:42.377 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com:5439/demo options {user=demomaster, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-05-30 11:36:42.391 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-05-30 11:36:42.393 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-05-30 11:36:42.393 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_25c337d97c40_bl_tmp000"
2016-05-30 11:36:42.503 +0000 [INFO] (0001:transaction): > 0.11 seconds
2016-05-30 11:36:42.507 +0000 [INFO] (main): Committed.
2016-05-30 11:36:42.507 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"station20160401free.csv"},"out":{}}

無事にデータがデータベースにロードされました。テーブルの中身を確認します。lon列、lat列は浮動小数点としてロードされています。また、e_sort列は取り込まれていません。このような差異があっても、データのロードに成功しました。(暗黙のデータ変換が可能な場合。当然ながら、データファイルの側に"X"などの文字が含まれている場合は、その行はエラーのためスキップされます)

table_exists

まとめ

今回は既存のテーブルが存在する場合を試してみました。

列のデータ型が(暗黙のデータ変換が可能な範囲で)違っていても、自動的にデータをロードすることができました。また、既存のテーブルにない列については、特にエラーにはならず、正常に読み込むことができました。このような柔軟な対応は、データロード処理の作業効率アップに大いに貢献してくれそうです。

次回

次回は、時間が取れたら、Embulkのエラー処理について実験してみたいと思います。また、引き続き、Embulkの様々なオプションについての実験も進めていきます。