Redshiftからのデータ取得してみた | Luigi Advent Calendar 2016 #13
はじめに
好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』13日目の内容となります。
今回から数回に渡ってRedshiftをLuigiで操作していきます。
初回は普通にデータ習得になります。
先日12日目はSQLServerからのデータ取得してみたでした。
今回はRedshiftから値を取得し、ローカルに保存するタスク実装していきます。
また、このLuigiのRedshiftモジュールは内部的にはPostgresのモジュールを使っているものになるため、
Postgresでのデータ習得も同様にできるものになります。
下準備編 テストデータ
以前TimestampTZで用いたサンプルのTSVファイルとテーブルを用いたいと思います。
Amazon Redshift: 新しいデータ型『タイムゾーン付きタイムスタンプ(TIMESTAMPTZ)』をつかってみた。 | Developers.IO
以下のTSVファイルと、テーブルを用いました。

CREATE TABLE timestamp_check( abbreviation VARCHAR(25), string VARCHAR(50), timestamp_value TIMESTAMP, timestamptz_value TIMESTAMPTZ);
下準備編 psycopg2
LuigiのRedshiftモジュールはPythonのPostgresql向けモジュールであるpsycopg2をラップした形になっています。
その為、動かすためにはpsycopg2の導入が必要です。これは単純にpipから導入が行なえます。
$ pip install psycopg2 Collecting psycopg2 Downloading psycopg2-2.6.2.tar.gz (376kB) 100% |████████████████████████████████| 378kB 260kB/s Building wheels for collected packages: psycopg2 Running setup.py bdist_wheel for psycopg2 ... done Stored in directory: /Users/kajiwarayutaka/Library/Caches/pip/wheels/49/47/2a/5c3f874990ce267228c2df e7a0589f3b0651aa590e329ad382 Successfully built psycopg2 Installing collected packages: psycopg2 Successfully installed psycopg2-2.6.2 $ pip list (省略) psycopg2 (2.6.2) (省略)
Redshiftからのデータ習得
MySQL SQLServerに比べて比較的に楽にここまできました。
今回の作例では、Timezoneの設定もやってみたいと思います。
まずは、単純な抽出を実装してみます。
# -*- coding: utf-8 -*- from logging import getLogger, StreamHandler, DEBUG import psycopg2 import psycopg2.extras import luigi import luigi.contrib.redshift logger = getLogger(__name__) handler = StreamHandler() handler.setLevel(DEBUG) logger.setLevel(DEBUG) logger.addHandler(handler) class extractRedshiftTable(luigi.Task): def run(self): redshiftTarget = luigi.contrib.redshift.RedshiftTarget( host="[RedshiftHostName]:5439", database="[RedshiftDatabase]", user="[RedshiftUserName]", password="[RedshiftPassword]", table="timestamp_check", update_id="sample") connection = redshiftTarget.connect() cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor) try: cursor.execute("SELECT * FROM timestamp_check") with self.output().open('w') as out_file: for row in cursor: out_file.write( "{abbreviation}\t{string}\t{timestamptz_value}\n". format(**row)) except: print "Select query error" def output(run): return luigi.LocalTarget(path="timestamp_check.tsv") if __name__ == '__main__': luigi.run()
実装内容としてはMySQLで実装した際とほとんど変わっていません。
注意点としては結果をDictionaryで取得したい際に、cursorに渡す引数が異なるくらいでしょうか。
実行結果
$ python ./useRedshift.py --local-scheduler extractRedshiftTable $ cat ./employees.tsv JST(BASIC0000) 20161025T000000+0900 2016-10-24 15:00:00+00:00 JST(BASIC0100) 20161025T010000+0900 2016-10-24 16:00:00+00:00 JST(BASIC0200) 20161025T020000+0900 2016-10-24 17:00:00+00:00 JST(BASIC0300) 20161025T030000+0900 2016-10-24 18:00:00+00:00 JST(BASIC0400) 20161025T040000+0900 2016-10-24 19:00:00+00:00 JST(BASIC0500) 20161025T050000+0900 2016-10-24 20:00:00+00:00 JST(BASIC0600) 20161025T060000+0900 2016-10-24 21:00:00+00:00 JST(BASIC0700) 20161025T070000+0900 2016-10-24 22:00:00+00:00 JST(BASIC0800) 20161025T080000+0900 2016-10-24 23:00:00+00:00 JST(BASIC0900) 20161025T090000+0900 2016-10-25 00:00:00+00:00 JST(BASIC1000) 20161025T100000+0900 2016-10-25 01:00:00+00:00 JST(BASIC1100) 20161025T110000+0900 2016-10-25 02:00:00+00:00 JST(BASIC1200) 20161025T120000+0900 2016-10-25 03:00:00+00:00 JST(BASIC1300) 20161025T130000+0900 2016-10-25 04:00:00+00:00 JST(BASIC1400) 20161025T140000+0900 2016-10-25 05:00:00+00:00 JST(BASIC1500) 20161025T150000+0900 2016-10-25 06:00:00+00:00 JST(BASIC1600) 20161025T160000+0900 2016-10-25 07:00:00+00:00 JST(BASIC1700) 20161025T170000+0900 2016-10-25 08:00:00+00:00 JST(BASIC1800) 20161025T180000+0900 2016-10-25 09:00:00+00:00 JST(BASIC1900) 20161025T190000+0900 2016-10-25 10:00:00+00:00 JST(BASIC2000) 20161025T200000+0900 2016-10-25 11:00:00+00:00 JST(BASIC2100) 20161025T210000+0900 2016-10-25 12:00:00+00:00 JST(BASIC2200) 20161025T220000+0900 2016-10-25 13:00:00+00:00 JST(BASIC2300) 20161025T230000+0900 2016-10-25 14:00:00+00:00 UTC(BASIC0000) 20161025T000000Z 2016-10-25 00:00:00+00:00 UTC(BASIC0100) 20161025T010000Z 2016-10-25 01:00:00+00:00 UTC(BASIC0200) 20161025T020000Z 2016-10-25 02:00:00+00:00 UTC(BASIC0300) 20161025T030000Z 2016-10-25 03:00:00+00:00 UTC(BASIC0400) 20161025T040000Z 2016-10-25 04:00:00+00:00 UTC(BASIC0500) 20161025T050000Z 2016-10-25 05:00:00+00:00 UTC(BASIC0600) 20161025T060000Z 2016-10-25 06:00:00+00:00 UTC(BASIC0700) 20161025T070000Z 2016-10-25 07:00:00+00:00 UTC(BASIC0800) 20161025T080000Z 2016-10-25 08:00:00+00:00 UTC(BASIC0900) 20161025T090000Z 2016-10-25 09:00:00+00:00 UTC(BASIC1000) 20161025T100000Z 2016-10-25 10:00:00+00:00 UTC(BASIC1100) 20161025T110000Z 2016-10-25 11:00:00+00:00 UTC(BASIC1200) 20161025T120000Z 2016-10-25 12:00:00+00:00 UTC(BASIC1300) 20161025T130000Z 2016-10-25 13:00:00+00:00 UTC(BASIC1400) 20161025T140000Z 2016-10-25 14:00:00+00:00 UTC(BASIC1500) 20161025T150000Z 2016-10-25 15:00:00+00:00 UTC(BASIC1600) 20161025T160000Z 2016-10-25 16:00:00+00:00 UTC(BASIC1700) 20161025T170000Z 2016-10-25 17:00:00+00:00 UTC(BASIC1800) 20161025T180000Z 2016-10-25 18:00:00+00:00 UTC(BASIC1900) 20161025T190000Z 2016-10-25 19:00:00+00:00 UTC(BASIC2000) 20161025T200000Z 2016-10-25 20:00:00+00:00 UTC(BASIC2100) 20161025T210000Z 2016-10-25 21:00:00+00:00 UTC(BASIC2200) 20161025T220000Z 2016-10-25 22:00:00+00:00 UTC(BASIC2300) 20161025T230000Z 2016-10-25 23:00:00+00:00
当然、Timezoneの値はセットしていないので、UTCの値で出力されていることがわかります。
以前Timestamptzの記事でTimezoneのセットは以下のようにしていました。
DELETE * FROM timestamp_check; COPY timestamp_check (abbreviation,string,timestamp_value,timestamptz_value) FROM 's3://cm-kajiwara-redshift-load/Sample.tsv' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789012:role/redshift-role' DELIMITER AS '\t' TIMEFORMAT AS 'auto'; SET timezone to 'Asia/Tokyo'; SELECT * FROM timestamp_check;
Luigiでやる際も普通にSelectを実行するのと同じように、Set句を用いて設定すれば可能です。
try: cursor.execute("set timezone='Asia/Tokyo';") cursor.execute("SELECT * FROM timestamp_check order by abbreviation") with self.output().open('w') as out_file: for row in cursor: out_file.write( u"{abbreviation}\t{string}\t{timestamptz_value}\n". format(**row))
実行結果
$ python ./useRedshift.py --local-scheduler extractRedshiftTable $ cat ./employees.tsv JST(BASIC0000) 20161025T000000+0900 2016-10-25 00:00:00+09:00 JST(BASIC0100) 20161025T010000+0900 2016-10-25 01:00:00+09:00 JST(BASIC0200) 20161025T020000+0900 2016-10-25 02:00:00+09:00
正しく、JSTのタイムゾーンの値として抽出できていることがわかります。
まとめ
まずはRedsfhitからのデータ取得を行いました。
明日は、日本語名のテーブル・カラムからの習得を行います。