Redshiftからのデータ取得してみた | Luigi Advent Calendar 2016 #13
はじめに
好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』13日目の内容となります。
今回から数回に渡ってRedshiftをLuigiで操作していきます。
初回は普通にデータ習得になります。
先日12日目はSQLServerからのデータ取得してみたでした。
今回はRedshiftから値を取得し、ローカルに保存するタスク実装していきます。
また、このLuigiのRedshiftモジュールは内部的にはPostgresのモジュールを使っているものになるため、
Postgresでのデータ習得も同様にできるものになります。
下準備編 テストデータ
以前TimestampTZで用いたサンプルのTSVファイルとテーブルを用いたいと思います。
Amazon Redshift: 新しいデータ型『タイムゾーン付きタイムスタンプ(TIMESTAMPTZ)』をつかってみた。 | Developers.IO
以下のTSVファイルと、テーブルを用いました。
UTC(BASIC0000) 20161025T000000Z 20161025T000000Z 20161025T000000Z UTC(BASIC0100) 20161025T010000Z 20161025T010000Z 20161025T010000Z UTC(BASIC0200) 20161025T020000Z 20161025T020000Z 20161025T020000Z UTC(BASIC0300) 20161025T030000Z 20161025T030000Z 20161025T030000Z UTC(BASIC0400) 20161025T040000Z 20161025T040000Z 20161025T040000Z UTC(BASIC0500) 20161025T050000Z 20161025T050000Z 20161025T050000Z UTC(BASIC0600) 20161025T060000Z 20161025T060000Z 20161025T060000Z UTC(BASIC0700) 20161025T070000Z 20161025T070000Z 20161025T070000Z UTC(BASIC0800) 20161025T080000Z 20161025T080000Z 20161025T080000Z UTC(BASIC0900) 20161025T090000Z 20161025T090000Z 20161025T090000Z UTC(BASIC1000) 20161025T100000Z 20161025T100000Z 20161025T100000Z UTC(BASIC1100) 20161025T110000Z 20161025T110000Z 20161025T110000Z UTC(BASIC1200) 20161025T120000Z 20161025T120000Z 20161025T120000Z UTC(BASIC1300) 20161025T130000Z 20161025T130000Z 20161025T130000Z UTC(BASIC1400) 20161025T140000Z 20161025T140000Z 20161025T140000Z UTC(BASIC1500) 20161025T150000Z 20161025T150000Z 20161025T150000Z UTC(BASIC1600) 20161025T160000Z 20161025T160000Z 20161025T160000Z UTC(BASIC1700) 20161025T170000Z 20161025T170000Z 20161025T170000Z UTC(BASIC1800) 20161025T180000Z 20161025T180000Z 20161025T180000Z UTC(BASIC1900) 20161025T190000Z 20161025T190000Z 20161025T190000Z UTC(BASIC2000) 20161025T200000Z 20161025T200000Z 20161025T200000Z UTC(BASIC2100) 20161025T210000Z 20161025T210000Z 20161025T210000Z UTC(BASIC2200) 20161025T220000Z 20161025T220000Z 20161025T220000Z UTC(BASIC2300) 20161025T230000Z 20161025T230000Z 20161025T230000Z
CREATE TABLE timestamp_check( abbreviation VARCHAR(25), string VARCHAR(50), timestamp_value TIMESTAMP, timestamptz_value TIMESTAMPTZ);
下準備編 psycopg2
LuigiのRedshiftモジュールはPythonのPostgresql向けモジュールであるpsycopg2をラップした形になっています。
その為、動かすためにはpsycopg2の導入が必要です。これは単純にpipから導入が行なえます。
$ pip install psycopg2 Collecting psycopg2 Downloading psycopg2-2.6.2.tar.gz (376kB) 100% |████████████████████████████████| 378kB 260kB/s Building wheels for collected packages: psycopg2 Running setup.py bdist_wheel for psycopg2 ... done Stored in directory: /Users/kajiwarayutaka/Library/Caches/pip/wheels/49/47/2a/5c3f874990ce267228c2df e7a0589f3b0651aa590e329ad382 Successfully built psycopg2 Installing collected packages: psycopg2 Successfully installed psycopg2-2.6.2 $ pip list (省略) psycopg2 (2.6.2) (省略)
Redshiftからのデータ習得
MySQL SQLServerに比べて比較的に楽にここまできました。
今回の作例では、Timezoneの設定もやってみたいと思います。
まずは、単純な抽出を実装してみます。
# -*- coding: utf-8 -*- from logging import getLogger, StreamHandler, DEBUG import psycopg2 import psycopg2.extras import luigi import luigi.contrib.redshift logger = getLogger(__name__) handler = StreamHandler() handler.setLevel(DEBUG) logger.setLevel(DEBUG) logger.addHandler(handler) class extractRedshiftTable(luigi.Task): def run(self): redshiftTarget = luigi.contrib.redshift.RedshiftTarget( host="[RedshiftHostName]:5439", database="[RedshiftDatabase]", user="[RedshiftUserName]", password="[RedshiftPassword]", table="timestamp_check", update_id="sample") connection = redshiftTarget.connect() cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor) try: cursor.execute("SELECT * FROM timestamp_check") with self.output().open('w') as out_file: for row in cursor: out_file.write( "{abbreviation}\t{string}\t{timestamptz_value}\n". format(**row)) except: print "Select query error" def output(run): return luigi.LocalTarget(path="timestamp_check.tsv") if __name__ == '__main__': luigi.run()
実装内容としてはMySQLで実装した際とほとんど変わっていません。
注意点としては結果をDictionaryで取得したい際に、cursorに渡す引数が異なるくらいでしょうか。
実行結果
$ python ./useRedshift.py --local-scheduler extractRedshiftTable $ cat ./employees.tsv JST(BASIC0000) 20161025T000000+0900 2016-10-24 15:00:00+00:00 JST(BASIC0100) 20161025T010000+0900 2016-10-24 16:00:00+00:00 JST(BASIC0200) 20161025T020000+0900 2016-10-24 17:00:00+00:00 JST(BASIC0300) 20161025T030000+0900 2016-10-24 18:00:00+00:00 JST(BASIC0400) 20161025T040000+0900 2016-10-24 19:00:00+00:00 JST(BASIC0500) 20161025T050000+0900 2016-10-24 20:00:00+00:00 JST(BASIC0600) 20161025T060000+0900 2016-10-24 21:00:00+00:00 JST(BASIC0700) 20161025T070000+0900 2016-10-24 22:00:00+00:00 JST(BASIC0800) 20161025T080000+0900 2016-10-24 23:00:00+00:00 JST(BASIC0900) 20161025T090000+0900 2016-10-25 00:00:00+00:00 JST(BASIC1000) 20161025T100000+0900 2016-10-25 01:00:00+00:00 JST(BASIC1100) 20161025T110000+0900 2016-10-25 02:00:00+00:00 JST(BASIC1200) 20161025T120000+0900 2016-10-25 03:00:00+00:00 JST(BASIC1300) 20161025T130000+0900 2016-10-25 04:00:00+00:00 JST(BASIC1400) 20161025T140000+0900 2016-10-25 05:00:00+00:00 JST(BASIC1500) 20161025T150000+0900 2016-10-25 06:00:00+00:00 JST(BASIC1600) 20161025T160000+0900 2016-10-25 07:00:00+00:00 JST(BASIC1700) 20161025T170000+0900 2016-10-25 08:00:00+00:00 JST(BASIC1800) 20161025T180000+0900 2016-10-25 09:00:00+00:00 JST(BASIC1900) 20161025T190000+0900 2016-10-25 10:00:00+00:00 JST(BASIC2000) 20161025T200000+0900 2016-10-25 11:00:00+00:00 JST(BASIC2100) 20161025T210000+0900 2016-10-25 12:00:00+00:00 JST(BASIC2200) 20161025T220000+0900 2016-10-25 13:00:00+00:00 JST(BASIC2300) 20161025T230000+0900 2016-10-25 14:00:00+00:00 UTC(BASIC0000) 20161025T000000Z 2016-10-25 00:00:00+00:00 UTC(BASIC0100) 20161025T010000Z 2016-10-25 01:00:00+00:00 UTC(BASIC0200) 20161025T020000Z 2016-10-25 02:00:00+00:00 UTC(BASIC0300) 20161025T030000Z 2016-10-25 03:00:00+00:00 UTC(BASIC0400) 20161025T040000Z 2016-10-25 04:00:00+00:00 UTC(BASIC0500) 20161025T050000Z 2016-10-25 05:00:00+00:00 UTC(BASIC0600) 20161025T060000Z 2016-10-25 06:00:00+00:00 UTC(BASIC0700) 20161025T070000Z 2016-10-25 07:00:00+00:00 UTC(BASIC0800) 20161025T080000Z 2016-10-25 08:00:00+00:00 UTC(BASIC0900) 20161025T090000Z 2016-10-25 09:00:00+00:00 UTC(BASIC1000) 20161025T100000Z 2016-10-25 10:00:00+00:00 UTC(BASIC1100) 20161025T110000Z 2016-10-25 11:00:00+00:00 UTC(BASIC1200) 20161025T120000Z 2016-10-25 12:00:00+00:00 UTC(BASIC1300) 20161025T130000Z 2016-10-25 13:00:00+00:00 UTC(BASIC1400) 20161025T140000Z 2016-10-25 14:00:00+00:00 UTC(BASIC1500) 20161025T150000Z 2016-10-25 15:00:00+00:00 UTC(BASIC1600) 20161025T160000Z 2016-10-25 16:00:00+00:00 UTC(BASIC1700) 20161025T170000Z 2016-10-25 17:00:00+00:00 UTC(BASIC1800) 20161025T180000Z 2016-10-25 18:00:00+00:00 UTC(BASIC1900) 20161025T190000Z 2016-10-25 19:00:00+00:00 UTC(BASIC2000) 20161025T200000Z 2016-10-25 20:00:00+00:00 UTC(BASIC2100) 20161025T210000Z 2016-10-25 21:00:00+00:00 UTC(BASIC2200) 20161025T220000Z 2016-10-25 22:00:00+00:00 UTC(BASIC2300) 20161025T230000Z 2016-10-25 23:00:00+00:00
当然、Timezoneの値はセットしていないので、UTCの値で出力されていることがわかります。
以前Timestamptzの記事でTimezoneのセットは以下のようにしていました。
DELETE * FROM timestamp_check; COPY timestamp_check (abbreviation,string,timestamp_value,timestamptz_value) FROM 's3://cm-kajiwara-redshift-load/Sample.tsv' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789012:role/redshift-role' DELIMITER AS '\t' TIMEFORMAT AS 'auto'; SET timezone to 'Asia/Tokyo'; SELECT * FROM timestamp_check;
Luigiでやる際も普通にSelectを実行するのと同じように、Set句を用いて設定すれば可能です。
try: cursor.execute("set timezone='Asia/Tokyo';") cursor.execute("SELECT * FROM timestamp_check order by abbreviation") with self.output().open('w') as out_file: for row in cursor: out_file.write( u"{abbreviation}\t{string}\t{timestamptz_value}\n". format(**row))
実行結果
$ python ./useRedshift.py --local-scheduler extractRedshiftTable $ cat ./employees.tsv JST(BASIC0000) 20161025T000000+0900 2016-10-25 00:00:00+09:00 JST(BASIC0100) 20161025T010000+0900 2016-10-25 01:00:00+09:00 JST(BASIC0200) 20161025T020000+0900 2016-10-25 02:00:00+09:00
正しく、JSTのタイムゾーンの値として抽出できていることがわかります。
まとめ
まずはRedsfhitからのデータ取得を行いました。
明日は、日本語名のテーブル・カラムからの習得を行います。