この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』15日目の内容となります。
今回はRedshiftへのUnloadを実施します。
先日14日目はRedshiftからのデータ取得してみた(日本語テーブル・カラム抽出)でした。
LuigiのRedshift実装を確認していたところ、Unload用のタスクが実装されていたので、
こちらを使ってみたいと思います。
下準備 サンプルデータ
前回同様、日本語名のテーブル・カラムに対して、 Redshiftチュートリアルのデータを 投入したテーブルを用います。
CREATE TABLE 顧客情報
(
顧客ID INTEGER NOT NULL,
氏名 VARCHAR(25) NOT NULL,
住所 VARCHAR(25) NOT NULL,
市 VARCHAR(10) NOT NULL,
国 VARCHAR(15) NOT NULL,
地域 VARCHAR(12) NOT NULL,
電話番号 VARCHAR(15) NOT NULL,
市場区分 VARCHAR(10) NOT NULL
);
copy 顧客情報 from 's3://awssampledbuswest2/ssbgz/customer'
CREDENTIALS 'aws_iam_role=arn:aws:iam::nnnnnnnnnnnn:role/redshift-role-mmmmmmmmm'
gzip compupdate off region 'us-west-2';
LuigiのUnloadタスクを用いてRedshiftからデータ抽出を行なってみた。
LuigiのUnloadタスクを用いた実装をGithubで確認してみたところ、Luigiのソースコードとそのテストコードくらいしか無かったので、
テストコードの実装を参考に実装していきたいと思います。
実装方針としては、luigi.contrib.redshift.RedshiftUnloadTaskを継承したクラスを作っていき、必要な接続情報等をプロパティにセットし、 最後queryメソッドとして、抽出に用いるSQL文をかけば抽出が行える模様です。
userS3.py
# -*- coding: utf-8 -*-
from logging import getLogger, StreamHandler, DEBUG
import psycopg2
import psycopg2.extras
import luigi
import luigi.contrib.redshift
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.setLevel(DEBUG)
logger.addHandler(handler)
class unloadRedshiftTableWithMultibyteCharacter (luigi.contrib.redshift.RedshiftUnloadTask):
host = "[RedshiftHostName]:5439"
database = "[RedshiftDatabase]"
user = "[RedshiftUserName]"
password = "[RedshiftPassword]"
table = "顧客情報"
bucket = "cm-kajiwara-redshift-unload"
key = "emploee"
aws_access_key_id = "[aws_access_key_id]"
aws_secret_access_key = "[aws_secret_access_key]"
s3_unload_path = 's3://%s/%s' % (bucket, key)
unload_options = "DELIMITER ',' ADDQUOTES GZIP ALLOWOVERWRITE PARALLEL OFF"
def query(self):
return "SELECT 顧客id,氏名,住所,電話番号 FROM 顧客情報 ORDER BY 顧客id
if __name__ == '__main__':
luigi.run(
実行結果
$ python ./useRedshift.py --local-scheduler unloadRedshiftTableWithMultibyteCharacter
DEBUG: Checking if unloadRedshiftTableWithMultibyteCharacter() is complete
INFO: Informed scheduler that task unloadRedshiftTableWithMultibyteCharacter__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 34802] Worker Worker(salt=082720086, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=34802) running unloadRedshiftTableWithMultibyteCharacter()
INFO: Executing unload query from task: <class '__main__.unloadRedshiftTableWithMultibyteCharacter'>
INFO: UNLOAD
INFO: [pid 34802] Worker Worker(salt=082720086, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=34802) done unloadRedshiftTableWithMultibyteCharacter()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task unloadRedshiftTableWithMultibyteCharacter__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=082720086, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=34802) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 unloadRedshiftTableWithMultibyteCharacter()
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
$ aws s3 ls s3://cm-kajiwara-redshift-unload
2016-11-27 22:56:13 92720647 emploee000.gz
Unloadコマンドを用いてS3に抽出されていることがわかります。
こちらも日本語テーブルを対象にしましたが、特に問題無さそうです。
まとめ
Redshiftに対するUnloadも特にタスクで用意されているため、
実装なしで行うことができました。
明日はRedsfhitへのUPDATE/DELETEを行いたいと思います。