Digdag + EmbulkによるTSVファイルのS3→Redshiftロード #digdag

2016.06.22

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

こんにちは、yokatsukiです。

先日6月18日、第五回ゲームサーバ勉強会に参加してきました。

そこで、トレジャーデータのサポートエンジニアマネージャー高橋様から、直前の6月15日にオープンソース化されたばかりのDigdagの説明がありました。その時の発表スライドは下記です。

Digdagは弊社でも何名かが既に触ってブログで公開(下記)しているので、名前と目的は知ってましたが、説明とデモを見るうちに自分でも試してみたくなりました。

という訳で、簡単ではありますが共有します。

テーブル定義の無いTSVファイルをRedshiftへロードする

通常、TSVファイルをRedshiftにロードする為には、事前にRedshift上にテーブルを作成しておく必要があります。この時のテーブル設計作業は思った以上に面倒なものです。事前にテーブル定義書があったとしても、Redshiftに適応するための編集は必要になりますし、テーブル定義書が実際のテーブル構造と異なっていた、というのもあるあるだと思います。

この作業を軽減してくれるのが、Embulkでテーブルのカラム定義を推測してくれるguess機能なのですが、これも実行するには下記の手順が必要になります。

  • 定義ファイルを作成する為に必要なリソースへのアクセス情報をymlで記述する
  • 上記ファイルをguessコマンドで読み込み、定義ファイルを生成する
  • runコマンドで定義ファイルを読み込み、テーブルの作成とファイル内容のロードを実行する

こちらの手順をDigdagでまとめ、コマンド一発で実行できるようにしました。

実行環境作成

AWS環境準備

動作を確認するために、AWS環境では以下の準備を行いました。

  • IAMユーザ、アクセスキー発行(cm-katsuki.yosuke)
  • データ格納用S3バケット作成(cm-embulk-source-data-yokatsuki)
  • 一時作業用S3バケット作成(cm-embulk-temp-yokatsuki)
  • VPC, Subunet設定(10.0.0.0/24)
  • Redshiftクラスタ起動(dc1.large)
  • Embulk, Digdag実行用EC2インスタンス起動(t2.micro)

Digdag、Embulk準備

起動したEC2インスタンスに、Digdag、Embulkの実行環境を準備しました。

JDKインストール

事前にオラクル社のページからRPMパッケージをダウンロードしておきます。

$ sudo rpm -ivh jdk-8u92-linux-x64.rpm

Digdagインストール

$ curl -o /usr/local/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
$ chmod +x /usr/local/bin/digdag

Embulkインストール

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk

なお、DigdagやEmbulkには、PATHを通しておきました。

Embulkプラグインインストール

S3からファイルを取得し、Redshiftに格納するそれぞれのプラグインを追加しました。

$ embulk gem install embulk-input-s3
$ embulk gem install embulk-output-redshift

結果、以下の構成ができあがりました。

load-tsv-from-s3-to-redshift-using-embulk-digdag-01

データの準備

データ準備

ロード対象データとして、Tableauユーザお馴染みのSuperstoreサンプルを使用しました。 これはExcelファイルなので、Ordersシートの内容を、タブ区切りのテキストファイルとして保存します。また、Embulkでは見出し(先頭行)を自動的にテーブルのカラム名に設定することができるのですが、現在のRedshiftでは日本語カラムが定義できないので、「製品名」等の見出しを「product_name」等の英語表記に変えておきます。

Digdagプロジェクト作成

initコマンドでプロジェクトを作成します。

$ digdag init tsvload
2016-06-22 08:45:42 +0000: Digdag v0.8.2
  Creating tsvload/.gitignore
  Creating tsvload/tasks/shell_sample.sh
  Creating tsvload/tasks/repeat_hello.sh
  Creating tsvload/tasks/__init__.py
  Creating tsvload/tsvload.dig
Done. Type `cd tsvload` and then `digdag run tsvload.dig` to run the workflow. Enjoy!

作成されたプロジェクトにはtasksサブディレクトリが作られ、その中にサンプルが用意されますが、これは削除しても大丈夫です。

ワークフロー記述

Digdagのドキュメントを参考にしながら、tsvload.digファイルにタスクを記述します。

timezone: UTC

+guess:
  sh>: embulk guess embulk/guess-redshift-output.yml -o embulk/config-redshift-output.yml

+load:
  embulk>: embulk/config-redshift-output.yml

内容を簡単に説明すると、まずguessタスクで、シェルコマンドとしてembulkのguessを実行します。読み取る設定ファイルや結果の設定ファイルはembulkサブディレクトリ以下に置きます。次にloadタスクでembulkのロードを実行します。

どのような技術を使ってタスクを実行するか、は各処理の先頭に記述する「オペレータ」と呼ばれる識別子で決定するのですが、embulkオペレータではguessを実行する事ができないようなので、シェルオペレータを使用しました。

Embulk設定ファイル記述

上記ワークフローの記述に沿うよう、embuklサブディレクトリを作り、その下にEmbulkでguessに使用する設定ファイルを記述します。

in:
  type: s3
  bucket: cm-embulk-source-data-yokatsuki
  path_prefix: Sample
  access_key_id: xxxxxx
  secret_access_key: xxxxxx
out:
  type: redshift
  host: myredshift.xxxxxx.us-east-1.redshift.amazonaws.com
  user: xxxxxx
  password: xxxxxx
  database: xxxxxx
  table: orders_test_embulk
  access_key_id: xxxxxx
  secret_access_key: xxxxxx
  iam_user_name: cm-katsuki.yosuke
  s3_bucket: cm-embulk-temp-yokatsuki
  s3_key_prefix: temp/redshift
  mode: insert

ここまでで、Digdagプロジェクト以下の構造は以下の通りになります。

tsvload(プロジェクトディレクトリ)
├── embulk
│   └── guess-redshift-output.yml
└── tsvload.dig

Digdagワークフロー実行

ここまで準備できれば後は実行するだけです。プロジェクトディレクトリ下で、以下のコマンドを実行し、ワークフローを開始します。

$ digdag run tsvload.dig

以下のメッセージが出力され、タスクが実行されていきます。

2016-06-22 09:18:10 +0000: Digdag v0.8.2
2016-06-22 09:18:11 +0000 [WARN] (main): Using a new session time 2016-06-22T00:00:00+00:00.
2016-06-22 09:18:11 +0000 [INFO] (main): Using session .digdag/status/20160622T000000+0000.
2016-06-22 09:18:11 +0000 [INFO] (main): Starting a new session project id=1 workflow name=tsvload session_time=2016-06-22T00:00:00+00:00
2016-06-22 09:18:12 +0000 [INFO] (0017@+tsvload+guess): sh>: embulk guess embulk/guess-redshift-output.yml -o embulk/config-redshift-output.yml
2016-06-22 09:18:15.741 +0000: Embulk v0.8.9
2016-06-22 09:18:17.002 +0000 [INFO] (0001:guess): Loaded plugin embulk-input-s3 (0.2.8)
2016-06-22 09:18:18.026 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/gzip from a load path
2016-06-22 09:18:18.056 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/bzip2 from a load path
2016-06-22 09:18:18.074 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/json from a load path
2016-06-22 09:18:18.082 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/csv from a load path
in:
  type: s3
  bucket: cm-embulk-source-data-yokatsuki
  path_prefix: Sample
  access_key_id: xxxxxx
  secret_access_key: xxxxxx
  parser:
    charset: MS932
    newline: CRLF
    type: csv
    delimiter: "\t"
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: order_id, type: long}
    - {name: order_date, type: timestamp, format: '%Y/%m/%d'}
    - {name: order_priority, type: string}
    - {name: order_quantity, type: long}
    - {name: sales, type: string}
    - {name: discount, type: string}
    - {name: ship_mode, type: string}
    - {name: profit, type: long}
    - {name: unit_price, type: long}
    - {name: advertising_expenses, type: string}
    - {name: shipping_cost, type: long}
    - {name: customer_name, type: string}
    - {name: prefecture, type: string}
    - {name: city, type: string}
    - {name: region, type: string}
    - {name: shop_name, type: string}
    - {name: customer_segment, type: string}
    - {name: product_category, type: string}
    - {name: product_sub_category, type: string}
    - {name: product_id, type: string}
    - {name: product_name, type: string}
    - {name: product_description, type: string}
    - {name: product_container, type: string}
    - {name: product_base_margin, type: double}
    - {name: supplier, type: string}
    - {name: delivery_date, type: timestamp, format: '%Y/%m/%d'}
    - {name: ship_date, type: timestamp, format: '%Y/%m/%d'}
out: {type: redshift, host: myredshift.xxxxxx.us-east-1.redshift.amazonaws.com,
  user: xxxxxx, password: xxxxxx, database: xxxxxx, table: orders_test_embulk,
  access_key_id: xxxxxx, secret_access_key: xxxxxx,
  iam_user_name: cm-katsuki.yosuke, s3_bucket: cm-embulk-temp-yokatsuki, s3_key_prefix: temp/redshift,
  mode: insert}
Created 'embulk/config-redshift-output.yml' file.
2016-06-22 09:18:20 +0000 [INFO] (0017@+tsvload+load): embulk>: embulk/config-redshift-output.yml
2016-06-22 09:18:26.427 +0000: Embulk v0.8.9
2016-06-22 09:18:29.312 +0000 [INFO] (0001:transaction): Loaded plugin embulk-input-s3 (0.2.8)
2016-06-22 09:18:29.406 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-redshift (0.6.0)
2016-06-22 09:18:30.704 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=2 / tasks=1
2016-06-22 09:18:30.746 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://myredshift.xxxxxx.us-east-1.redshift.amazonaws.com:5439/xxxxxx options {user=root, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-06-22 09:18:30.861 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-06-22 09:18:30.865 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-06-22 09:18:30.865 +0000 [INFO] (0001:transaction): Using insert mode
2016-06-22 09:18:30.913 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "orders_test_0bbdfb40_bl_tmp000"
2016-06-22 09:18:30.917 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-06-22 09:18:31.110 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "orders_test_0bbdfb40_bl_tmp000" ("order_id" BIGINT, "order_date" TIMESTAMP, "order_priority" VARCHAR(65535), "order_quantity" BIGINT, "sales" VARCHAR(65535), "discount" VARCHAR(65535), "ship_mode" VARCHAR(65535), "profit" BIGINT, "unit_price" BIGINT, "advertising_expenses" VARCHAR(65535), "shipping_cost" BIGINT, "customer_name" VARCHAR(65535), "prefecture" VARCHAR(65535), "city" VARCHAR(65535), "region" VARCHAR(65535), "shop_name" VARCHAR(65535), "customer_segment" VARCHAR(65535), "product_category" VARCHAR(65535), "product_sub_category" VARCHAR(65535), "product_id" VARCHAR(65535), "product_name" VARCHAR(65535), "product_description" VARCHAR(65535), "product_container" VARCHAR(65535), "product_base_margin" DOUBLE PRECISION, "supplier" VARCHAR(65535), "delivery_date" TIMESTAMP, "ship_date" TIMESTAMP)
2016-06-22 09:18:31.122 +0000 [INFO] (0001:transaction): > 0.01 seconds
2016-06-22 09:18:31.673 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2016-06-22 09:18:31.722 +0000 [INFO] (0017:task-0000): Connecting to jdbc:postgresql://myredshift.xxxxxx.us-east-1.redshift.amazonaws.com:5439/xxxxxx options {user=root, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-06-22 09:18:31.961 +0000 [INFO] (0017:task-0000): SQL: SET search_path TO "public"
2016-06-22 09:18:31.964 +0000 [INFO] (0017:task-0000): > 0.00 seconds
2016-06-22 09:18:31.964 +0000 [INFO] (0017:task-0000): Copy SQL: COPY "orders_test_0bbdfb40_bl_tmp000" ("order_id", "order_date", "order_priority", "order_quantity", "sales", "discount", "ship_mode", "profit", "unit_price", "advertising_expenses", "shipping_cost", "customer_name", "prefecture", "city", "region", "shop_name", "customer_segment", "product_category", "product_sub_category", "product_id", "product_name", "product_description", "product_container", "product_base_margin", "supplier", "delivery_date", "ship_date") ? GZIP DELIMITER '\t' NULL '\\N' ESCAPE TRUNCATECOLUMNS ACCEPTINVCHARS STATUPDATE OFF COMPUPDATE OFF
2016-06-22 09:18:41.730 +0000 [INFO] (pool-2-thread-1): Uploading file id temp/redshift/ac60ed7f-d163-4a93-b330-17ef8e6a9263 to S3 (773,477 bytes 8,369 rows)
2016-06-22 09:18:42.093 +0000 [INFO] (pool-2-thread-1): Uploaded file temp/redshift/ac60ed7f-d163-4a93-b330-17ef8e6a9263 (0.36 seconds)
2016-06-22 09:18:42.101 +0000 [INFO] (pool-2-thread-2): SQL: SET search_path TO "public"
2016-06-22 09:18:42.103 +0000 [INFO] (pool-2-thread-2): > 0.00 seconds
2016-06-22 09:18:42.103 +0000 [INFO] (pool-2-thread-2): Running COPY from file temp/redshift/ac60ed7f-d163-4a93-b330-17ef8e6a9263
2016-06-22 09:18:43.280 +0000 [INFO] (pool-2-thread-2): Loaded file temp/redshift/ac60ed7f-d163-4a93-b330-17ef8e6a9263 (1.05 seconds for COPY)
2016-06-22 09:18:43.300 +0000 [INFO] (0017:task-0000): Loaded 1 files.
2016-06-22 09:18:43.302 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2016-06-22 09:18:43.302 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://myredshift.xxxxxx.us-east-1.redshift.amazonaws.com:5439/xxxxxx options {user=root, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2016-06-22 09:18:43.308 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-06-22 09:18:43.311 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-06-22 09:18:43.323 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS "orders_test_embulk" ("order_id" BIGINT, "order_date" TIMESTAMP, "order_priority" VARCHAR(65535), "order_quantity" BIGINT, "sales" VARCHAR(65535), "discount" VARCHAR(65535), "ship_mode" VARCHAR(65535), "profit" BIGINT, "unit_price" BIGINT, "advertising_expenses" VARCHAR(65535), "shipping_cost" BIGINT, "customer_name" VARCHAR(65535), "prefecture" VARCHAR(65535), "city" VARCHAR(65535), "region" VARCHAR(65535), "shop_name" VARCHAR(65535), "customer_segment" VARCHAR(65535), "product_category" VARCHAR(65535), "product_sub_category" VARCHAR(65535), "product_id" VARCHAR(65535), "product_name" VARCHAR(65535), "product_description" VARCHAR(65535), "product_container" VARCHAR(65535), "product_base_margin" DOUBLE PRECISION, "supplier" VARCHAR(65535), "delivery_date" TIMESTAMP, "ship_date" TIMESTAMP)
2016-06-22 09:18:43.451 +0000 [INFO] (0001:transaction): > 0.13 seconds
2016-06-22 09:18:43.640 +0000 [INFO] (0001:transaction): SQL: INSERT INTO "orders_test_embulk" ("order_id", "order_date", "order_priority", "order_quantity", "sales", "discount", "ship_mode", "profit", "unit_price", "advertising_expenses", "shipping_cost", "customer_name", "prefecture", "city", "region", "shop_name", "customer_segment", "product_category", "product_sub_category", "product_id", "product_name", "product_description", "product_container", "product_base_margin", "supplier", "delivery_date", "ship_date") SELECT "order_id", "order_date", "order_priority", "order_quantity", "sales", "discount", "ship_mode", "profit", "unit_price", "advertising_expenses", "shipping_cost", "customer_name", "prefecture", "city", "region", "shop_name", "customer_segment", "product_category", "product_sub_category", "product_id", "product_name", "product_description", "product_container", "product_base_margin", "supplier", "delivery_date", "ship_date" FROM "orders_test_0bbdfb40_bl_tmp000"
2016-06-22 09:18:43.692 +0000 [INFO] (0001:transaction): > 0.05 seconds (8,369 rows)
2016-06-22 09:18:44.567 +0000 [INFO] (0001:transaction): Connecting to jdbc:postgresql://myredshift.xxxxxx.us-east-1.redshift.amazonaws.com:5439/xxxxxx options {user=root, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2016-06-22 09:18:44.576 +0000 [INFO] (0001:transaction): SQL: SET search_path TO "public"
2016-06-22 09:18:44.578 +0000 [INFO] (0001:transaction): > 0.00 seconds
2016-06-22 09:18:44.578 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS "orders_test_0bbdfb40_bl_tmp000"
2016-06-22 09:18:44.808 +0000 [INFO] (0001:transaction): > 0.23 seconds
2016-06-22 09:18:44.817 +0000 [INFO] (main): Committed.
2016-06-22 09:18:44.821 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"Sample - Superstore Sales Japan-2013(revised).txt"},"out":{}}

Success. Task state is saved at .digdag/status/20160622T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

SQL Workbench/JでRedshiftにログインし、テーブルがロードされていることを確認しました。Embulkのお陰で、文字コードもMS932からUTF-8へ自動変換されており、文字化けは発生していません。

load-tsv-from-s3-to-redshift-using-embulk-digdag-02

まとめと今後

Digdagを使って、Embulk処理自動化の第一歩として、ざっくりとではありますが、guessとrunをまとめて実行することができました。事前準備は色々ありましたが、結局手作業で記述したファイルは、.digファイルとguess用の.ymlファイルのたった2つだけでした。これだけで

今後、以下についても試していきます。

  • 変数の活用
  • 複数テーブルへの対応
  • ロード失敗時の処理
  • 定期実行
  • 並列実行

それでは、また。