Redshiftに対してのUPDATEとDELETEを実施してみた | Luigi Advent Calendar 2016 #16

2016.12.16

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

好物はインフラとフロントエンドのかじわらゆたかです。

このエントリは『Luigi Advent Calendar 2016』16日目の内容となります。

先日15日目はRedshiftからUnloadを用いてデータ取得してみたでした。

前回まででSELECT UNLOADを試しました。
UPDATE/DELETEといった、それ以外のクエリの書き方等を調べてみたいと思います。
その中で、Luigiでのトランザクションの用い方等も調べていきたいと思います。

下準備

以下のテストデータとそれを取り込めるテーブルを用意して、データは読み込ませておきます。

customer0.tsv

1	Customer#000000001	IVhzIApeRb	MOROCCO  0	MOROCCO	AFRICA	25-989-741-2988	BUILDING
2	Customer#000000002	XSTf4,NCwDVaWNe6tE	JORDAN   1	JORDAN	MIDDLE EAST	23-768-687-3665	AUTOMOBILE
3	Customer#000000003	MG9kdTD	ARGENTINA7	ARGENTINA	AMERICA	11-719-748-3364	AUTOMOBILE
4	Customer#000000004	XxVSJsL	EGYPT    4	EGYPT	MIDDLE EAST	14-128-190-5944	MACHINERY
5	Customer#000000005	KvpyuHCplrB84WgAi	CANADA   5	CANADA	AMERICA	13-750-942-6364	HOUSEHOLD
6	Customer#000000006	sKZz0CsnMD7mp4Xd0YrBvx	SAUDI ARA2	SAUDI ARABIA	MIDDLE EAST	30-114-968-4951	AUTOMOBILE
7	Customer#000000007	TcGe5gaZNgVePxU5kR	CHINA    0	CHINA	ASIA	28-190-982-9759	AUTOMOBILE
8	Customer#000000008	I0B10bB0AymmC, 0PrRYBC	PERU     6	PERU	AMERICA	27-147-574-9335	BUILDING
9	Customer#000000009	xKiAFTjUsCuxfele	INDIA    6	INDIA	ASIA	18-338-906-3675	FURNITURE
10	Customer#000000010	6LrEaV6KR6PLVcgl2ArL 	ETHIOPIA 9	ETHIOPIA	AFRICA	15-741-346-9870	HOUSEHOL
CREATE TABLE 顧客情報
(
  顧客ID INTEGER     NOT NULL,
  氏名   VARCHAR(25) NOT NULL,
  住所   VARCHAR(25) NOT NULL,
  市    VARCHAR(10) NOT NULL,
  国    VARCHAR(15) NOT NULL,
  地域   VARCHAR(12) NOT NULL,
  電話番号 VARCHAR(15) NOT NULL,
  市場区分 VARCHAR(10) NOT NULL
);

copy 顧客情報 from 's3://cm-kajiwara-redshift-copy/customer0.tsv' 
CREDENTIALS 'aws_iam_role=arn:aws:iam::nnnnnnnnnnnn:role/redshift-role-mmmmmmmmm'
DELIMITER AS '\t' TIMEFORMAT AS 'auto' region AS 'ap-northeast-1';;

UPDATE / DELETE文の実行

こちらはSELECTの時と同様にconnectionからqueryメソッドをもちいることで、実行可能です。
SELECTの時と異なる点としては、RedshiftTargetが実際に用いているpsycopg2の実装はクエリ発行時にトランザクションを貼っているため、
明示的にcommitをする必要があると言った点です。commitをしないともちろん、データは反映されません。

useRedshift.py

import luigi
import luigi.contrib.redshift


class updateRedshiftTable(luigi.Task):
    host = "[Redshift Host]"
    database = "[Redshift database]"
    password = "[Redshift password]"
    user = "[Redshift user]"
    table = "顧客情報"

    def run(self):
        output = self.output()
        connection = output.connect()
        cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
        try:
            cursor.execute("update {} SET 氏名 = 'UpdateCustomer#000000002' WHERE 顧客ID=1".format(self.table))
            output.touch(connection)
            connection.commit()

            # commit and clean up
            connection.close()
        except:
            logger.warning("UPDATE query error")
            raise

    def output(self):
        return luigi.contrib.redshift.RedshiftTarget(
            host=self.host,
            database=self.database,
            password=self.password,
            user=self.user,
            table=self.table,
            update_id="updateRedshiftTable")
import luigi
import luigi.contrib.redshift


class deleteRedshiftTable(luigi.Task):
    host = "[Redshift Host]"
    database = "[Redshift database]"
    password = "[Redshift password]"
    user = "[Redshift user]"
    table = "顧客情報"

    def run(self):
        output = self.output()
        connection = output.connect()
        cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
        try:
            cursor.execute("delete from  {} WHERE 顧客ID=2".format(self.table))
            output.touch(connection)
            connection.commit()

            # commit and clean up
            connection.close()
        except:
            logger.warning("DELETE query error")
            raise

    def output(self):
        return luigi.contrib.redshift.RedshiftTarget(
            host=self.host,
            database=self.database,
            password=self.password,
            user=self.user,
            table=self.table,
            update_id="deleteRedshiftTable")

実行結果

顧客id 氏名 住所 地域 電話番号 市場区分
1 UpdateCustomer#000000002 IVhzIApeRb MOROCCO 0 MOROCCO AFRICA 25-989-741-2988 BUILDING
3 Customer#000000003 MG9kdTD ARGENTINA7 ARGENTINA AMERICA 11-719-748-3364 AUTOMOBILE
4 Customer#000000004 XxVSJsL EGYPT 4 EGYPT MIDDLE EAST 14-128-190-5944 MACHINERY
5 Customer#000000005 KvpyuHCplrB84WgAi CANADA 5 CANADA AMERICA 13-750-942-6364 HOUSEHOLD
6 Customer#000000006 sKZz0CsnMD7mp4Xd0YrBvx SAUDI ARA2 SAUDI ARABIA MIDDLE EAST 30-114-968-4951 AUTOMOBILE
7 Customer#000000007 TcGe5gaZNgVePxU5kR CHINA 0 CHINA ASIA 28-190-982-9759 AUTOMOBILE
8 Customer#000000008 I0B10bB0AymmC, 0PrRYBC PERU 6 PERU AMERICA 27-147-574-9335 BUILDING
9 Customer#000000009 xKiAFTjUsCuxfele INDIA 6 INDIA ASIA 18-338-906-3675 FURNITURE
10 Customer#000000010 6LrEaV6KR6PLVcgl2ArL ETHIOPIA 9 ETHIOPIA AFRICA 15-741-346-9870 HOUSEHOLD

顧客id 1 の氏名が更新されていて、顧客id 2のレコードが消えていることがわかります。

トランザクションのロールバック

先ほども書きましたが、LuigiのRedshiftモジュールがラップしているpsycopg2はデフォルトでトランザクションを貼っています。
今度は更新・削除を実施後、結果がロールバックしていることを確認してみたいと思います。

import luigi
import luigi.contrib.redshift


class rollbackRedshiftTable(luigi.Task):
    host = "[Redshift Host]"
    database = "[Redshift database]"
    password = "[Redshift password]"
    user = "[Redshift user]"
    table = "顧客情報"

    def run(self):
        output = self.output()
        connection = output.connect()
        cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
        try:
            cursor.execute("update {} SET 氏名 = 'RbkCustomer#000000002' WHERE 顧客ID=3".format(self.table))
            cursor.execute("delete from  {} WHERE 顧客ID=4".format(self.table))
            output.touch(connection)
            connection.rollback()

            # commit and clean up
            connection.close()
        except:
            logger.warning("Rallback query error")
            raise

    def output(self):
        return luigi.contrib.redshift.RedshiftTarget(
            host=self.host,
            database=self.database,
            password=self.password,
            user=self.user,
            table=self.table,
            update_id="rollbackRedshiftTable")

実装としては、commitしていた個所をrollbackメソッドに変更しています。
このようにすることで、変更はコミットされず、ロールバックされることになります。
実行結果に付いては割愛します。

まとめ

基本的なクエリの受け渡しもpsycopg2の仕様の問題はありましたが、それ以外は特に問題なく行えそうです。
明日からはRedshiftへのCopyコマンドの実行を行なってみたいと思います。