この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』20日目の内容となります。
今回はLuigiのRedshiftモジュールのCopyの挙動を書き換えてみたいと思います。
先日19日目はRedshiftに対してCopyコマンドを発行してみた(JSON取り込み)でした。
下準備
RedshiftがIAM Role対応になったのは記憶に新しいと思うのですが、まだまだIAM Role対応をフレームワーク側のモジュールで提供しているのは数少なく、
Luigiもご多分に漏れずIAM Roleを用いてのCopyには対応しておりません。
なかなかこういったフレームワークのモジュール内部の実装に手を出すのは容易では無いことが多いのですが、
Luigiは実装を呼んでみたところ手を出せそうなため、IAM Role対応を行ってみたいと思います。
contrib/redshift_ext.py
# -*- coding: utf-8 -*-
import abc
import luigi
import luigi.contrib.redshift
from logging import getLogger, StreamHandler, DEBUG
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.setLevel(DEBUG)
logger.addHandler(handler)
class S3CopyWithIAMRole(luigi.contrib.redshift.S3CopyToTable):
@abc.abstractmethod
def iam_role_arn(self):
"""
Override to return IAMRole name.
"""
return None
@property
def aws_access_key_id(self):
return self.task_id
@property
def aws_secret_access_key(self):
return self.task_id
def copy(self, cursor, f):
"""
Defines copying from s3 into redshift with IAM role.
"""
logger.info("Inserting file: %s", f)
cursor.execute("""
COPY %s from '%s'
CREDENTIALS 'aws_iam_role=%s'
%s
;""" % (self.table, f,
self.iam_role_arn, self.copy_options))
IAM Role対応と大手を振りましたが、実際には40行足らずで対応できてしまいました。
あとは、上記のコードを呼ぶようにImportをして呼べばIAM Roleを用いたCopyコマンドの実行が可能です。
実際に動かした際の構成と、動かした側のコードが以下になります。
.
├── contrib
│ ├── __init__.py
│ └── redshift_ext.py
└── useRedshift.py
useRedshift.py
# -*- coding: utf-8 -*-
import luigi
import luigi.contrib.redshift
import contrib.redshift_ext
class copyRedshiftTableFromS3WithIamRole(contrib.redshift_ext.S3CopyWithIAMRole):
s3FilePath = luigi.Parameter()
param = {}
param["host"] = "[Redshift Host]"
param["database"] = "[Redshift database]"
param["password"] = "[Redshift Password]"
param["user"] = "[Redshift User]"
param["table"] = "顧客情報"
param["s3_load_path"] = "s3://cm-kajiwara-redshift-load/{}"
param["copy_options"] = "region AS 'ap-northeast-1' DELIMITER AS '\t' TIMEFORMAT AS 'auto' "
param["iam_role_arn"] = "arn:aws:iam::nnnnnnnnnnnn:role/redshift-role-mmmmmmmmm"
@property
def host(self):
return self.param["host"]
@property
def database(self):
return self.param["database"]
@property
def user(self):
return self.param["user"]
@property
def password(self):
return self.param["password"]
@property
def table(self):
return self.param["table"]
def s3_load_path(self):
return self.param["s3_load_path"].format(self.s3FilePath)
@property
def iam_role_arn(self):
return self.param["iam_role_arn"]
@property
def copy_options(self):
return self.param["copy_options"]
実行結果については、IAM userのCredentialを用いたときと変わらないため、省略します。
まとめ
Luigiのモジュールを元にRedshiftに対してIAM Roleを用いたCopyを実施しました。 次回は通知機能についてしらべてみたいとおもいます。