MySQLからRedshiftへLoadするタスクを作ってみた。その2(S3 → Redshift) | Luigi Advent Calendar 2016 #24

2016.12.24

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』24日目の内容となります。
前日に引き続きMySQLからRedhisftに取り込むタスクを実装していきます。

先日23日目はMySQLからRedshiftへLoadするタスクを作ってみた。その1(MySQL → S3) でした。

前回まででMySQLから抽出した結果をS3に配置すると言ったタスクを実装しました。 今回は前回S3に配置したファイルをRedshiftに取り込むタスクを実装していきます。

S3にからRedshiftへ取り込む

実装結果としては以下になります。

# -*- coding: utf-8 -*-
from logging import getLogger, StreamHandler, DEBUG

import luigi
import luigi.s3
import luigi.contrib.mysqldb
import contrib.redshift_ext
import os
import csv
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.setLevel(DEBUG)
logger.addHandler(handler)
try:
    import mysql
    from mysql.connector import errorcode
except ImportError as e:
    logger.warning(
        "Loading MySQL module without the python package mysql-connector-python. \
        This will crash at runtime if MySQL functionality is used.")


class SQLFile(luigi.ExternalTask):
(省略)


class extractMySQLTableToS3(luigi.Task):
(省略)


class LoadS3toRedshift(contrib.redshift_ext.S3CopyWithIAMRole):
    sql_file = luigi.Parameter(default="employee.sql")
    redshift_host = luigi.Parameter()
    redshift_database = luigi.Parameter()
    redshift_password = luigi.Parameter()
    redshift_user = luigi.Parameter()
    redshift_copy_options = "region AS 'ap-northeast-1' DELIMITER AS '\t' TIMEFORMAT AS 'auto' IGNOREHEADER 1 GZIP"
    aws_account = luigi.Parameter()
    role_name = luigi.Parameter()

    def requires(self):
        return extractMySQLTableToS3()

    @property
    def host(self):
        return self.redshift_host

    @property
    def database(self):
        return self.redshift_database

    @property
    def user(self):
        return self.redshift_user

    @property
    def password(self):
        return self.redshift_password

    @property
    def table(self):
        return self.sql_file.replace(".sql", "")

    def s3_load_path(self):
        return self.input().path

    @property
    def iam_role_arn(self):
        iam_role_arn = "arn:aws:iam::{}:role/{}".format(self.aws_account, self.role_name)
        return iam_role_arn

    @property
    def copy_options(self):
        return self.redshift_copy_options

    @property
    def columns(self):
        with self.input().open('r') as file_in:
            for line in file_in:
                columns = [x for x in line.split('\t')]
                return columns


if __name__ == '__main__':
    luigi.run()

実装した特徴としては以下になります。

  • 以前実装したIAM Role対応のモジュールを用いて取り込みを行いました。そのためimportでcontrib.redshift_extを取り込んでいます。
  • テーブル名はMySQLからS3に抽出する際に取り込んだファイル名をテーブル名としています。
  • カラムの順番を調整する必要があったので、S3からファイルを取得し、取得したヘッダーをLoadする際のカラムとして指定をしています。

まとめ

ひとまずMySQLからRedshiftに取り込むようなタスクを実装できました。
明日は、今までのアドベントカレンダーを振り返ってみたいと思います。