RedshiftへのCopyをIAM ROLEを用いる形にしてみた | Luigi Advent Calendar 2016 #20

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』20日目の内容となります。
今回はLuigiのRedshiftモジュールのCopyの挙動を書き換えてみたいと思います。

先日19日目はRedshiftに対してCopyコマンドを発行してみた(JSON取り込み)でした。

下準備

RedshiftがIAM Role対応になったのは記憶に新しいと思うのですが、まだまだIAM Role対応をフレームワーク側のモジュールで提供しているのは数少なく、 Luigiもご多分に漏れずIAM Roleを用いてのCopyには対応しておりません。
なかなかこういったフレームワークのモジュール内部の実装に手を出すのは容易では無いことが多いのですが、
Luigiは実装を呼んでみたところ手を出せそうなため、IAM Role対応を行ってみたいと思います。

# -*- coding: utf-8 -*-
import abc
import luigi
import luigi.contrib.redshift
from logging import getLogger, StreamHandler, DEBUG
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.setLevel(DEBUG)
logger.addHandler(handler)


class S3CopyWithIAMRole(luigi.contrib.redshift.S3CopyToTable):
    @abc.abstractmethod
    def iam_role_arn(self):
        """
        Override to return IAMRole name.
        """
        return None

    @property
    def aws_access_key_id(self):
        return self.task_id

    @property
    def aws_secret_access_key(self):
        return self.task_id

    def copy(self, cursor, f):
            """
            Defines copying from s3 into redshift with IAM role.
            """

            logger.info("Inserting file: %s", f)
            cursor.execute("""
            COPY %s from '%s'
            CREDENTIALS 'aws_iam_role=%s'
            %s
            ;""" % (self.table, f,
                    self.iam_role_arn, self.copy_options))

IAM Role対応と大手を振りましたが、実際には40行足らずで対応できてしまいました。
あとは、上記のコードを呼ぶようにImportをして呼べばIAM Roleを用いたCopyコマンドの実行が可能です。

実際に動かした際の構成と、動かした側のコードが以下になります。

.
├── contrib
│   ├── __init__.py
│   └── redshift_ext.py
└── useRedshift.py
# -*- coding: utf-8 -*-
import luigi
import luigi.contrib.redshift
import contrib.redshift_ext

class copyRedshiftTableFromS3WithIamRole(contrib.redshift_ext.S3CopyWithIAMRole):

    s3FilePath = luigi.Parameter()
    param = {}
    param["host"] = "[Redshift Host]"
    param["database"] = "[Redshift database]"
    param["password"] = "[Redshift Password]"
    param["user"] = "[Redshift User]"
    param["table"] = "顧客情報"
    param["s3_load_path"] = "s3://cm-kajiwara-redshift-load/{}"
    param["copy_options"] = "region AS 'ap-northeast-1' DELIMITER AS '\t' TIMEFORMAT AS 'auto' "
    param["iam_role_arn"] = "arn:aws:iam::nnnnnnnnnnnn:role/redshift-role-mmmmmmmmm"

    @property
    def host(self):
        return self.param["host"]

    @property
    def database(self):
        return self.param["database"]

    @property
    def user(self):
        return self.param["user"]

    @property
    def password(self):
        return self.param["password"]

    @property
    def table(self):
        return self.param["table"]

    def s3_load_path(self):
        return self.param["s3_load_path"].format(self.s3FilePath)

    @property
    def iam_role_arn(self):
        return self.param["iam_role_arn"]

    @property
    def copy_options(self):
        return self.param["copy_options"]

実行結果については、IAM userのCredentialを用いたときと変わらないため、省略します。

まとめ

Luigiのモジュールを元にRedshiftに対してIAM Roleを用いたCopyを実施しました。 次回は通知機能についてしらべてみたいとおもいます。