Redshiftに対してCopyコマンドを発行してみた | Luigi Advent Calendar 2016 #17

2016.12.17

はじめに

好物はインフラとフロントエンドのかじわらゆたかです。

このエントリは『Luigi Advent Calendar 2016』17日目の内容となります。
今回はRedshiftに対してCopyコマンドを発行してみたいと思います。

先日16日目はRedshiftに対してのUPDATEとDELETEを実施してみたでした。

前回まででRedshiftに対する基本的なアクセス方法について確認しました。
今回はCopyコマンドを用いたデータ取り込みについて実装してみます。

下準備

前回使ったTSVファイルとテーブルに対して、今回はジョブからデータ投入をしていきます。

customer0.tsv

1	Customer#000000001	IVhzIApeRb	MOROCCO  0	MOROCCO	AFRICA	25-989-741-2988	BUILDING
2	Customer#000000002	XSTf4,NCwDVaWNe6tE	JORDAN   1	JORDAN	MIDDLE EAST	23-768-687-3665	AUTOMOBILE
3	Customer#000000003	MG9kdTD	ARGENTINA7	ARGENTINA	AMERICA	11-719-748-3364	AUTOMOBILE
4	Customer#000000004	XxVSJsL	EGYPT    4	EGYPT	MIDDLE EAST	14-128-190-5944	MACHINERY
5	Customer#000000005	KvpyuHCplrB84WgAi	CANADA   5	CANADA	AMERICA	13-750-942-6364	HOUSEHOLD
6	Customer#000000006	sKZz0CsnMD7mp4Xd0YrBvx	SAUDI ARA2	SAUDI ARABIA	MIDDLE EAST	30-114-968-4951	AUTOMOBILE
7	Customer#000000007	TcGe5gaZNgVePxU5kR	CHINA    0	CHINA	ASIA	28-190-982-9759	AUTOMOBILE
8	Customer#000000008	I0B10bB0AymmC, 0PrRYBC	PERU     6	PERU	AMERICA	27-147-574-9335	BUILDING
9	Customer#000000009	xKiAFTjUsCuxfele	INDIA    6	INDIA	ASIA	18-338-906-3675	FURNITURE
10	Customer#000000010	6LrEaV6KR6PLVcgl2ArL 	ETHIOPIA 9	ETHIOPIA	AFRICA	15-741-346-9870	HOUSEHOL
CREATE TABLE 顧客情報
(
  顧客ID INTEGER     NOT NULL,
  氏名   VARCHAR(25) NOT NULL,
  住所   VARCHAR(25) NOT NULL,
  市    VARCHAR(10) NOT NULL,
  国    VARCHAR(15) NOT NULL,
  地域   VARCHAR(12) NOT NULL,
  電話番号 VARCHAR(15) NOT NULL,
  市場区分 VARCHAR(10) NOT NULL
);

Redshiftに対してCopyコマンドを発行してみた

実装するとこんな感じになります。

useRedshift.py

import luigi
import luigi.contrib.redshift


class copyRedshiftTableFromS3(luigi.contrib.redshift.S3CopyToTable):

    param = {}
    param["host"] = "[Redshift Host]"
    param["database"] = "[Redshift database]"
    param["password"] = "[Redshift Password]"
    param["user"] = "[Redshift User]"
    param["table"] = "顧客情報"
    param["s3_load_path"] = "s3://cm-kajiwara-redshift-load/{}
    param["copy_options"] = "region AS 'ap-northeast-1' DELIMITER AS '\t' TIMEFORMAT AS 'auto' "
    param["aws_access_key_id"] = "["aws_access_key_id"]"
    param["aws_secret_access_key"] = "["aws_secret_access_key"]"

    @property
    def host(self):
        return self.param["host"]

    @property
    def database(self):
        return self.param["database"]

    @property
    def user(self):
        return self.param["user"]

    @property
    def password(self):
        return self.param["password"]

    @property
    def table(self):
        return self.param["table"]

    def s3_load_path(self):
        return self.param["s3_load_path"]

    @property
    def aws_access_key_id(self):
        return self.param["aws_access_key_id"]

    @property
    def aws_secret_access_key(self):
        return self.param["aws_secret_access_key"]

    @property
    def copy_options(self):
        return self.param["copy_options

必用なパラメータをプロパティとしてセットしていく形になります。
ここで一点だけ注意が必用なのですがs3_load_pathだけはプロパティとしてではなく、
メソッドとして実装する必要があります。

これは継承元のluigi.contrib.redshift.S3CopyToTableに寄るものなのですが、
s3_load_pathだけはプロパティの参照ではなく、メソッドの戻り値と敷いて値を取得しているためこのような書き方になっています。

また、このクラスはconnectionを作り、管理テーブル(table_updates)に書き込みコミットまで行います。
複数のテーブルのCopyを行いたいといったケースに於いては、仮テーブルを作成しておいて、
最後にAlter Tableで反映させると言った方法が必要になるものとなります。

実行結果は以下のようになります。

$ python ./useRedshift.py --local-scheduler copyRedshiftTableFromS3 --s3FilePath customer0.tsv
DEBUG: Checking if copyRedshiftTableFromS3(s3FilePath=customer0.tsv) is complete
INFO: Informed scheduler that task   copyRedshiftTableFromS3_customer0_tsv_c0c96df995   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 36386] Worker Worker(salt=766678766, workers=1, host=ip-172-18-0-107.ap-northeast-1.compute.internal, username=kajiwarayutaka, pid=36386) running   copyRedshiftTableFromS3(s3FilePath=customer0.tsv)
INFO: Inserting file: s3://cm-kajiwara-redshift-load/customer0.tsv
INFO: Executing post copy queries
INFO: [pid 36386] Worker Worker(salt=766678766, workers=1, host=ip-172-18-0-107.ap-northeast-1.compute.internal, username=kajiwarayutaka, pid=36386) done      copyRedshiftTableFromS3(s3FilePath=customer0.tsv)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   copyRedshiftTableFromS3_customer0_tsv_c0c96df995   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=766678766, workers=1, host=ip-172-18-0-107.ap-northeast-1.compute.internal, username=kajiwarayutaka, pid=36386) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 copyRedshiftTableFromS3(s3FilePath=customer0.tsv)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====
顧客id 氏名 住所 地域 電話番号 市場区分
1 Customer#000000001 IVhzIApeRb MOROCCO 0 MOROCCO AFRICA 25-989-741-2988 BUILDING
2 Customer#000000002 XSTf4,NCwDVaWNe6tE JORDAN 1 JORDAN MIDDLE EAST 23-768-687-3665 AUTOMOBILE
3 Customer#000000003 MG9kdTD ARGENTINA7 ARGENTINA AMERICA 11-719-748-3364 AUTOMOBILE
4 Customer#000000004 XxVSJsL EGYPT 4 EGYPT MIDDLE EAST 14-128-190-5944 MACHINERY
5 Customer#000000005 KvpyuHCplrB84WgAi CANADA 5 CANADA AMERICA 13-750-942-6364 HOUSEHOLD
6 Customer#000000006 sKZz0CsnMD7mp4Xd0YrBvx SAUDI ARA2 SAUDI ARABIA MIDDLE EAST 30-114-968-4951 AUTOMOBILE
7 Customer#000000007 TcGe5gaZNgVePxU5kR CHINA 0 CHINA ASIA 28-190-982-9759 AUTOMOBILE
8 Customer#000000008 I0B10bB0AymmC, 0PrRYBC PERU 6 PERU AMERICA 27-147-574-9335 BUILDING
9 Customer#000000009 xKiAFTjUsCuxfele INDIA 6 INDIA ASIA 18-338-906-3675 FURNITURE
10 Customer#000000010 6LrEaV6KR6PLVcgl2ArL ETHIOPIA 9 ETHIOPIA AFRICA 15-741-346-9870 HOUSEHOLD

まとめ

Copyコマンドの実行についても、一部Luigi側のモジュールの癖は有りましたが問題なく実行できました。
次回はCopyコマンドに用いるManifestの生成を行います。