Redshiftからのデータ取得してみた(日本語テーブル・カラム抽出) | Luigi Advent Calendar 2016 #14

2016.12.14

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

好物はインフラとフロントエンドのかじわらゆたかです。

このエントリは『Luigi Advent Calendar 2016』14日目の内容となります。
少し前にあった日本語のデータオブジェクトの多言語対応状況について調べてみます。

先日13日目はRedshiftからのデータ取得してみたでした。

先日Redshiftにデータベースオブジェクト名の多言語の対応がきました。
【速報】Amazon Redshiftのデータベースオブジェクト名が多言語に対応しました! | Developers.IO
これについてLuigiでも問題なく取り込めるのか確認してみたいと思います。

下準備 サンプルデータ

今回多言語の対応として、日本語名のテーブル・カラムを用意してLuigiから参照できるかを確認したいと思います。
以下のようなクエリでテーブルを作成し、その後AwsのRedshiftチュートリアルで公開されているデータを取り込みました。

CREATE TABLE 顧客情報
(
  顧客ID INTEGER     NOT NULL,
  氏名   VARCHAR(25) NOT NULL,
  住所   VARCHAR(25) NOT NULL,
  市    VARCHAR(10) NOT NULL,
  国    VARCHAR(15) NOT NULL,
  地域   VARCHAR(12) NOT NULL,
  電話番号 VARCHAR(15) NOT NULL,
  市場区分 VARCHAR(10) NOT NULL
);

copy 顧客情報 from 's3://awssampledbuswest2/ssbgz/customer' 
CREDENTIALS 'aws_iam_role=arn:aws:iam::nnnnnnnnnnnn:role/redshift-role-mmmmmmmmm'
gzip compupdate off region 'us-west-2';

日本語名テーブル・カラムからデータ参照をしてみた。

コードの内容としては、前回のRedshiftからの参照とほぼ一緒になります。 日本語カラムの対応も確認したいため、SELECT文でカラムの指定も行なっています。

useRedshift.py

# -*- coding: utf-8 -*-
from logging import getLogger, StreamHandler, DEBUG

import psycopg2
import psycopg2.extras
import luigi
import luigi.contrib.redshift
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.setLevel(DEBUG)
logger.addHandler(handler)


class extractRedshiftTableWithMultibyteCharacter(luigi.Task):
    def run(self):
        redshiftTarget = luigi.contrib.redshift.RedshiftTarget(
            host="[RedshiftHostName]:5439",
            database="[RedshiftDatabase]",
            user="[RedshiftUserName]",
            password="[RedshiftPassword]",
            table="顧客情報",
            update_id="sample")
        connection = redshiftTarget.connect()
        cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
        try:
            cursor.execute("SELECT 顧客id,'*****' as 氏名,住所,電話番号 FROM 顧客情報 ORDER BY 顧客id LIMIT 1000")
            with self.output().open('w') as out_file:
                for row in cursor:
                    writeValue = "{顧客id}\t{氏名}\t{住所}\t{電話番号}\n".format(**row)
                    out_file.write(writeValue)

        except:
            print "Select query error"

    def output(run):
        return luigi.LocalTarget(
            path="use_JapaneseTableName_ColumnName_check.tsv")


if __name__ == '__main__':
    luigi.run()

実行結果

$ python ./useRedshift.py --local-scheduler extractRedshiftTableWithMultibyteCharacter
$ cat ./use_JapaneseTableName_ColumnName_check.tsv
1	*****	IVhzIApeRb	25-989-741-2988
2	*****	XSTf4,NCwDVaWNe6tE	23-768-687-3665
3	*****	MG9kdTD	11-719-748-3364
4	*****	XxVSJsL	14-128-190-5944
5	*****	KvpyuHCplrB84WgAi	13-750-942-6364
6	*****	sKZz0CsnMD7mp4Xd0YrBvx	30-114-968-4951
7	*****	TcGe5gaZNgVePxU5kR	28-190-982-9759
8	*****	I0B10bB0AymmC, 0PrRYBC	27-147-574-9335
9	*****	xKiAFTjUsCuxfele	18-338-906-3675
10	*****	6LrEaV6KR6PLVcgl2ArL 	15-741-346-9870
11	*****	PkWS 3HlXqwTuz	33-464-151-3439
12	*****	9PWKuhzT	23-791-276-1263
13	*****	nsXQu0oVjD7PM6	13-761-547-5974
(以下省略)

まとめ

抽出に関わるコードについては何の変更も無しに、日本語名テーブル・カラムからの抽出が行えました。 PythonのDictionaryのKeyに関しても問題なくアクセスできて、抽出が行えています。
LuigiでRedshiftとのやりとりで、特に意識することでなく日本語名テーブル・カラムを扱うことができそうです。 明日はRedshiftへUnloadを実行させてみたいと思います。