[Embulk] guess機能を試してみた Redshift編

Embulkアイキャッチ

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、最近Embulk(エンバルク)を調べている川崎です。

前回の記事はこちらをご覧ください。 [Embulk] guess機能を試してみた

[Embulk] guess機能を試してみた【追記】あり

EmbulkのRedshift outputプラグインをインストール

まずは下記の手順で、embulk-output-redshiftプラグインをインストールします。

$ embulk gem install embulk-output-redshift
2016-05-18 14:48:27.051 +0000: Embulk v0.8.9
Fetching: embulk-output-redshift-0.6.0.gem (100%)
Successfully installed embulk-output-redshift-0.6.0
1 gem installed

インストールされたことを確認します。

$ embulk gem list embulk-output-redshift
2016-05-18 14:49:00.606 +0000: Embulk v0.8.9

*** LOCAL GEMS ***

embulk-output-redshift (0.6.0)

Redshiftで日本語のデータセットをロード

前回同様、日本語を含むデータをロードしてみます。オープンデータとして、下記のサイトからダウンロードできるCSVファイルを使います。

http://www.ekidata.jp/

ekijp_20160425_1

ファイルの種類は、前回同様「駅データ」を使用していきます。データの中身はこのようになっています。

$ cat station20160401free.csv
station_cd,station_g_cd,station_name,station_name_k,station_name_r,line_cd,pref_cd,post,add,lon,lat,open_ymd,close_ymd,e_status,e_sort
1110101,1110101,函館,,,11101,1,040-0063,北海道函館市若松町12-13,140.726413,41.773709,1902-12-10,,0,1110101
1110102,1110102,五稜郭,,,11101,1,041-0813,函館市亀田本町,140.733539,41.803557,,,0,1110102
1110103,1110103,桔梗,,,11101,1,041-1210,北海道函館市桔梗3丁目41-36,140.722952,41.846457,1902-12-10,,0,1110103
1110104,1110104,大中山,,,11101,1,041-1121,亀田郡七飯町大字大中山,140.71358,41.864641,,,0,1110104
1110105,1110105,七飯,,,11101,1,041-1111,亀田郡七飯町字本町,140.688556,41.886971,,,0,1110105
guess機能に対する設定を、以下のようにguess.ymlファイルに記述しておきます。(テキストエディタで作成します)
$ cat guess.yml
in:
 type: file
 path_prefix: "./station20160401free.csv"
out:
 type: redshift
 host: xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com
 user: demomaster
 password: xxxx
 database: demo
 table: station
 access_key_id: XXXXXXXX
 secret_access_key: XXXXXXXXXXXXXXXX
 iam_user_name: xxxxx
 s3_bucket: my-redshift-transfer-bucket
 s3_key_prefix: temp/redshift
 mode: insert

embulk-output-redshiftは、一旦S3にファイルをアップロードしてから、copyコマンドでRedshiftにロードする仕組みとのことです。 そこで、AWSのaccess_key_id、secret_access_key、iam_user_name、それからs3_bucket、s3_key_prefixの設定が必要になります。

guessコマンドを実行すると、推定した結果がconfig.ymlファイルに保存されます。

$ embulk guess ./guess.yml -o config.yml
2016-05-18 13:49:59.383 +0000: Embulk v0.8.9
2016-05-18 13:50:00.747 +0000 [INFO] (0001:guess): Listing local files at directory '.' filtering filename by prefix 'station20160401free.csv'
2016-05-18 13:50:00.752 +0000 [INFO] (0001:guess): Loading files [station20160401free.csv]
2016-05-18 13:50:00.990 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/gzip from a load path
2016-05-18 13:50:01.003 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/bzip2 from a load path
2016-05-18 13:50:01.024 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/json from a load path
2016-05-18 13:50:01.035 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/csv from a load path
in:
  type: file
  path_prefix: ./station20160401free.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: station_cd, type: long}
    - {name: station_g_cd, type: long}
    - {name: station_name, type: string}
    - {name: station_name_k, type: string}
    - {name: station_name_r, type: string}
    - {name: line_cd, type: long}
    - {name: pref_cd, type: long}
    - {name: post, type: string}
    - {name: add, type: string}
    - {name: lon, type: string}
    - {name: lat, type: string}
    - {name: open_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: close_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: e_status, type: long}
    - {name: e_sort, type: long}
out: {type: redshift, host: xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com,
  user: demomaster, password: xxxx, database: demo, table: station, access_key_id:XXXXXXXX,
  secret_access_key: XXXXXXXXXXXXXXXX, iam_user_name: xxxxx,
  s3_bucket: my-redshift-transfer-bucket, s3_key_prefix: temp/redshift, mode: insert}
Created 'config.yml' file.

config.ymlの内容は、次のようになっています。

$ cat config.yml
in:
  type: file
  path_prefix: ./station20160401free.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: station_cd, type: long}
    - {name: station_g_cd, type: long}
    - {name: station_name, type: string}
    - {name: station_name_k, type: string}
    - {name: station_name_r, type: string}
    - {name: line_cd, type: long}
    - {name: pref_cd, type: long}
    - {name: post, type: string}
    - {name: add, type: string}
    - {name: lon, type: string}
    - {name: lat, type: string}
    - {name: open_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: close_ymd, type: timestamp, format: '%Y-%m-%d'}
    - {name: e_status, type: long}
    - {name: e_sort, type: long}
out: {type: redshift, host: xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com,
  user: demomaster, password: xxxx, database: demo, table: station, access_key_id:XXXXXXXX,
  secret_access_key: XXXXXXXXXXXXXXXX, iam_user_name: xxxxx,
  s3_bucket: my-redshift-transfer-bucket, s3_key_prefix: temp/redshift, mode: insert}

データのロードを実行します。

$ embulk run config.yml 
2016-05-18 13:52:32.405 +0000: Embulk v0.8.9
2016-05-18 13:52:35.555 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-redshift (0.6.0)
2016-05-18 13:52:35.642 +0000 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix 'station20160401free.csv'
2016-05-18 13:52:35.647 +0000 [INFO] (0001:transaction): Loading files [station20160401free.csv]
2016-05-18 13:52:35.788 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=2 / tasks=1
2016-05-18 13:52:35.849 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com:5439/cmdemo options {user=demomaster, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-05-18 13:52:35.958 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-05-18 13:52:35.962 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-05-18 13:52:35.963 +0000 [INFO] (0001:transaction): Using insert mode
2016-05-18 13:52:35.998 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_73a31593ae80_bl_tmp000"
2016-05-18 13:52:36.001 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-05-18 13:52:36.144 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "station_73a31593ae80_bl_tmp000" ("station_cd" BIGINT, "station_g_cd" BIGINT, "station_name" VARCHAR(65535), "station_name_k" VARCHAR(65535), "station_name_r" VARCHAR(65535), "line_cd" BIGINT, "pref_cd" BIGINT, "post" VARCHAR(65535), "add" VARCHAR(65535), "lon" VARCHAR(65535), "lat" VARCHAR(65535), "open_ymd" TIMESTAMP, "close_ymd" TIMESTAMP, "e_status" BIGINT, "e_sort" BIGINT)
2016-05-18 13:52:36.192 +0000 [INFO] (0001:transaction): > 0.05 seconds
2016-05-18 13:52:36.395 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2016-05-18 13:52:36.459 +0000 [INFO] (0016:task-0000): Connecting to jdbc:postgresql://xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com:5439/demo options {user=demomaster, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-05-18 13:52:37.000 +0000 [INFO] (0016:task-0000): SQL: SET search_path TO "public"
2016-05-18 13:52:37.002 +0000 [INFO] (0016:task-0000): > 0.00 seconds
2016-05-18 13:52:37.003 +0000 [INFO] (0016:task-0000): Copy SQL: COPY "station_73a31593ae80_bl_tmp000" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort") ? GZIP DELIMITER '\t' NULL '\\N' ESCAPE TRUNCATECOLUMNS ACCEPTINVCHARS STATUPDATE OFF COMPUPDATE OFF
2016-05-18 13:52:39.438 +0000 [INFO] (pool-2-thread-1): Uploading file id temp/redshift/7d6756a0-54e1-4924-9dfc-44b8474ef8f1 to S3 (371,203 bytes 10,834 rows)
2016-05-18 13:52:40.048 +0000 [INFO] (pool-2-thread-1): Uploaded file temp/redshift/7d6756a0-54e1-4924-9dfc-44b8474ef8f1 (0.61 seconds)
2016-05-18 13:52:40.061 +0000 [INFO] (pool-2-thread-2): SQL: SET search_path TO "public"
2016-05-18 13:52:40.063 +0000 [INFO] (pool-2-thread-2): > 0.00 seconds
2016-05-18 13:52:40.063 +0000 [INFO] (pool-2-thread-2): Running COPY from file temp/redshift/7d6756a0-54e1-4924-9dfc-44b8474ef8f1
2016-05-18 13:52:44.045 +0000 [INFO] (pool-2-thread-2): Loaded file temp/redshift/7d6756a0-54e1-4924-9dfc-44b8474ef8f1 (3.15 seconds for COPY)
2016-05-18 13:52:44.089 +0000 [INFO] (0016:task-0000): Loaded 1 files.
2016-05-18 13:52:44.092 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2016-05-18 13:52:44.092 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com:5439/demo options {user=demomaster, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2016-05-18 13:52:44.099 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-05-18 13:52:44.102 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-05-18 13:52:44.115 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "station" ("station_cd" BIGINT, "station_g_cd" BIGINT, "station_name" VARCHAR(65535), "station_name_k" VARCHAR(65535), "station_name_r" VARCHAR(65535), "line_cd" BIGINT, "pref_cd" BIGINT, "post" VARCHAR(65535), "add" VARCHAR(65535), "lon" VARCHAR(65535), "lat" VARCHAR(65535), "open_ymd" TIMESTAMP, "close_ymd" TIMESTAMP, "e_status" BIGINT, "e_sort" BIGINT)
2016-05-18 13:52:44.130 +0000 [INFO] (0001:transaction): > 0.01 seconds
2016-05-18 13:52:44.304 +0000 [INFO] (0001:transaction): SQL: INSERT INTO "station" ("station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort") SELECT "station_cd", "station_g_cd", "station_name", "station_name_k", "station_name_r", "line_cd", "pref_cd", "post", "add", "lon", "lat", "open_ymd", "close_ymd", "e_status", "e_sort" FROM "station_73a31593ae80_bl_tmp000"
2016-05-18 13:52:46.362 +0000 [INFO] (0001:transaction): > 2.06 seconds (10,834 rows)
2016-05-18 13:52:46.562 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://xxxx.xxxx.ap-northeast-1.redshift.amazonaws.com:5439/demo options {user=demomaster, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-05-18 13:52:46.573 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-05-18 13:52:46.579 +0000 [INFO] (0001:transaction): > 0.01 seconds
2016-05-18 13:52:46.579 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "station_73a31593ae80_bl_tmp000"
2016-05-18 13:52:46.687 +0000 [INFO] (0001:transaction): > 0.11 seconds
2016-05-18 13:52:46.690 +0000 [INFO] (main): Committed.
2016-05-18 13:52:46.690 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"station20160401free.csv"},"out":{}}

無事にデータがデータベースにロードされました。テーブルの中身を確認します。

ekijp_station_20160518_1

各列のデータ型を確認します。
CREATE TABLE station
(
   station_cd      bigint,
   station_g_cd    bigint,
   station_name    varchar(65535),
   station_name_k  varchar(65535),
   station_name_r  varchar(65535),
   line_cd         bigint,
   pref_cd         bigint,
   post            varchar(65535),
   add             varchar(65535),
   lon             varchar(65535),
   lat             varchar(65535),
   open_ymd        timestamp,
   close_ymd       timestamp,
   e_status        bigint,
   e_sort          bigint
);

データのロードに成功しました。

まとめ

Embulkの読み込んだデータのレイアウトを推測する「guess」という機能を、ターゲットのデータベースをRedshiftに変更して試してみました。Redshiftの場合でも何の問題もなく、ロードすることができました。

次回

次回は、Embulkの様々なオプションについて実験してみたいと思います。

AWS Cloud Roadshow 2017 福岡