LuigiでバッチJobワークフローを実行してみた

2017.02.02

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、三上です。

気付けば2017年も1ヶ月が過ぎ去り。。(日付とか、気を抜いてるとまだ "2016" などと書いてる自分が。。。

はじめに

Python勉強中です。

Pythonのスクリプト言語的な手軽さが楽しいです!v

今回、Pythonで書いたバッチのJobワークフローエンジンを検討するにあたり、Luigiをさわってみました。

Luigiって何?

  • PythonでバッチJobのワークフロー管理してくれるヒト(全部 Pythonコード
  • スケジューラー機能はない(ので、cronからluigiを実行する
  • Jobに依存関係を持たせたい場合、if文ばしばしなコード書かなくて良いのでコードの可読性が上がる

さわってみた

環境準備

Python for Windows

今回は、Windows10(Mac Sierra VMware Fusion)に Python 3.6.0 をインストールしました。

まずは、WindowsでPythonを使えるようにします。

Pythonの ダウンロードページ からインストーラーをダウンロードして実行します。

インストールが終わったら、Pythonとpip、setuptoolsのパスを環境変数に追加します。

Pythonはデフォルトでは、

C:\Users\[ユーザー名]\AppData\Local\Program

配下にインストールされるので、以下の2つをシステム環境変数のPathに追加しました。

C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\Scripts

コマンドプロンプトを立ち上げなおして、実行できるか確認してみます。

C:\Users\mikami.yuki>py -V
Python 3.6.0

C:\Users\mikami.yuki>pip -V
pip 9.0.1 from c:\users\mikami.yuki\appdata\local\programs\python\python36\lib\site-packages (python 3.6)

C:\Users\mikami.yuki>easy_install --version
setuptools 28.8.0 from c:\users\mikami.yuki\appdata\local\programs\python\python36\lib\site-packages (Python 3.6)

→WindowsでPythonが使えるようになりました。

Luigiサンプルコード

Git からLuigiのサンプルコードを取得して、動かしてみます。

コードを取得します。

C:\Users\mikami.yuki\luigi>git clone https://github.com/spotify/luigi.git
Cloning into 'luigi'...
remote: Counting objects: 16180, done.
remote: Total 16180 (delta 0), reused 0 (delta 0), pack-reused 16180
Receiving objects: 100% (16180/16180), 8.32 MiB | 2.10 MiB/s, done.
Resolving deltas: 100% (11208/11208), done.
Checking out files: 100% (331/331), done.

セットアップスクリプト(setup.py)が入っているので、実行して必要なモジュールをビルド&インストールします。

C:\Users\mikami.yuki\luigi\luigi>py setup.py install
running install
running bdist_egg
running egg_info
creating luigi.egg-info
writing luigi.egg-info\PKG-INFO
writing dependency_links to luigi.egg-info\dependency_links.txt
(中略)

Using c:\users\mikami.yuki\appdata\local\programs\python\python36\lib\site-packages
Finished processing dependencies for luigi==2.5.0

サンプルコードは \examples 配下にあるので、hello_world.py を実行してみます。

C:\Users\mikami.yuki\luigi\luigi\examples>py hello_world.py
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\configuration.py:54: UserWarning: LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: C:\Users\mikami.yuki\SBJ\src\work\ldb\ldb\workflow\luigi.cfg
  warnings.warn("LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: {path}".format(path=config_file))
利用できるインスタンスがありません。
DEBUG: Checking if examples.HelloWorldTask() is complete
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\worker.py:305: UserWarning: Task examples.HelloWorldTask() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   examples.HelloWorldTask__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 4372] Worker Worker(salt=556257757, workers=1, host=HL00198, username=mikami.yuki, pid=4372) running   examples.HelloWorldTask()
HelloWorldTask says: Hello world!
INFO: [pid 4372] Worker Worker(salt=556257757, workers=1, host=HL00198, username=mikami.yuki, pid=4372) done      examples.HelloWorldTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   examples.HelloWorldTask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=556257757, workers=1, host=HL00198, username=mikami.yuki, pid=4372) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 examples.HelloWorldTask()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

→ successfully!v

Luigiでコードを書いてみる

LuigiのTaskクラスを継承して、自分で作りたいクラスに以下の3つのメソッドを書きます

  • requires():依存関係の指定
  • run():処理本体
  • output():タスク処理のアウトプット

requires()で指定した先行タスクの処理完了後に自タスクが実行されますが、先行タスクの処理が完了しない(失敗した)場合、自タスクも実行されません。

タスク処理の完了(成功)はアウトプットがあるかどうかで判断されるので、アウトプットがなければ後行タスクは実行されず、ワークフローは中断されます。

中断された後にもう一度最初から実行すると、前回の処理が成功した(アウトプットがある)タスクは実行されず、失敗した(アウトプットがない)タスクから先が実行されます。

個人的には、こちら の記事が、クラスのイメージ図もあり、非常にイメージしやすかったですmm

依存関係を指定してみる

先ほど実行したサンプルコードの中身を見てみます。

hello_world.py

import luigi

class HelloWorldTask(luigi.Task):
    task_namespace = 'examples'

    def run(self):
        print("{task} says: Hello world!".format(task=self.__class__.__name__))

if __name__ == '__main__':
    luigi.run(['examples.HelloWorldTask', '--workers', '1', '--local-scheduler'])

処理実行部のみが記載されています。

これに、もう一つタスクを追加して、順番に実行するようにしてみます。

hello_world_kai.py

import luigi

class HelloWorldTask(luigi.Task):
    task_namespace = 'examples'

    def requires(self):
        return WorldIsFineTask()

    def run(self):
        print("{task} says: Hello world!".format(task=self.__class__.__name__))

class WorldIsFineTask(luigi.Task):
    task_namespace = 'examples'

    def run(self):
        with self.output().open('w') as output:
            output.write('{task} says: The world is a fine place!\n'.format(task=self.__class__.__name__))

    def output(self):
        return luigi.LocalTarget('world.txt')

if __name__ == '__main__':
    luigi.run(['examples.HelloWorldTask', '--workers', '1', '--local-scheduler'])

12~20行目に、最初に実行するタスクを追加しました。

先行タスクはないので requires() はありませんが、19~20行目に output() メソッドを追加して、world.txt があれば処理完了(成功)と判断できるようにしています。

15~17行目の run() メソッドで、world.txt を出力しています。

6, 7行目の requires() メソッドで、追加したタスクを先行タスクに指定しています。

実行してみます。

C:\Users\mikami.yuki\luigi\luigi\examples>py hello_world_kai.py
利用できるインスタンスがありません。
DEBUG: Checking if examples.HelloWorldTask() is complete
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\worker.py:305: UserWarning: Task examples.HelloWorldTask() without outputs has no custom complete() method
  is_complete = task.complete()
DEBUG: Checking if examples.WorldIsFineTask() is complete
INFO: Informed scheduler that task   examples.HelloWorldTask__99914b932b   has status   PENDING
INFO: Informed scheduler that task   examples.WorldIsFineTask__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 10568] Worker Worker(salt=514878905, workers=1, host=HL00198, username=mikami.yuki, pid=10568) running   examples.WorldIsFineTask()
INFO: [pid 10568] Worker Worker(salt=514878905, workers=1, host=HL00198, username=mikami.yuki, pid=10568) done      examples.WorldIsFineTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   examples.WorldIsFineTask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 10568] Worker Worker(salt=514878905, workers=1, host=HL00198, username=mikami.yuki, pid=10568) running   examples.HelloWorldTask()
HelloWorldTask says: Hello world!
INFO: [pid 10568] Worker Worker(salt=514878905, workers=1, host=HL00198, username=mikami.yuki, pid=10568) done      examples.HelloWorldTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   examples.HelloWorldTask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=514878905, workers=1, host=HL00198, username=mikami.yuki, pid=10568) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 examples.HelloWorldTask()
    - 1 examples.WorldIsFineTask()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

→ちゃんと順番に実行できました。

オンメモリのアウトプットを指定してみる

先ほど追加した先行タスクのアウトプットにはファイルを指定しているため、繰り返し実行する場合は出力したファイルを消す必要があります。。

ファイルがある状態で実行するとすでに処理が完了しているとみなされ、以下のように先行タスクの run() メソッドが実行されないためです。。。

C:\Users\mikami.yuki\luigi\luigi\examples>py hello_world_kai.py
(中略)
INFO:
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 present dependencies were encountered:
    - 1 examples.WorldIsFineTask()
* 1 ran successfully:
    - 1 examples.HelloWorldTask()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

動作確認時など、毎回ファイルを消すのは手間なので、MockTargetを使ってメモリにアウトプットするよう変更してみます。

from luigi.mock import MockTarget

(中略)

class WorldIsFineTask(luigi.Task):
    task_namespace = 'examples'

    def run(self):
        with self.output().open('w') as output:
            print("{task} says: The world is a fine place!".format(task=self.__class__.__name__))

    def output(self):
        return MockTarget("output")

1行目のimport文を追加し、13行目のアウトプットを MockTarget に変更しました。

10行目でprint文を出力しています。

実行してみます。

C:\Users\mikami.yuki\luigi\luigi\examples>py hello_world_mock.py
DEBUG: Checking if examples.HelloWorldTask() is complete
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\worker.py:305: UserWarning: Task examples.HelloWorldTask() without outputs has no custom complete() method
  is_complete = task.complete()
DEBUG: Checking if examples.WorldIsFineTask() is complete
INFO: Informed scheduler that task   examples.HelloWorldTask__99914b932b   has status   PENDING
INFO: Informed scheduler that task   examples.WorldIsFineTask__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 10204] Worker Worker(salt=524562969, workers=1, host=HL00198, username=mikami.yuki, pid=10204) running   examples.WorldIsFineTask()
WorldIsFineTask says: The world is a fine place!
INFO: [pid 10204] Worker Worker(salt=524562969, workers=1, host=HL00198, username=mikami.yuki, pid=10204) done      examples.WorldIsFineTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   examples.WorldIsFineTask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 10204] Worker Worker(salt=524562969, workers=1, host=HL00198, username=mikami.yuki, pid=10204) running   examples.HelloWorldTask()
HelloWorldTask says: Hello world!
INFO: [pid 10204] Worker Worker(salt=524562969, workers=1, host=HL00198, username=mikami.yuki, pid=10204) done      examples.HelloWorldTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   examples.HelloWorldTask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=524562969, workers=1, host=HL00198, username=mikami.yuki, pid=10204) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 examples.HelloWorldTask()
    - 1 examples.WorldIsFineTask()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

→何度でも連続して実行できるようになりました。

※自タスクのアウトプットを必要としない(他タスクの実行管理などの)ダミータスクを作りたい場合、これでOKですね。

Luigiでつまづいたこと

ModuleNotFoundError

結論:luigiコマンドで実行する場合、環境変数:PYTHONPATHの追加が必要。。

サンプルコード hello_world.py には

if __name__ == '__main__':
    luigi.run(['examples.HelloWorldTask', '--workers', '1', '--local-scheduler'])

が書いてあるるので、pythonコマンドでファイル実行すると、luigi.run() が実行されます。

この、luigi.run() の部分を消してみると・・・

C:\Users\mikami.yuki\luigi>py hello_world.py

C:\Users\mikami.yuki\luigi>

うんともすんとも。。。

実行する関数ない、ですものね。。(「おまじない」くらいにしか思ってなかった。。(あせ

コマンドラインからLuigi実行する場合、

luigi --module [パッケージ名] [クラス名] [オプション]

で実行するとのことで。。

ではコマンドラインから、luigiコマンドで実行します。

C:\Users\mikami.yuki\luigi>luigi --module hello_world examples.HelloWorldTask --local-scheduler
Traceback (most recent call last):
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\Scripts\luigi-script.py", line 11, in <module>
    load_entry_point('luigi==2.5.0', 'console_scripts', 'luigi')()
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\cmdline.py", line 11, in luigi_run
    run_with_retcodes(argv)
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\retcodes.py", line 69, in run_with_retcodes
    with luigi.cmdline_parser.CmdlineParser.global_instance(argv):
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\contextlib.py", line 82, in __enter__
    return next(self.gen)
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\cmdline_parser.py", line 52, in global_instance
    new_value = CmdlineParser(cmdline_args)
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\cmdline_parser.py", line 64, in __init__
    self._attempt_load_module(known_args)
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\cmdline_parser.py", line 144, in _attempt_load_module
    __import__(module)
ModuleNotFoundError: No module named 'hello_world'

あれ?エラーとな?

→すみません、ちゃんと読んでませんでした。。

システム環境変数にPYTHONPATHを追加し、コマンドプロンプト立ち上げなおして再実行。

C:\Users\mikami.yuki\luigi>luigi --module hello_world examples.HelloWorldTask --local-scheduler
(中略)
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 examples.HelloWorldTask()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

→ちゃんと実行できました!v

環境変数で指定したconfigファイルが読み込まれない

結論:環境変数:LUIGI_CONFIG_PATH には、ファイル名まで指定する必要あり。。

Luigiでは、タスク実行時のパラメータをconfigファイルで指定できます。

以下のコードは、パラメータで指定された country をprintする処理です。

hello_world_jp.py

import luigi

class HelloWorldTask(luigi.Task):
    country = luigi.Parameter()

    def run(self):
        print("{task} says: Hello world from {ctry}!".format(task=self.__class__.__name__, ctry=self.country))

if __name__ == '__main__':
    luigi.run(['HelloWorldTask', '--workers', '1', '--local-scheduler'])

このパラメータは下記 config ファイルで指定できます。

luigi.cfg

[HelloWorldTask]
country = Japan

configファイル(luigi.cfg)を同じディレクトリにおいて実行すると、ちゃんとパラメータを読み込んでくれます。

C:\Users\mikami.yuki\luigi>py hello_world_jp.py
DEBUG: Checking if HelloWorldTask(country=Japan) is complete
(中略)
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 HelloWorldTask(country=Japan)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

Luigiのconfigファイルですが、置き場所は環境変数で指定可能とのことなので、システム環境変数に指定してみました。

luigi_conf_ng

実行してみます。

C:\Users\mikami.yuki\luigi>py hello_world_jp.py
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\configuration.py:54: UserWarning: LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: C:\Users\mikami.yuki\luigi\conf
  warnings.warn("LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: {path}".format(path=config_file))
Traceback (most recent call last):
  File "hello_world_jp.py", line 10, in <module>
    luigi.run(['HelloWorldTask', '--workers', '1', '--local-scheduler'])
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\interface.py", line 209, in run
    return _run(*args, **kwargs)['success']
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\interface.py", line 237, in _run
    return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\cmdline_parser.py", line 116, in get_task_obj
    return self._get_task_cls()(**self._get_task_kwargs())
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\task_register.py", line 88, in __call__
    param_values = cls.get_param_values(params, args, kwargs)
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\task.py", line 360, in get_param_values
    raise parameter.MissingParameterException("%s: requires the '%s' parameter to be set" % (exc_desc, param_name))
luigi.parameter.MissingParameterException: HelloWorldTask[args=(), kwargs={}]: requires the 'country' parameter to be set

configファイルがないとか、パラメータがないとか、怒られとる。。

Luigi本体のコード確認してみたところ、どうやら、環境変数にファイル名まで指定する必要があるようで。。

configuration.py

(前略)
class LuigiConfigParser(ConfigParser):
    NO_DEFAULT = object()
    _instance = None
    _config_paths = [
        '/etc/luigi/client.cfg',  # Deprecated old-style global luigi config
        '/etc/luigi/luigi.cfg',
        'client.cfg',  # Deprecated old-style local luigi config
        'luigi.cfg',
    ]
    if 'LUIGI_CONFIG_PATH' in os.environ:
        config_file = os.environ['LUIGI_CONFIG_PATH']
        if not os.path.isfile(config_file):
            warnings.warn("LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: {path}".format(path=config_file))
        else:
            _config_paths.append(config_file)
(後略)

環境変数のパスにファイル名を追加して再実行してみます。

luigi_conf_ok

C:\Users\mikami.yuki\luigi>py hello_world_jp.py
利用できるインスタンスがありません。
DEBUG: Checking if HelloWorldTask(country=Japan) is complete
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\worker.py:305: UserWarning: Task HelloWorldTask(country=Japan) without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   HelloWorldTask_Japan_7d3920278f   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7300] Worker Worker(salt=138057246, workers=1, host=HL00198, username=mikami.yuki, pid=7300) running   HelloWorldTask(country=Japan)
HelloWorldTask says: Hello world from Japan!
INFO: [pid 7300] Worker Worker(salt=138057246, workers=1, host=HL00198, username=mikami.yuki, pid=7300) done      HelloWorldTask(country=Japan)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   HelloWorldTask_Japan_7d3920278f   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=138057246, workers=1, host=HL00198, username=mikami.yuki, pid=7300) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 HelloWorldTask(country=Japan)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

→ちゃんと読み込んでくれました!

ブラウザで実行状況が表示されない

結論:Central Scheduler(luigid &オプション:--local-scheduler なし)で実行する必要があります。。

ブラウザで、こんな 感じに、ワークフローの実行状況が可視化できるらしい。

で、Chrome立ち上げて、URLに http://localhost:8082/ を入力(わくわく

browser_error

あれ?何も表示されない。。

→ --local-schedulerなしで実行します。

C:\Users\mikami.yuki\luigi>luigi --module hello_world_kai HelloWorldTask
DEBUG: Checking if HelloWorldTask() is complete
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\worker.py:305: UserWarning: Task HelloWorldTask() without outputs has no custom complete() method
  is_complete = task.complete()
DEBUG: Checking if WorldIsFineTask() is complete
ERROR: Failed connecting to remote scheduler 'http://localhost:8082'
Traceback (most recent call last):
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\connection.py", line 138, in _new_conn
    (self.host, self.port), self.timeout, **extra_kw)
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\util\connection.py", line 98, in create_connection
    raise err
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\util\connection.py", line 88, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [WinError 10061] 対象のコンピューターによって拒否されたため、接続できませんでした。

During handling of the above exception, another exception occurred:
(後略)

なんか、リトライしはじめた・・・?

→すみません、ちゃんと読んで(ry

luigid コマンドでスケジューラーを起動。

C:\Users\mikami.yuki\luigi>luigid
Defaulting to basic logging; consider specifying logging_conf_file in luigi.cfg.
2017-02-02 00:55:42,910 luigi.scheduler[6328] INFO: No prior state file exists at /luigid/state/luigi-state.pickle. Starting with empty state
2017-02-02 00:55:42,923 luigi.server[6328] INFO: Scheduler starting up

再度Luigiタスクを実行して、ブラウザ表示。

$ luigi --module hello_world_kai HelloWorldTask

luigi_workers

→実行状態が表示できました!v

おわりに(所感)

  • サンプルコード(一式)を使えば、意外とお手軽に動かすことができました。
  • 前Jobの出力ファイルを後Jobが取り込む、別Job完了の待ち合わせが必要、など、依存関係のあるワークフローの制御には最適かと思いました。
  • Pythonコードなので、カスタマイズ可能で自由度が高い!
  • Job管理ツールではなく、ワークフロー制御のフレームワーク
  • Jobそのものに近い?(LuigiありきでJobを実装してもいいかも?
  • Airflowも気になる。。(Luigiと比較してみたい!

参考