MySQLからデータ取得してみた | Luigi Advent Calendar 2016 #11

2016.12.11

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

好物はインフラとフロントエンドのかじわらゆたかです。

このエントリは『Luigi Advent Calendar 2016』11日目の内容となります。
今回は、MySQLから値を取得し、ローカルに保存するタスク実装していきます。

先日10日目はS3のファイル一覧習得と削除を行なってみたでした。

下準備編 テストデータ

MySQLのテストデータが有りますので、そちらを用いることとしました。 datacharmer/test_db

適用方法としてはREADMEに書いてあるとおりですが、
上記のリポジトリをCloneしemployees.sqlを流せば適用できます。

下準備 mysql-connector-python

LuigiのMySQLモジュールはmysql-connector-pythonをラップした形で実装されています。
また、mysql-connector-pythonはpipでは取得できないため、
公式のリポジトリからCloneしてインストールをする形をとりました。

$ git clone https://github.com/mysql/mysql-connector-python.git
$ cd mysql/mysql-connector-python
$ python ./setup.py install
$ pip list
(省略)
mysql-connector-python (2.2.1)
(省略)

MySQLからのデータ習得

LuigiのMySQLは機能としては、タスクの結果をMySQLに格納すると言った機能を提供しています。
しかし、それ以外の機能として値の習得といったことも行うことも可能です。

作例ではDictionaryで取得し、
取得したDictionaryのキーで値を抽出しローカルファイルへの出力を行なっています。

# -*- coding: utf-8 -*-
from logging import getLogger, StreamHandler, DEBUG

import luigi
import luigi.contrib.mysqldb
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.setLevel(DEBUG)
logger.addHandler(handler)
try:
    import mysql
    from mysql.connector import errorcode
except ImportError as e:
    logger.warning(
        "Loading MySQL module without the python package mysql-connector-python. \
        This will crash at runtime if MySQL functionality is used.")


class extractMySQLTable(luigi.Task):
    def run(self):
        mySqlTarget = luigi.contrib.mysqldb.MySqlTarget(
            host="localhost:13306",
            database="employees",
            password="root_secret",
            user="root",
            table="employees",
            update_id="sample")
        connection = mySqlTarget.connect()
        cursor = connection.cursor(dictionary=True)
        try:
            cursor.execute("""SELECT emp_no,birth_date,first_name,last_name,
(CASE WHEN gender = 'M'
    THEN '男'
   WHEN gender = 'F'
     THEN '女'
   ELSE '-' END) as gender,hire_date FROM employees.employees LIMIT 10""")

            with self.output().open('w') as out_file:
                for row in cursor:
                    out_file.write(
                        u"{emp_no}\t{birth_date}\t{first_name}\t{last_name}\t{gender}\n".
                        format(**row))

        except mysql.connector.Error as e:
            if e.errno != errorcode.ER_NO_SUCH_TABLE:
                raise

    def output(run):
        return luigi.LocalTarget(
            path="employees.tsv", format=luigi.format.UTF8)


if __name__ == '__main__':
    luigi.run()

実行結果

$ python ./useMySQL.py --local-scheduler extractMySQLTable
$ cat employees.tsv
10001	1953-09-02	Georgi	Facello	男
10002	1964-06-02	Bezalel	Simmel	女
10003	1959-12-03	Parto	Bamford	男
10004	1954-05-01	Chirstian	Koblick	男
10005	1955-01-21	Kyoichi	Maliniak	男
10006	1953-04-20	Anneke	Preusig	女
10007	1957-05-23	Tzvetan	Zielinski	女
10008	1958-02-19	Saniya	Kalloufi	男
10009	1952-04-19	Sumant	Peac	女
10010	1963-06-01	Duangkaew	Piveteau	女

まとめ

今回はMySQLからデータを取得してみました。
ローカルへの配置でしたが、拡張していけばS3への配置等も問題なく行えると思います。
明日はデータ・ソース取得元をSQLServerとして、取得してみたいと思います。