この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』11日目の内容となります。
今回は、MySQLから値を取得し、ローカルに保存するタスク実装していきます。
先日10日目はS3のファイル一覧習得と削除を行なってみたでした。
下準備編 テストデータ
MySQLのテストデータが有りますので、そちらを用いることとしました。 datacharmer/test_db
適用方法としてはREADMEに書いてあるとおりですが、
上記のリポジトリをCloneしemployees.sqlを流せば適用できます。
下準備 mysql-connector-python
LuigiのMySQLモジュールはmysql-connector-pythonをラップした形で実装されています。
また、mysql-connector-pythonはpipでは取得できないため、
公式のリポジトリからCloneしてインストールをする形をとりました。
$ git clone https://github.com/mysql/mysql-connector-python.git
$ cd mysql/mysql-connector-python
$ python ./setup.py install
$ pip list
(省略)
mysql-connector-python (2.2.1)
(省略)
MySQLからのデータ習得
LuigiのMySQLは機能としては、タスクの結果をMySQLに格納すると言った機能を提供しています。
しかし、それ以外の機能として値の習得といったことも行うことも可能です。
作例ではDictionaryで取得し、
取得したDictionaryのキーで値を抽出しローカルファイルへの出力を行なっています。
# -*- coding: utf-8 -*-
from logging import getLogger, StreamHandler, DEBUG
import luigi
import luigi.contrib.mysqldb
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.setLevel(DEBUG)
logger.addHandler(handler)
try:
import mysql
from mysql.connector import errorcode
except ImportError as e:
logger.warning(
"Loading MySQL module without the python package mysql-connector-python. \
This will crash at runtime if MySQL functionality is used.")
class extractMySQLTable(luigi.Task):
def run(self):
mySqlTarget = luigi.contrib.mysqldb.MySqlTarget(
host="localhost:13306",
database="employees",
password="root_secret",
user="root",
table="employees",
update_id="sample")
connection = mySqlTarget.connect()
cursor = connection.cursor(dictionary=True)
try:
cursor.execute("""SELECT emp_no,birth_date,first_name,last_name,
(CASE WHEN gender = 'M'
THEN '男'
WHEN gender = 'F'
THEN '女'
ELSE '-' END) as gender,hire_date FROM employees.employees LIMIT 10""")
with self.output().open('w') as out_file:
for row in cursor:
out_file.write(
u"{emp_no}\t{birth_date}\t{first_name}\t{last_name}\t{gender}\n".
format(**row))
except mysql.connector.Error as e:
if e.errno != errorcode.ER_NO_SUCH_TABLE:
raise
def output(run):
return luigi.LocalTarget(
path="employees.tsv", format=luigi.format.UTF8)
if __name__ == '__main__':
luigi.run()
実行結果
$ python ./useMySQL.py --local-scheduler extractMySQLTable
$ cat employees.tsv
10001 1953-09-02 Georgi Facello 男
10002 1964-06-02 Bezalel Simmel 女
10003 1959-12-03 Parto Bamford 男
10004 1954-05-01 Chirstian Koblick 男
10005 1955-01-21 Kyoichi Maliniak 男
10006 1953-04-20 Anneke Preusig 女
10007 1957-05-23 Tzvetan Zielinski 女
10008 1958-02-19 Saniya Kalloufi 男
10009 1952-04-19 Sumant Peac 女
10010 1963-06-01 Duangkaew Piveteau 女
まとめ
今回はMySQLからデータを取得してみました。
ローカルへの配置でしたが、拡張していけばS3への配置等も問題なく行えると思います。
明日はデータ・ソース取得元をSQLServerとして、取得してみたいと思います。