ローカルファイルを参照してみる | Luigi Advent Calendar 2016 #04

2016.12.04

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』4日目の内容となります。
今回はローカルファイルを参照していきたいと思います。

先日3日目はタスクパラメータを設定してみた でした。
Luigiは複雑データパイプラインを処理するツールになります。
当然データとしてローカルのファイルも扱うことができます。
今回はその一例としてファイルの中身を参照してみたいと思います。

下準備

参照するTSVファイルを用意しておきます。
今回はRedshiftのサンプルデータの一部をTSVとして抽出したものを用いました。

customer.tsv

1	Customer#000000001	IVhzIApeRb	MOROCCO  0	MOROCCO	AFRICA	25-989-741-2988	BUILDING
2	Customer#000000002	XSTf4,NCwDVaWNe6tE	JORDAN   1	JORDAN	MIDDLE EAST	23-768-687-3665	AUTOMOBILE
3	Customer#000000003	MG9kdTD	ARGENTINA7	ARGENTINA	AMERICA	11-719-748-3364	AUTOMOBILE
4	Customer#000000004	XxVSJsL	EGYPT    4	EGYPT	MIDDLE EAST	14-128-190-5944	MACHINERY
5	Customer#000000005	KvpyuHCplrB84WgAi	CANADA   5	CANADA	AMERICA	13-750-942-6364	HOUSEHOLD
6	Customer#000000006	sKZz0CsnMD7mp4Xd0YrBvx	SAUDI ARA2	SAUDI ARABIA	MIDDLE EAST	30-114-968-4951	AUTOMOBILE
7	Customer#000000007	TcGe5gaZNgVePxU5kR	CHINA    0	CHINA	ASIA	28-190-982-9759	AUTOMOBILE
8	Customer#000000008	I0B10bB0AymmC, 0PrRYBC	PERU     6	PERU	AMERICA	27-147-574-9335	BUILDING
9	Customer#000000009	xKiAFTjUsCuxfele	INDIA    6	INDIA	ASIA	18-338-906-3675	FURNITURE
10	Customer#000000010	6LrEaV6KR6PLVcgl2ArL 	ETHIOPIA 9	ETHIOPIA	AFRICA	15-741-346-9870	HOUSEHOLD

ローカルファイルの参照

Luigiのタスク一つ一つはPythonのクラスで実装され、outputメソッドで最終的な出力の形式を、
runメソッドでそのタスク内での処理を、requiresメソッドでその依存関係を指定します。
また、ファイルの参照をするといった場合も依存関係で解決するため、今回の例ではファイルの参照を行うクラスを実装し、
それを依存関係として指定することで、当該のファイルを参照して処理をするといったことが行えることとなります。

まず、ファイルの参照を実装していきます。
上記に記載した通りファイルの参照はファイルの参照するためのタスクを作成し、outputメソッドでファイルの形式として指定する必要があります。
今回の例では以下のようなりました。

import luigi


class TsvInput(luigi.Task):

    def output(self):
        return luigi.LocalTarget('customer.tsv')

上記のままではファイルの参照を指定しているだけですので、これを依存関係に組み込んで、 実際に値を参照・出力してみたいと思います。

依存関係の設定

タスクの依存関係を指定する際は、上記でも書いた通りrequiresメソッドを用います。
依存関係で用いたいタスクを戻り値として使うことで、依存関係として参照されることになります。 上記で書いたTsvInputを依存関係として使う場合は以下のようになります。

class TsvColumnShow(luigi.Task):

    def requires(self):
        return TsvInput()

依存元のタスクでoutputメソッドが定義されていた際は、
タスク内でinputメソッドを呼ぶことでoutputメソッドで指定した内容を参照することができます。

今回のファイル参照の場合、依存元でファイルの読み込みを実施し、outputメソッド内でluigi.LocalTargetメソッドで ファイルの参照を実施しているため、その後のTsvColumShowのrunメソッド内のinput()メソッドで参照できています。

class TsvColumnShow(luigi.Task):

    def requires(self):
        return TsvInput()

    def run(self):
        with self.input().open('r') as input:
            for line in input:
                print line

最後に、コード全体と、動かした際の挙動を載せておきます。
配置したtsvファイルの内容が表示されていることがわかると思います。

UseTsv.py

import luigi


class TsvInput(luigi.Task):

    def output(self):
        return luigi.LocalTarget('customer.tsv')


class TsvColumnShow:q(luigi.Task):

    def requires(self):
        return TsvInput()

    def run(self):
        with self.input().open('r') as input:
            for line in input:
                print line


if __name__ == '__main__':
    luigi.run()

実行結果

$ ls
UseTsv.py  customer.tsv

$ python ./UseTsv.py  --local-scheduler TsvColumShow
DEBUG: Checking if TsvColumShow() is complete
/Users/kajiwarayutaka/.pyenv/versions/luigiStudy/lib/python2.7/site-packages/luigi/worker.py:295: UserWarning: Task TsvColumShow() without outputs has no custom complete() method
  is_complete = task.complete()
DEBUG: Checking if TsvInput() is complete
INFO: Informed scheduler that task   TsvColumShow__99914b932b   has status   PENDING
INFO: Informed scheduler that task   TsvInput__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 98445] Worker Worker(salt=002911966, workers=1, host=HL00088, username=kajiwarayutaka, pid=98445) running   TsvColumShow()
1	Customer#000000001	IVhzIApeRb	MOROCCO  0	MOROCCO	AFRICA	25-989-741-2988	BUILDING

2	Customer#000000002	XSTf4,NCwDVaWNe6tE	JORDAN   1	JORDAN	MIDDLE EAST	23-768-687-3665	AUTOMOBILE

3	Customer#000000003	MG9kdTD	ARGENTINA7	ARGENTINA	AMERICA	11-719-748-3364	AUTOMOBILE

4	Customer#000000004	XxVSJsL	EGYPT    4	EGYPT	MIDDLE EAST	14-128-190-5944	MACHINERY

5	Customer#000000005	KvpyuHCplrB84WgAi	CANADA   5	CANADA	AMERICA	13-750-942-6364	HOUSEHOLD

6	Customer#000000006	sKZz0CsnMD7mp4Xd0YrBvx	SAUDI ARA2	SAUDI ARABIA	MIDDLE EAST	30-114-968-4951	AUTOMOBILE

7	Customer#000000007	TcGe5gaZNgVePxU5kR	CHINA    0	CHINA	ASIA	28-190-982-9759	AUTOMOBILE

8	Customer#000000008	I0B10bB0AymmC, 0PrRYBC	PERU     6	PERU	AMERICA	27-147-574-9335	BUILDING

9	Customer#000000009	xKiAFTjUsCuxfele	INDIA    6	INDIA	ASIA	18-338-906-3675	FURNITURE

10	Customer#000000010	6LrEaV6KR6PLVcgl2ArL 	ETHIOPIA 9	ETHIOPIA	AFRICA	15-741-346-9870	HOUSEHOLD

INFO: [pid 98445] Worker Worker(salt=002911966, workers=1, host=HL00088, username=kajiwarayutaka, pid=98445) done      TsvColumShow()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   TsvColumShow__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=002911966, workers=1, host=HL00088, username=kajiwarayutaka, pid=98445) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 present dependencies were encountered:
    - 1 TsvInput()
* 1 ran successfully:
    - 1 TsvColumShow()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

まとめ

まずはデータ制御のローカルファイルの参照を行なってみました。
今回は単一のファイルを扱ったので、次回は複数のファイルを扱ってみたいと思います。