RedshiftからUnloadを用いてデータ取得してみた | Luigi Advent Calendar 2016 #15

2016.12.15

はじめに

好物はインフラとフロントエンドのかじわらゆたかです。  

このエントリは『Luigi Advent Calendar 2016』15日目の内容となります。
今回はRedshiftへのUnloadを実施します。

先日14日目はRedshiftからのデータ取得してみた(日本語テーブル・カラム抽出)でした。

LuigiのRedshift実装を確認していたところ、Unload用のタスクが実装されていたので、
こちらを使ってみたいと思います。

下準備 サンプルデータ

前回同様、日本語名のテーブル・カラムに対して、 Redshiftチュートリアルのデータを 投入したテーブルを用います。

CREATE TABLE 顧客情報
(
  顧客ID INTEGER     NOT NULL,
  氏名   VARCHAR(25) NOT NULL,
  住所   VARCHAR(25) NOT NULL,
  市    VARCHAR(10) NOT NULL,
  国    VARCHAR(15) NOT NULL,
  地域   VARCHAR(12) NOT NULL,
  電話番号 VARCHAR(15) NOT NULL,
  市場区分 VARCHAR(10) NOT NULL
);

copy 顧客情報 from 's3://awssampledbuswest2/ssbgz/customer' 
CREDENTIALS 'aws_iam_role=arn:aws:iam::nnnnnnnnnnnn:role/redshift-role-mmmmmmmmm'
gzip compupdate off region 'us-west-2';

LuigiのUnloadタスクを用いてRedshiftからデータ抽出を行なってみた。

LuigiのUnloadタスクを用いた実装をGithubで確認してみたところ、Luigiのソースコードとそのテストコードくらいしか無かったので、
テストコードの実装を参考に実装していきたいと思います。

luigi/test/redshift_test.py

実装方針としては、luigi.contrib.redshift.RedshiftUnloadTaskを継承したクラスを作っていき、必要な接続情報等をプロパティにセットし、 最後queryメソッドとして、抽出に用いるSQL文をかけば抽出が行える模様です。

userS3.py

# -*- coding: utf-8 -*-
from logging import getLogger, StreamHandler, DEBUG

import psycopg2
import psycopg2.extras
import luigi
import luigi.contrib.redshift
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.setLevel(DEBUG)
logger.addHandler(handler)


class unloadRedshiftTableWithMultibyteCharacter (luigi.contrib.redshift.RedshiftUnloadTask):
    host = "[RedshiftHostName]:5439"
    database = "[RedshiftDatabase]"
    user = "[RedshiftUserName]"
    password = "[RedshiftPassword]"
    table = "顧客情報"
    bucket = "cm-kajiwara-redshift-unload"
    key = "emploee"
    aws_access_key_id = "[aws_access_key_id]"
    aws_secret_access_key = "[aws_secret_access_key]"

    s3_unload_path = 's3://%s/%s' % (bucket, key)
    unload_options = "DELIMITER ',' ADDQUOTES GZIP ALLOWOVERWRITE PARALLEL OFF"

    def query(self):
        return "SELECT 顧客id,氏名,住所,電話番号 FROM 顧客情報 ORDER BY 顧客id


if __name__ == '__main__':
    luigi.run(

実行結果

$ python ./useRedshift.py --local-scheduler unloadRedshiftTableWithMultibyteCharacter
DEBUG: Checking if unloadRedshiftTableWithMultibyteCharacter() is complete
INFO: Informed scheduler that task   unloadRedshiftTableWithMultibyteCharacter__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 34802] Worker Worker(salt=082720086, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=34802) running   unloadRedshiftTableWithMultibyteCharacter()
INFO: Executing unload query from task: <class '__main__.unloadRedshiftTableWithMultibyteCharacter'>
INFO: UNLOAD
INFO: [pid 34802] Worker Worker(salt=082720086, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=34802) done      unloadRedshiftTableWithMultibyteCharacter()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   unloadRedshiftTableWithMultibyteCharacter__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=082720086, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=34802) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 unloadRedshiftTableWithMultibyteCharacter()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

$ aws s3 ls s3://cm-kajiwara-redshift-unload
2016-11-27 22:56:13   92720647 emploee000.gz

Unloadコマンドを用いてS3に抽出されていることがわかります。
こちらも日本語テーブルを対象にしましたが、特に問題無さそうです。

まとめ

Redshiftに対するUnloadも特にタスクで用意されているため、 実装なしで行うことができました。
明日はRedsfhitへのUPDATE/DELETEを行いたいと思います。