この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』16日目の内容となります。
先日15日目はRedshiftからUnloadを用いてデータ取得してみたでした。
前回まででSELECT UNLOADを試しました。
UPDATE/DELETEといった、それ以外のクエリの書き方等を調べてみたいと思います。
その中で、Luigiでのトランザクションの用い方等も調べていきたいと思います。
下準備
以下のテストデータとそれを取り込めるテーブルを用意して、データは読み込ませておきます。
customer0.tsv
1 Customer#000000001 IVhzIApeRb MOROCCO 0 MOROCCO AFRICA 25-989-741-2988 BUILDING
2 Customer#000000002 XSTf4,NCwDVaWNe6tE JORDAN 1 JORDAN MIDDLE EAST 23-768-687-3665 AUTOMOBILE
3 Customer#000000003 MG9kdTD ARGENTINA7 ARGENTINA AMERICA 11-719-748-3364 AUTOMOBILE
4 Customer#000000004 XxVSJsL EGYPT 4 EGYPT MIDDLE EAST 14-128-190-5944 MACHINERY
5 Customer#000000005 KvpyuHCplrB84WgAi CANADA 5 CANADA AMERICA 13-750-942-6364 HOUSEHOLD
6 Customer#000000006 sKZz0CsnMD7mp4Xd0YrBvx SAUDI ARA2 SAUDI ARABIA MIDDLE EAST 30-114-968-4951 AUTOMOBILE
7 Customer#000000007 TcGe5gaZNgVePxU5kR CHINA 0 CHINA ASIA 28-190-982-9759 AUTOMOBILE
8 Customer#000000008 I0B10bB0AymmC, 0PrRYBC PERU 6 PERU AMERICA 27-147-574-9335 BUILDING
9 Customer#000000009 xKiAFTjUsCuxfele INDIA 6 INDIA ASIA 18-338-906-3675 FURNITURE
10 Customer#000000010 6LrEaV6KR6PLVcgl2ArL ETHIOPIA 9 ETHIOPIA AFRICA 15-741-346-9870 HOUSEHOL
CREATE TABLE 顧客情報
(
顧客ID INTEGER NOT NULL,
氏名 VARCHAR(25) NOT NULL,
住所 VARCHAR(25) NOT NULL,
市 VARCHAR(10) NOT NULL,
国 VARCHAR(15) NOT NULL,
地域 VARCHAR(12) NOT NULL,
電話番号 VARCHAR(15) NOT NULL,
市場区分 VARCHAR(10) NOT NULL
);
copy 顧客情報 from 's3://cm-kajiwara-redshift-copy/customer0.tsv'
CREDENTIALS 'aws_iam_role=arn:aws:iam::nnnnnnnnnnnn:role/redshift-role-mmmmmmmmm'
DELIMITER AS '\t' TIMEFORMAT AS 'auto' region AS 'ap-northeast-1';;
UPDATE / DELETE文の実行
こちらはSELECTの時と同様にconnectionからqueryメソッドをもちいることで、実行可能です。
SELECTの時と異なる点としては、RedshiftTargetが実際に用いているpsycopg2の実装はクエリ発行時にトランザクションを貼っているため、
明示的にcommitをする必要があると言った点です。commitをしないともちろん、データは反映されません。
useRedshift.py
import luigi
import luigi.contrib.redshift
class updateRedshiftTable(luigi.Task):
host = "[Redshift Host]"
database = "[Redshift database]"
password = "[Redshift password]"
user = "[Redshift user]"
table = "顧客情報"
def run(self):
output = self.output()
connection = output.connect()
cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
try:
cursor.execute("update {} SET 氏名 = 'UpdateCustomer#000000002' WHERE 顧客ID=1".format(self.table))
output.touch(connection)
connection.commit()
# commit and clean up
connection.close()
except:
logger.warning("UPDATE query error")
raise
def output(self):
return luigi.contrib.redshift.RedshiftTarget(
host=self.host,
database=self.database,
password=self.password,
user=self.user,
table=self.table,
update_id="updateRedshiftTable")
import luigi
import luigi.contrib.redshift
class deleteRedshiftTable(luigi.Task):
host = "[Redshift Host]"
database = "[Redshift database]"
password = "[Redshift password]"
user = "[Redshift user]"
table = "顧客情報"
def run(self):
output = self.output()
connection = output.connect()
cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
try:
cursor.execute("delete from {} WHERE 顧客ID=2".format(self.table))
output.touch(connection)
connection.commit()
# commit and clean up
connection.close()
except:
logger.warning("DELETE query error")
raise
def output(self):
return luigi.contrib.redshift.RedshiftTarget(
host=self.host,
database=self.database,
password=self.password,
user=self.user,
table=self.table,
update_id="deleteRedshiftTable")
実行結果
顧客id | 氏名 | 住所 | 市 | 国 | 地域 | 電話番号 | 市場区分 |
---|---|---|---|---|---|---|---|
1 | UpdateCustomer#000000002 | IVhzIApeRb | MOROCCO 0 | MOROCCO | AFRICA | 25-989-741-2988 | BUILDING |
3 | Customer#000000003 | MG9kdTD | ARGENTINA7 | ARGENTINA | AMERICA | 11-719-748-3364 | AUTOMOBILE |
4 | Customer#000000004 | XxVSJsL | EGYPT 4 | EGYPT | MIDDLE EAST | 14-128-190-5944 | MACHINERY |
5 | Customer#000000005 | KvpyuHCplrB84WgAi | CANADA 5 | CANADA | AMERICA | 13-750-942-6364 | HOUSEHOLD |
6 | Customer#000000006 | sKZz0CsnMD7mp4Xd0YrBvx | SAUDI ARA2 | SAUDI ARABIA | MIDDLE EAST | 30-114-968-4951 | AUTOMOBILE |
7 | Customer#000000007 | TcGe5gaZNgVePxU5kR | CHINA 0 | CHINA | ASIA | 28-190-982-9759 | AUTOMOBILE |
8 | Customer#000000008 | I0B10bB0AymmC, 0PrRYBC | PERU 6 | PERU | AMERICA | 27-147-574-9335 | BUILDING |
9 | Customer#000000009 | xKiAFTjUsCuxfele | INDIA 6 | INDIA | ASIA | 18-338-906-3675 | FURNITURE |
10 | Customer#000000010 | 6LrEaV6KR6PLVcgl2ArL | ETHIOPIA 9 | ETHIOPIA | AFRICA | 15-741-346-9870 | HOUSEHOLD |
顧客id 1 の氏名が更新されていて、顧客id 2のレコードが消えていることがわかります。
トランザクションのロールバック
先ほども書きましたが、LuigiのRedshiftモジュールがラップしているpsycopg2はデフォルトでトランザクションを貼っています。
今度は更新・削除を実施後、結果がロールバックしていることを確認してみたいと思います。
import luigi
import luigi.contrib.redshift
class rollbackRedshiftTable(luigi.Task):
host = "[Redshift Host]"
database = "[Redshift database]"
password = "[Redshift password]"
user = "[Redshift user]"
table = "顧客情報"
def run(self):
output = self.output()
connection = output.connect()
cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
try:
cursor.execute("update {} SET 氏名 = 'RbkCustomer#000000002' WHERE 顧客ID=3".format(self.table))
cursor.execute("delete from {} WHERE 顧客ID=4".format(self.table))
output.touch(connection)
connection.rollback()
# commit and clean up
connection.close()
except:
logger.warning("Rallback query error")
raise
def output(self):
return luigi.contrib.redshift.RedshiftTarget(
host=self.host,
database=self.database,
password=self.password,
user=self.user,
table=self.table,
update_id="rollbackRedshiftTable")
実装としては、commitしていた個所をrollbackメソッドに変更しています。
このようにすることで、変更はコミットされず、ロールバックされることになります。
実行結果に付いては割愛します。
まとめ
基本的なクエリの受け渡しもpsycopg2の仕様の問題はありましたが、それ以外は特に問題なく行えそうです。
明日からはRedshiftへのCopyコマンドの実行を行なってみたいと思います。