RedshiftからUnloadを用いてデータ取得してみた | Luigi Advent Calendar 2016 #15
はじめに
好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』15日目の内容となります。
今回はRedshiftへのUnloadを実施します。
先日14日目はRedshiftからのデータ取得してみた(日本語テーブル・カラム抽出)でした。
LuigiのRedshift実装を確認していたところ、Unload用のタスクが実装されていたので、
こちらを使ってみたいと思います。
下準備 サンプルデータ
前回同様、日本語名のテーブル・カラムに対して、 Redshiftチュートリアルのデータを 投入したテーブルを用います。
CREATE TABLE 顧客情報 ( 顧客ID INTEGER NOT NULL, 氏名 VARCHAR(25) NOT NULL, 住所 VARCHAR(25) NOT NULL, 市 VARCHAR(10) NOT NULL, 国 VARCHAR(15) NOT NULL, 地域 VARCHAR(12) NOT NULL, 電話番号 VARCHAR(15) NOT NULL, 市場区分 VARCHAR(10) NOT NULL ); copy 顧客情報 from 's3://awssampledbuswest2/ssbgz/customer' CREDENTIALS 'aws_iam_role=arn:aws:iam::nnnnnnnnnnnn:role/redshift-role-mmmmmmmmm' gzip compupdate off region 'us-west-2';
LuigiのUnloadタスクを用いてRedshiftからデータ抽出を行なってみた。
LuigiのUnloadタスクを用いた実装をGithubで確認してみたところ、Luigiのソースコードとそのテストコードくらいしか無かったので、
テストコードの実装を参考に実装していきたいと思います。
実装方針としては、luigi.contrib.redshift.RedshiftUnloadTaskを継承したクラスを作っていき、必要な接続情報等をプロパティにセットし、 最後queryメソッドとして、抽出に用いるSQL文をかけば抽出が行える模様です。
# -*- coding: utf-8 -*- from logging import getLogger, StreamHandler, DEBUG import psycopg2 import psycopg2.extras import luigi import luigi.contrib.redshift logger = getLogger(__name__) handler = StreamHandler() handler.setLevel(DEBUG) logger.setLevel(DEBUG) logger.addHandler(handler) class unloadRedshiftTableWithMultibyteCharacter (luigi.contrib.redshift.RedshiftUnloadTask): host = "[RedshiftHostName]:5439" database = "[RedshiftDatabase]" user = "[RedshiftUserName]" password = "[RedshiftPassword]" table = "顧客情報" bucket = "cm-kajiwara-redshift-unload" key = "emploee" aws_access_key_id = "[aws_access_key_id]" aws_secret_access_key = "[aws_secret_access_key]" s3_unload_path = 's3://%s/%s' % (bucket, key) unload_options = "DELIMITER ',' ADDQUOTES GZIP ALLOWOVERWRITE PARALLEL OFF" def query(self): return "SELECT 顧客id,氏名,住所,電話番号 FROM 顧客情報 ORDER BY 顧客id if __name__ == '__main__': luigi.run(
実行結果
$ python ./useRedshift.py --local-scheduler unloadRedshiftTableWithMultibyteCharacter DEBUG: Checking if unloadRedshiftTableWithMultibyteCharacter() is complete INFO: Informed scheduler that task unloadRedshiftTableWithMultibyteCharacter__99914b932b has status PENDING INFO: Done scheduling tasks INFO: Running Worker with 1 processes DEBUG: Asking scheduler for work... DEBUG: Pending tasks: 1 INFO: [pid 34802] Worker Worker(salt=082720086, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=34802) running unloadRedshiftTableWithMultibyteCharacter() INFO: Executing unload query from task: <class '__main__.unloadRedshiftTableWithMultibyteCharacter'> INFO: UNLOAD INFO: [pid 34802] Worker Worker(salt=082720086, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=34802) done unloadRedshiftTableWithMultibyteCharacter() DEBUG: 1 running tasks, waiting for next task to finish INFO: Informed scheduler that task unloadRedshiftTableWithMultibyteCharacter__99914b932b has status DONE DEBUG: Asking scheduler for work... DEBUG: Done DEBUG: There are no more tasks to run at this time INFO: Worker Worker(salt=082720086, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=34802) was stopped. Shutting down Keep-Alive thread INFO: ===== Luigi Execution Summary ===== Scheduled 1 tasks of which: * 1 ran successfully: - 1 unloadRedshiftTableWithMultibyteCharacter() This progress looks :) because there were no failed tasks or missing external dependencies ===== Luigi Execution Summary ===== $ aws s3 ls s3://cm-kajiwara-redshift-unload 2016-11-27 22:56:13 92720647 emploee000.gz
Unloadコマンドを用いてS3に抽出されていることがわかります。
こちらも日本語テーブルを対象にしましたが、特に問題無さそうです。
まとめ
Redshiftに対するUnloadも特にタスクで用意されているため、
実装なしで行うことができました。
明日はRedsfhitへのUPDATE/DELETEを行いたいと思います。