この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』17日目の内容となります。
今回はRedshiftに対してCopyコマンドを発行してみたいと思います。
先日16日目はRedshiftに対してのUPDATEとDELETEを実施してみたでした。
前回まででRedshiftに対する基本的なアクセス方法について確認しました。
今回はCopyコマンドを用いたデータ取り込みについて実装してみます。
下準備
前回使ったTSVファイルとテーブルに対して、今回はジョブからデータ投入をしていきます。
customer0.tsv
1 Customer#000000001 IVhzIApeRb MOROCCO 0 MOROCCO AFRICA 25-989-741-2988 BUILDING
2 Customer#000000002 XSTf4,NCwDVaWNe6tE JORDAN 1 JORDAN MIDDLE EAST 23-768-687-3665 AUTOMOBILE
3 Customer#000000003 MG9kdTD ARGENTINA7 ARGENTINA AMERICA 11-719-748-3364 AUTOMOBILE
4 Customer#000000004 XxVSJsL EGYPT 4 EGYPT MIDDLE EAST 14-128-190-5944 MACHINERY
5 Customer#000000005 KvpyuHCplrB84WgAi CANADA 5 CANADA AMERICA 13-750-942-6364 HOUSEHOLD
6 Customer#000000006 sKZz0CsnMD7mp4Xd0YrBvx SAUDI ARA2 SAUDI ARABIA MIDDLE EAST 30-114-968-4951 AUTOMOBILE
7 Customer#000000007 TcGe5gaZNgVePxU5kR CHINA 0 CHINA ASIA 28-190-982-9759 AUTOMOBILE
8 Customer#000000008 I0B10bB0AymmC, 0PrRYBC PERU 6 PERU AMERICA 27-147-574-9335 BUILDING
9 Customer#000000009 xKiAFTjUsCuxfele INDIA 6 INDIA ASIA 18-338-906-3675 FURNITURE
10 Customer#000000010 6LrEaV6KR6PLVcgl2ArL ETHIOPIA 9 ETHIOPIA AFRICA 15-741-346-9870 HOUSEHOL
CREATE TABLE 顧客情報
(
顧客ID INTEGER NOT NULL,
氏名 VARCHAR(25) NOT NULL,
住所 VARCHAR(25) NOT NULL,
市 VARCHAR(10) NOT NULL,
国 VARCHAR(15) NOT NULL,
地域 VARCHAR(12) NOT NULL,
電話番号 VARCHAR(15) NOT NULL,
市場区分 VARCHAR(10) NOT NULL
);
Redshiftに対してCopyコマンドを発行してみた
実装するとこんな感じになります。
useRedshift.py
import luigi
import luigi.contrib.redshift
class copyRedshiftTableFromS3(luigi.contrib.redshift.S3CopyToTable):
param = {}
param["host"] = "[Redshift Host]"
param["database"] = "[Redshift database]"
param["password"] = "[Redshift Password]"
param["user"] = "[Redshift User]"
param["table"] = "顧客情報"
param["s3_load_path"] = "s3://cm-kajiwara-redshift-load/{}
param["copy_options"] = "region AS 'ap-northeast-1' DELIMITER AS '\t' TIMEFORMAT AS 'auto' "
param["aws_access_key_id"] = "["aws_access_key_id"]"
param["aws_secret_access_key"] = "["aws_secret_access_key"]"
@property
def host(self):
return self.param["host"]
@property
def database(self):
return self.param["database"]
@property
def user(self):
return self.param["user"]
@property
def password(self):
return self.param["password"]
@property
def table(self):
return self.param["table"]
def s3_load_path(self):
return self.param["s3_load_path"]
@property
def aws_access_key_id(self):
return self.param["aws_access_key_id"]
@property
def aws_secret_access_key(self):
return self.param["aws_secret_access_key"]
@property
def copy_options(self):
return self.param["copy_options
必用なパラメータをプロパティとしてセットしていく形になります。
ここで一点だけ注意が必用なのですがs3_load_pathだけはプロパティとしてではなく、
メソッドとして実装する必要があります。
これは継承元のluigi.contrib.redshift.S3CopyToTableに寄るものなのですが、
s3_load_pathだけはプロパティの参照ではなく、メソッドの戻り値と敷いて値を取得しているためこのような書き方になっています。
また、このクラスはconnectionを作り、管理テーブル(table_updates)に書き込みコミットまで行います。
複数のテーブルのCopyを行いたいといったケースに於いては、仮テーブルを作成しておいて、
最後にAlter Tableで反映させると言った方法が必要になるものとなります。
実行結果は以下のようになります。
$ python ./useRedshift.py --local-scheduler copyRedshiftTableFromS3 --s3FilePath customer0.tsv
DEBUG: Checking if copyRedshiftTableFromS3(s3FilePath=customer0.tsv) is complete
INFO: Informed scheduler that task copyRedshiftTableFromS3_customer0_tsv_c0c96df995 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 36386] Worker Worker(salt=766678766, workers=1, host=ip-172-18-0-107.ap-northeast-1.compute.internal, username=kajiwarayutaka, pid=36386) running copyRedshiftTableFromS3(s3FilePath=customer0.tsv)
INFO: Inserting file: s3://cm-kajiwara-redshift-load/customer0.tsv
INFO: Executing post copy queries
INFO: [pid 36386] Worker Worker(salt=766678766, workers=1, host=ip-172-18-0-107.ap-northeast-1.compute.internal, username=kajiwarayutaka, pid=36386) done copyRedshiftTableFromS3(s3FilePath=customer0.tsv)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task copyRedshiftTableFromS3_customer0_tsv_c0c96df995 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=766678766, workers=1, host=ip-172-18-0-107.ap-northeast-1.compute.internal, username=kajiwarayutaka, pid=36386) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 copyRedshiftTableFromS3(s3FilePath=customer0.tsv)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
顧客id | 氏名 | 住所 | 市 | 国 | 地域 | 電話番号 | 市場区分 |
---|---|---|---|---|---|---|---|
1 | Customer#000000001 | IVhzIApeRb | MOROCCO 0 | MOROCCO | AFRICA | 25-989-741-2988 | BUILDING |
2 | Customer#000000002 | XSTf4,NCwDVaWNe6tE | JORDAN 1 | JORDAN | MIDDLE EAST | 23-768-687-3665 | AUTOMOBILE |
3 | Customer#000000003 | MG9kdTD | ARGENTINA7 | ARGENTINA | AMERICA | 11-719-748-3364 | AUTOMOBILE |
4 | Customer#000000004 | XxVSJsL | EGYPT 4 | EGYPT | MIDDLE EAST | 14-128-190-5944 | MACHINERY |
5 | Customer#000000005 | KvpyuHCplrB84WgAi | CANADA 5 | CANADA | AMERICA | 13-750-942-6364 | HOUSEHOLD |
6 | Customer#000000006 | sKZz0CsnMD7mp4Xd0YrBvx | SAUDI ARA2 | SAUDI ARABIA | MIDDLE EAST | 30-114-968-4951 | AUTOMOBILE |
7 | Customer#000000007 | TcGe5gaZNgVePxU5kR | CHINA 0 | CHINA | ASIA | 28-190-982-9759 | AUTOMOBILE |
8 | Customer#000000008 | I0B10bB0AymmC, 0PrRYBC | PERU 6 | PERU | AMERICA | 27-147-574-9335 | BUILDING |
9 | Customer#000000009 | xKiAFTjUsCuxfele | INDIA 6 | INDIA | ASIA | 18-338-906-3675 | FURNITURE |
10 | Customer#000000010 | 6LrEaV6KR6PLVcgl2ArL | ETHIOPIA 9 | ETHIOPIA | AFRICA | 15-741-346-9870 | HOUSEHOLD |
まとめ
Copyコマンドの実行についても、一部Luigi側のモジュールの癖は有りましたが問題なく実行できました。
次回はCopyコマンドに用いるManifestの生成を行います。