この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』24日目の内容となります。
前日に引き続きMySQLからRedhisftに取り込むタスクを実装していきます。
先日23日目はMySQLからRedshiftへLoadするタスクを作ってみた。その1(MySQL → S3) でした。
前回まででMySQLから抽出した結果をS3に配置すると言ったタスクを実装しました。 今回は前回S3に配置したファイルをRedshiftに取り込むタスクを実装していきます。
S3にからRedshiftへ取り込む
実装結果としては以下になります。
# -*- coding: utf-8 -*-
from logging import getLogger, StreamHandler, DEBUG
import luigi
import luigi.s3
import luigi.contrib.mysqldb
import contrib.redshift_ext
import os
import csv
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.setLevel(DEBUG)
logger.addHandler(handler)
try:
import mysql
from mysql.connector import errorcode
except ImportError as e:
logger.warning(
"Loading MySQL module without the python package mysql-connector-python. \
This will crash at runtime if MySQL functionality is used.")
class SQLFile(luigi.ExternalTask):
(省略)
class extractMySQLTableToS3(luigi.Task):
(省略)
class LoadS3toRedshift(contrib.redshift_ext.S3CopyWithIAMRole):
sql_file = luigi.Parameter(default="employee.sql")
redshift_host = luigi.Parameter()
redshift_database = luigi.Parameter()
redshift_password = luigi.Parameter()
redshift_user = luigi.Parameter()
redshift_copy_options = "region AS 'ap-northeast-1' DELIMITER AS '\t' TIMEFORMAT AS 'auto' IGNOREHEADER 1 GZIP"
aws_account = luigi.Parameter()
role_name = luigi.Parameter()
def requires(self):
return extractMySQLTableToS3()
@property
def host(self):
return self.redshift_host
@property
def database(self):
return self.redshift_database
@property
def user(self):
return self.redshift_user
@property
def password(self):
return self.redshift_password
@property
def table(self):
return self.sql_file.replace(".sql", "")
def s3_load_path(self):
return self.input().path
@property
def iam_role_arn(self):
iam_role_arn = "arn:aws:iam::{}:role/{}".format(self.aws_account, self.role_name)
return iam_role_arn
@property
def copy_options(self):
return self.redshift_copy_options
@property
def columns(self):
with self.input().open('r') as file_in:
for line in file_in:
columns = [x for x in line.split('\t')]
return columns
if __name__ == '__main__':
luigi.run()
実装した特徴としては以下になります。
- 以前実装したIAM Role対応のモジュールを用いて取り込みを行いました。そのためimportでcontrib.redshift_extを取り込んでいます。
- テーブル名はMySQLからS3に抽出する際に取り込んだファイル名をテーブル名としています。
- カラムの順番を調整する必要があったので、S3からファイルを取得し、取得したヘッダーをLoadする際のカラムとして指定をしています。
まとめ
ひとまずMySQLからRedshiftに取り込むようなタスクを実装できました。
明日は、今までのアドベントカレンダーを振り返ってみたいと思います。