Redshiftからのデータ取得してみた | Luigi Advent Calendar 2016 #13

2016.12.13

はじめに

好物はインフラとフロントエンドのかじわらゆたかです。

このエントリは『Luigi Advent Calendar 2016』13日目の内容となります。
今回から数回に渡ってRedshiftをLuigiで操作していきます。
初回は普通にデータ習得になります。

先日12日目はSQLServerからのデータ取得してみたでした。

今回はRedshiftから値を取得し、ローカルに保存するタスク実装していきます。
また、このLuigiのRedshiftモジュールは内部的にはPostgresのモジュールを使っているものになるため、
Postgresでのデータ習得も同様にできるものになります。

下準備編 テストデータ

以前TimestampTZで用いたサンプルのTSVファイルとテーブルを用いたいと思います。
Amazon Redshift: 新しいデータ型『タイムゾーン付きタイムスタンプ(TIMESTAMPTZ)』をつかってみた。 | Developers.IO 以下のTSVファイルと、テーブルを用いました。

Sample.tsv

UTC(BASIC0000)  20161025T000000Z    20161025T000000Z    20161025T000000Z
UTC(BASIC0100)  20161025T010000Z    20161025T010000Z    20161025T010000Z
UTC(BASIC0200)  20161025T020000Z    20161025T020000Z    20161025T020000Z
UTC(BASIC0300)  20161025T030000Z    20161025T030000Z    20161025T030000Z
UTC(BASIC0400)  20161025T040000Z    20161025T040000Z    20161025T040000Z
UTC(BASIC0500)  20161025T050000Z    20161025T050000Z    20161025T050000Z
UTC(BASIC0600)  20161025T060000Z    20161025T060000Z    20161025T060000Z
UTC(BASIC0700)  20161025T070000Z    20161025T070000Z    20161025T070000Z
UTC(BASIC0800)  20161025T080000Z    20161025T080000Z    20161025T080000Z
UTC(BASIC0900)  20161025T090000Z    20161025T090000Z    20161025T090000Z
UTC(BASIC1000)  20161025T100000Z    20161025T100000Z    20161025T100000Z
UTC(BASIC1100)  20161025T110000Z    20161025T110000Z    20161025T110000Z
UTC(BASIC1200)  20161025T120000Z    20161025T120000Z    20161025T120000Z
UTC(BASIC1300)  20161025T130000Z    20161025T130000Z    20161025T130000Z
UTC(BASIC1400)  20161025T140000Z    20161025T140000Z    20161025T140000Z
UTC(BASIC1500)  20161025T150000Z    20161025T150000Z    20161025T150000Z
UTC(BASIC1600)  20161025T160000Z    20161025T160000Z    20161025T160000Z
UTC(BASIC1700)  20161025T170000Z    20161025T170000Z    20161025T170000Z
UTC(BASIC1800)  20161025T180000Z    20161025T180000Z    20161025T180000Z
UTC(BASIC1900)  20161025T190000Z    20161025T190000Z    20161025T190000Z
UTC(BASIC2000)  20161025T200000Z    20161025T200000Z    20161025T200000Z
UTC(BASIC2100)  20161025T210000Z    20161025T210000Z    20161025T210000Z
UTC(BASIC2200)  20161025T220000Z    20161025T220000Z    20161025T220000Z
UTC(BASIC2300)  20161025T230000Z    20161025T230000Z    20161025T230000Z
CREATE TABLE timestamp_check(
    abbreviation VARCHAR(25),
    string VARCHAR(50),
    timestamp_value TIMESTAMP,
    timestamptz_value TIMESTAMPTZ);

下準備編 psycopg2

LuigiのRedshiftモジュールはPythonのPostgresql向けモジュールであるpsycopg2をラップした形になっています。
その為、動かすためにはpsycopg2の導入が必要です。これは単純にpipから導入が行なえます。

$ pip install psycopg2
Collecting psycopg2
  Downloading psycopg2-2.6.2.tar.gz (376kB)
    100% |████████████████████████████████| 378kB 260kB/s
Building wheels for collected packages: psycopg2
  Running setup.py bdist_wheel for psycopg2 ... done
  Stored in directory: /Users/kajiwarayutaka/Library/Caches/pip/wheels/49/47/2a/5c3f874990ce267228c2df
e7a0589f3b0651aa590e329ad382
Successfully built psycopg2
Installing collected packages: psycopg2
Successfully installed psycopg2-2.6.2
$ pip list
(省略)
psycopg2 (2.6.2)
(省略)

Redshiftからのデータ習得

MySQL SQLServerに比べて比較的に楽にここまできました。
今回の作例では、Timezoneの設定もやってみたいと思います。
まずは、単純な抽出を実装してみます。

useRedshift.py

# -*- coding: utf-8 -*-
from logging import getLogger, StreamHandler, DEBUG

import psycopg2
import psycopg2.extras
import luigi
import luigi.contrib.redshift
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.setLevel(DEBUG)
logger.addHandler(handler)


class extractRedshiftTable(luigi.Task):
    def run(self):
        redshiftTarget = luigi.contrib.redshift.RedshiftTarget(
            host="[RedshiftHostName]:5439",
            database="[RedshiftDatabase]",
            user="[RedshiftUserName]",
            password="[RedshiftPassword]",
            table="timestamp_check",
            update_id="sample")
        connection = redshiftTarget.connect()
        cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
        try:
            cursor.execute("SELECT * FROM timestamp_check")

            with self.output().open('w') as out_file:
                for row in cursor:
                    out_file.write(
                        "{abbreviation}\t{string}\t{timestamptz_value}\n".
                        format(**row))

        except:
            print "Select query error"

    def output(run):
        return luigi.LocalTarget(path="timestamp_check.tsv")


if __name__ == '__main__':
    luigi.run()

実装内容としてはMySQLで実装した際とほとんど変わっていません。
注意点としては結果をDictionaryで取得したい際に、cursorに渡す引数が異なるくらいでしょうか。

実行結果

$ python ./useRedshift.py --local-scheduler extractRedshiftTable
$ cat ./employees.tsv
JST(BASIC0000)	20161025T000000+0900	2016-10-24 15:00:00+00:00
JST(BASIC0100)	20161025T010000+0900	2016-10-24 16:00:00+00:00
JST(BASIC0200)	20161025T020000+0900	2016-10-24 17:00:00+00:00
JST(BASIC0300)	20161025T030000+0900	2016-10-24 18:00:00+00:00
JST(BASIC0400)	20161025T040000+0900	2016-10-24 19:00:00+00:00
JST(BASIC0500)	20161025T050000+0900	2016-10-24 20:00:00+00:00
JST(BASIC0600)	20161025T060000+0900	2016-10-24 21:00:00+00:00
JST(BASIC0700)	20161025T070000+0900	2016-10-24 22:00:00+00:00
JST(BASIC0800)	20161025T080000+0900	2016-10-24 23:00:00+00:00
JST(BASIC0900)	20161025T090000+0900	2016-10-25 00:00:00+00:00
JST(BASIC1000)	20161025T100000+0900	2016-10-25 01:00:00+00:00
JST(BASIC1100)	20161025T110000+0900	2016-10-25 02:00:00+00:00
JST(BASIC1200)	20161025T120000+0900	2016-10-25 03:00:00+00:00
JST(BASIC1300)	20161025T130000+0900	2016-10-25 04:00:00+00:00
JST(BASIC1400)	20161025T140000+0900	2016-10-25 05:00:00+00:00
JST(BASIC1500)	20161025T150000+0900	2016-10-25 06:00:00+00:00
JST(BASIC1600)	20161025T160000+0900	2016-10-25 07:00:00+00:00
JST(BASIC1700)	20161025T170000+0900	2016-10-25 08:00:00+00:00
JST(BASIC1800)	20161025T180000+0900	2016-10-25 09:00:00+00:00
JST(BASIC1900)	20161025T190000+0900	2016-10-25 10:00:00+00:00
JST(BASIC2000)	20161025T200000+0900	2016-10-25 11:00:00+00:00
JST(BASIC2100)	20161025T210000+0900	2016-10-25 12:00:00+00:00
JST(BASIC2200)	20161025T220000+0900	2016-10-25 13:00:00+00:00
JST(BASIC2300)	20161025T230000+0900	2016-10-25 14:00:00+00:00
UTC(BASIC0000)	20161025T000000Z	2016-10-25 00:00:00+00:00
UTC(BASIC0100)	20161025T010000Z	2016-10-25 01:00:00+00:00
UTC(BASIC0200)	20161025T020000Z	2016-10-25 02:00:00+00:00
UTC(BASIC0300)	20161025T030000Z	2016-10-25 03:00:00+00:00
UTC(BASIC0400)	20161025T040000Z	2016-10-25 04:00:00+00:00
UTC(BASIC0500)	20161025T050000Z	2016-10-25 05:00:00+00:00
UTC(BASIC0600)	20161025T060000Z	2016-10-25 06:00:00+00:00
UTC(BASIC0700)	20161025T070000Z	2016-10-25 07:00:00+00:00
UTC(BASIC0800)	20161025T080000Z	2016-10-25 08:00:00+00:00
UTC(BASIC0900)	20161025T090000Z	2016-10-25 09:00:00+00:00
UTC(BASIC1000)	20161025T100000Z	2016-10-25 10:00:00+00:00
UTC(BASIC1100)	20161025T110000Z	2016-10-25 11:00:00+00:00
UTC(BASIC1200)	20161025T120000Z	2016-10-25 12:00:00+00:00
UTC(BASIC1300)	20161025T130000Z	2016-10-25 13:00:00+00:00
UTC(BASIC1400)	20161025T140000Z	2016-10-25 14:00:00+00:00
UTC(BASIC1500)	20161025T150000Z	2016-10-25 15:00:00+00:00
UTC(BASIC1600)	20161025T160000Z	2016-10-25 16:00:00+00:00
UTC(BASIC1700)	20161025T170000Z	2016-10-25 17:00:00+00:00
UTC(BASIC1800)	20161025T180000Z	2016-10-25 18:00:00+00:00
UTC(BASIC1900)	20161025T190000Z	2016-10-25 19:00:00+00:00
UTC(BASIC2000)	20161025T200000Z	2016-10-25 20:00:00+00:00
UTC(BASIC2100)	20161025T210000Z	2016-10-25 21:00:00+00:00
UTC(BASIC2200)	20161025T220000Z	2016-10-25 22:00:00+00:00
UTC(BASIC2300)	20161025T230000Z	2016-10-25 23:00:00+00:00

当然、Timezoneの値はセットしていないので、UTCの値で出力されていることがわかります。
以前Timestamptzの記事でTimezoneのセットは以下のようにしていました。

DELETE * FROM timestamp_check;
COPY timestamp_check 
(abbreviation,string,timestamp_value,timestamptz_value) 
FROM 's3://cm-kajiwara-redshift-load/Sample.tsv' 
CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789012:role/redshift-role'
DELIMITER AS '\t' TIMEFORMAT AS 'auto';
SET timezone to 'Asia/Tokyo';
SELECT * FROM timestamp_check;

Luigiでやる際も普通にSelectを実行するのと同じように、Set句を用いて設定すれば可能です。

        try:
            cursor.execute("set timezone='Asia/Tokyo';")
            cursor.execute("SELECT * FROM timestamp_check order by abbreviation")

            with self.output().open('w') as out_file:
                for row in cursor:
                    out_file.write(
                        u"{abbreviation}\t{string}\t{timestamptz_value}\n".
                        format(**row))

実行結果

$ python ./useRedshift.py --local-scheduler extractRedshiftTable
$ cat ./employees.tsv
JST(BASIC0000)	20161025T000000+0900	2016-10-25 00:00:00+09:00
JST(BASIC0100)	20161025T010000+0900	2016-10-25 01:00:00+09:00
JST(BASIC0200)	20161025T020000+0900	2016-10-25 02:00:00+09:00

正しく、JSTのタイムゾーンの値として抽出できていることがわかります。

まとめ

まずはRedsfhitからのデータ取得を行いました。
明日は、日本語名のテーブル・カラムからの習得を行います。