Before I knew it, the first month of 2017 had already flown by... (If I'm not paying attention, I still catch myself writing "2016" in dates...)
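- A tool that manages batch job workflows in Python (everything is written as Python code)
- It has no scheduling (trigger) feature of its own, so you launch luigi from cron
- When jobs depend on each other, you don't have to write code littered with if statements, so the code is easier to read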
- Luigi Advent Calendar 2016 | Series | Developers.IO
- Impressions from trying out OSS workflow engines | Qiita
- Doing big data analysis with Luigi, a data flow control framework | Qiita
Python for Windows
This time I installed Python 3.6.0 on Windows 10 (running in VMware Fusion on macOS Sierra).
Download the installer from the Python download page and run it.
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\Scripts
C:\Users\mikami.yuki>py -V
Python 3.6.0

C:\Users\mikami.yuki>pip -V
pip 9.0.1 from c:\users\mikami.yuki\appdata\local\programs\python\python36\lib\site-packages (python 3.6)

C:\Users\mikami.yuki>easy_install --version
setuptools 28.8.0 from c:\users\mikami.yuki\appdata\local\programs\python\python36\lib\site-packages (Python 3.6)
Next, I fetch the Luigi sample code with Git and try running it.
C:\Users\mikami.yuki\luigi>git clone
Cloning into 'luigi'...
remote: Counting objects: 16180, done.
remote: Total 16180 (delta 0), reused 0 (delta 0), pack-reused 16180
Receiving objects: 100% (16180/16180), 8.32 MiB | 2.10 MiB/s, done.
Resolving deltas: 100% (11208/11208), done.
Checking out files: 100% (331/331), done.
C:\Users\mikami.yuki\luigi\luigi>py install
running install
running bdist_egg
running egg_info
creating luigi.egg-info
writing luigi.egg-info\PKG-INFO
writing dependency_links to luigi.egg-info\dependency_links.txt
(omitted)
Using c:\users\mikami.yuki\appdata\local\programs\python\python36\lib\site-packages
Finished processing dependencies for luigi==2.5.0
The sample code lives under \examples, so let's run the Hello World sample from there.
C:\Users\mikami.yuki\luigi\luigi\examples>py
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\ UserWarning: LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: C:\Users\mikami.yuki\SBJ\src\work\ldb\ldb\workflow\luigi.cfg
  warnings.warn("LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: {path}".format(path=config_file))
利用できるインスタンスがありません。
DEBUG: Checking if examples.HelloWorldTask() is complete
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\ UserWarning: Task examples.HelloWorldTask() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task examples.HelloWorldTask__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 4372] Worker Worker(salt=556257757, workers=1, host=HL00198, username=mikami.yuki, pid=4372) running examples.HelloWorldTask()
HelloWorldTask says: Hello world!
INFO: [pid 4372] Worker Worker(salt=556257757, workers=1, host=HL00198, username=mikami.yuki, pid=4372) done examples.HelloWorldTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task examples.HelloWorldTask__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=556257757, workers=1, host=HL00198, username=mikami.yuki, pid=4372) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 examples.HelloWorldTask()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====
→ It ran successfully!
- requires(): declares the tasks this task depends on
- run(): the body of the processing
- output(): the output of the task
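As a rough mental model of how a dependency declared in requires() ends up ordering the work, here is a toy sketch in plain Python. It does not use Luigi and is not how Luigi is implemented; `ToyTask`, `build` and `LOG` are hypothetical names made up for this illustration.

```python
# Toy model of dependency ordering via requires() (not Luigi itself).
LOG = []  # records the order in which run() was called


class ToyTask:
    def requires(self):
        return []  # upstream tasks; none by default

    def run(self):
        LOG.append(type(self).__name__)


class WorldIsFine(ToyTask):
    pass


class HelloWorld(ToyTask):
    def requires(self):
        return [WorldIsFine()]  # declare the dependency, no if statements


def build(task):
    """Run every upstream task first, then the task itself."""
    for upstream in task.requires():
        build(upstream)
    task.run()


build(HelloWorld())
print(LOG)  # ['WorldIsFine', 'HelloWorld']
```

The task only states what it needs; the ordering falls out of the recursive walk, which is roughly the readability benefit mentioned at the top of this post.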
Personally, I found this article very easy to follow, since it also includes a diagram of the classes.
import luigi


class HelloWorldTask(luigi.Task):
    task_namespace = 'examples'

    def run(self):
        # Print a greeting that includes the task's class name
        print("{task} says: Hello world!".format(task=self.__class__.__name__))


if __name__ == '__main__':
    # Run the task with one worker and the local scheduler
    luigi.run(['examples.HelloWorldTask', '--workers', '1', '--local-scheduler'])
import luigi

class HelloWorldTask(luigi.Task):
    task_namespace = 'examples'

    def requires(self):
        return WorldIsFineTask()

    def run(self):
        print("{task} says: Hello world!".format(task=self.__class__.__name__))

class WorldIsFineTask(luigi.Task):
    task_namespace = 'examples'

    def run(self):
        with self.output().open('w') as output:
            output.write('{task} says: The world is a fine place!\n'.format(task=self.__class__.__name__))

    def output(self):
        return luigi.LocalTarget('world.txt')

if __name__ == '__main__':
    luigi.run(['examples.HelloWorldTask', '--workers', '1', '--local-scheduler'])
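The added task has no upstream task of its own, so it has no requires(), but I added an output() method on lines 19-20 so that the task can be judged complete (successful) once world.txt exists.
The run() method on lines 15-17 writes world.txt.
The requires() method on lines 6-7 names the added task as the upstream task.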
C:\Users\mikami.yuki\luigi\luigi\examples>py
利用できるインスタンスがありません。
DEBUG: Checking if examples.HelloWorldTask() is complete
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\ UserWarning: Task examples.HelloWorldTask() without outputs has no custom complete() method
  is_complete = task.complete()
DEBUG: Checking if examples.WorldIsFineTask() is complete
INFO: Informed scheduler that task examples.HelloWorldTask__99914b932b has status PENDING
INFO: Informed scheduler that task examples.WorldIsFineTask__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 10568] Worker Worker(salt=514878905, workers=1, host=HL00198, username=mikami.yuki, pid=10568) running examples.WorldIsFineTask()
INFO: [pid 10568] Worker Worker(salt=514878905, workers=1, host=HL00198, username=mikami.yuki, pid=10568) done examples.WorldIsFineTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task examples.WorldIsFineTask__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 10568] Worker Worker(salt=514878905, workers=1, host=HL00198, username=mikami.yuki, pid=10568) running examples.HelloWorldTask()
HelloWorldTask says: Hello world!
INFO: [pid 10568] Worker Worker(salt=514878905, workers=1, host=HL00198, username=mikami.yuki, pid=10568) done examples.HelloWorldTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task examples.HelloWorldTask__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=514878905, workers=1, host=HL00198, username=mikami.yuki, pid=10568) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 examples.HelloWorldTask()
    - 1 examples.WorldIsFineTask()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====
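If you run it while the output file (world.txt) already exists, the task is considered already complete, so the upstream task's run() method is not executed, as shown below...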
C:\Users\mikami.yuki\luigi\luigi\examples>py
(omitted)
INFO:
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 present dependencies were encountered:
    - 1 examples.WorldIsFineTask()
* 1 ran successfully:
    - 1 examples.HelloWorldTask()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====
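The "output already exists, so skip run()" behaviour can be imitated with a few lines of plain Python. This is only a sketch of the idea, under the assumption that completeness boils down to "does the output file exist"; `run_if_incomplete` and `write_world` are hypothetical helpers, not Luigi functions.

```python
import os
import tempfile


def run_if_incomplete(output_path, action):
    """Call action() only when output_path does not exist yet.

    Returns True if the action ran, False if it was skipped.
    """
    if os.path.exists(output_path):
        return False  # output already there: treated as complete
    action()
    return True


with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "world.txt")

    def write_world():
        with open(target, "w") as f:
            f.write("The world is a fine place!\n")

    first = run_if_incomplete(target, write_world)   # file missing: runs
    second = run_if_incomplete(target, write_world)  # file present: skipped
    os.remove(target)
    third = run_if_incomplete(target, write_world)   # runs again after deletion

print(first, second, third)  # True False True
```

In other words, re-running the upstream task means removing its output first, which gets tedious quickly while experimenting.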
from luigi.mock import MockTarget

# (omitted)

class WorldIsFineTask(luigi.Task):
    task_namespace = 'examples'

    def run(self):
        with self.output().open('w') as output:
            print("{task} says: The world is a fine place!".format(task=self.__class__.__name__))

    def output(self):
        return MockTarget("output")
I added the import statement on line 1 and changed the output on line 13 to a MockTarget.
C:\Users\mikami.yuki\luigi\luigi\examples>py
DEBUG: Checking if examples.HelloWorldTask() is complete
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\ UserWarning: Task examples.HelloWorldTask() without outputs has no custom complete() method
  is_complete = task.complete()
DEBUG: Checking if examples.WorldIsFineTask() is complete
INFO: Informed scheduler that task examples.HelloWorldTask__99914b932b has status PENDING
INFO: Informed scheduler that task examples.WorldIsFineTask__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 10204] Worker Worker(salt=524562969, workers=1, host=HL00198, username=mikami.yuki, pid=10204) running examples.WorldIsFineTask()
WorldIsFineTask says: The world is a fine place!
INFO: [pid 10204] Worker Worker(salt=524562969, workers=1, host=HL00198, username=mikami.yuki, pid=10204) done examples.WorldIsFineTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task examples.WorldIsFineTask__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 10204] Worker Worker(salt=524562969, workers=1, host=HL00198, username=mikami.yuki, pid=10204) running examples.HelloWorldTask()
HelloWorldTask says: Hello world!
INFO: [pid 10204] Worker Worker(salt=524562969, workers=1, host=HL00198, username=mikami.yuki, pid=10204) done examples.HelloWorldTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task examples.HelloWorldTask__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=524562969, workers=1, host=HL00198, username=mikami.yuki, pid=10204) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 examples.HelloWorldTask()
    - 1 examples.WorldIsFineTask()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====
The sample code contains

if __name__ == '__main__':
    luigi.run(['examples.HelloWorldTask', '--workers', '1', '--local-scheduler'])

so when you run the file with the python command, luigi.run() is executed.
If I remove this part...
C:\Users\mikami.yuki\luigi>py

C:\Users\mikami.yuki\luigi>
luigi --module [module name] [class name] [options]
C:\Users\mikami.yuki\luigi>luigi --module hello_world examples.HelloWorldTask --local-scheduler
Traceback (most recent call last):
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\Scripts\", line 11, in <module>
    load_entry_point('luigi==2.5.0', 'console_scripts', 'luigi')()
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\", line 11, in luigi_run
    run_with_retcodes(argv)
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\", line 69, in run_with_retcodes
    with luigi.cmdline_parser.CmdlineParser.global_instance(argv):
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\", line 82, in __enter__
    return next(self.gen)
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\", line 52, in global_instance
    new_value = CmdlineParser(cmdline_args)
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\", line 64, in __init__
    self._attempt_load_module(known_args)
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\", line 144, in _attempt_load_module
    __import__(module)
ModuleNotFoundError: No module named 'hello_world'
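The traceback ends in `__import__(module)`, so the luigi command simply could not find `hello_world` on Python's import path. The sketch below reproduces that failure mode with a throwaway module and shows that putting the module's directory on `sys.path` makes the import succeed. That setting PYTHONPATH is the right remedy for the luigi command is my assumption here (it is standard Python import behaviour, not something taken from Luigi's documentation); `demo_hello_module` is a hypothetical module name.

```python
import importlib
import os
import sys
import tempfile

MODULE_NAME = "demo_hello_module"  # hypothetical module name for this demo

with tempfile.TemporaryDirectory() as module_dir:
    with open(os.path.join(module_dir, MODULE_NAME + ".py"), "w") as f:
        f.write("GREETING = 'Hello world!'\n")

    # The directory is not on sys.path yet, so the import fails.
    try:
        importlib.import_module(MODULE_NAME)
        found_before = True
    except ModuleNotFoundError:
        found_before = False

    # Adding the directory to sys.path (PYTHONPATH does the same thing
    # from outside the process) makes the module importable.
    sys.path.insert(0, module_dir)
    importlib.invalidate_caches()
    try:
        module = importlib.import_module(MODULE_NAME)
        found_after = module.GREETING == "Hello world!"
    finally:
        sys.path.remove(module_dir)
        sys.modules.pop(MODULE_NAME, None)

print(found_before, found_after)  # False True
```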
C:\Users\mikami.yuki\luigi>luigi --module hello_world examples.HelloWorldTask --local-scheduler
(omitted)
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 examples.HelloWorldTask()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====
Conclusion: the LUIGI_CONFIG_PATH environment variable has to be set to the full path, including the file name...
The following code prints the country given as a parameter.
import luigi

class HelloWorldTask(luigi.Task):
    country = luigi.Parameter()

    def run(self):
        print("{task} says: Hello world from {ctry}!".format(task=self.__class__.__name__, ctry=self.country))

if __name__ == '__main__':
    luigi.run(['HelloWorldTask', '--workers', '1', '--local-scheduler'])
This parameter can be set in the following config file.
[HelloWorldTask]
country = Japan
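The config file is plain INI: the section is named after the task class and the key after the parameter. As a quick way to check that such a file parses the way you expect, the standard-library `configparser` can read the same text. This only illustrates the file layout, not how Luigi itself resolves parameters; `CONFIG_TEXT` is just an inline copy of the file above.

```python
from configparser import ConfigParser

# Inline copy of the config file shown above
CONFIG_TEXT = """
[HelloWorldTask]
country = Japan
"""

parser = ConfigParser()
parser.read_string(CONFIG_TEXT)

# Section name = task class name, option name = parameter name
country = parser.get("HelloWorldTask", "country")
print(country)  # Japan
```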
C:\Users\mikami.yuki\luigi>py
DEBUG: Checking if HelloWorldTask(country=Japan) is complete
(omitted)
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 HelloWorldTask(country=Japan)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====
C:\Users\mikami.yuki\luigi>py
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\ UserWarning: LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: C:\Users\mikami.yuki\luigi\conf
  warnings.warn("LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: {path}".format(path=config_file))
Traceback (most recent call last):
  File "", line 10, in <module>
    ['HelloWorldTask', '--workers', '1', '--local-scheduler'])
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\", line 209, in run
    return _run(*args, **kwargs)['success']
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\", line 237, in _run
    return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\", line 116, in get_task_obj
    return self._get_task_cls()(**self._get_task_kwargs())
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\", line 88, in __call__
    param_values = cls.get_param_values(params, args, kwargs)
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\", line 360, in get_param_values
    raise parameter.MissingParameterException("%s: requires the '%s' parameter to be set" % (exc_desc, param_name))
luigi.parameter.MissingParameterException: HelloWorldTask[args=(), kwargs={}]: requires the 'country' parameter to be set
(beginning omitted)
class LuigiConfigParser(ConfigParser):
    NO_DEFAULT = object()
    _instance = None
    _config_paths = [
        '/etc/luigi/client.cfg',  # Deprecated old-style global luigi config
        '/etc/luigi/luigi.cfg',
        'client.cfg',  # Deprecated old-style local luigi config
        'luigi.cfg',
    ]
    if 'LUIGI_CONFIG_PATH' in os.environ:
        config_file = os.environ['LUIGI_CONFIG_PATH']
        if not os.path.isfile(config_file):
            warnings.warn("LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: {path}".format(path=config_file))
        else:
            _config_paths.append(config_file)
(rest omitted)
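The key line in that excerpt is the `os.path.isfile()` check: a directory path fails it, so the config is silently left out of the search list and only a warning is printed. The check can be tried in isolation with the sketch below, which mirrors the logic of the excerpt but is not Luigi code; `usable_config_path` is a hypothetical helper and the temporary directory stands in for my conf folder.

```python
import os
import tempfile
import warnings


def usable_config_path(environ):
    """Return the LUIGI_CONFIG_PATH value if it names an existing file, else None."""
    config_file = environ.get("LUIGI_CONFIG_PATH")
    if config_file is None:
        return None
    if not os.path.isfile(config_file):
        warnings.warn("LUIGI_CONFIG_PATH points to a file which does not exist. "
                      "Invalid file: {path}".format(path=config_file))
        return None
    return config_file


with tempfile.TemporaryDirectory() as conf_dir:
    cfg = os.path.join(conf_dir, "luigi.cfg")
    with open(cfg, "w") as f:
        f.write("[HelloWorldTask]\ncountry = Japan\n")

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")  # keep the demo output quiet
        # Directory only: rejected, just like in the failing run above
        from_dir = usable_config_path({"LUIGI_CONFIG_PATH": conf_dir})
    # Full path including the file name: accepted
    from_file = usable_config_path({"LUIGI_CONFIG_PATH": cfg})
    file_accepted = from_file == cfg

print(from_dir, file_accepted)  # None True
```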
C:\Users\mikami.yuki\luigi>py
利用できるインスタンスがありません。
DEBUG: Checking if HelloWorldTask(country=Japan) is complete
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\ UserWarning: Task HelloWorldTask(country=Japan) without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task HelloWorldTask_Japan_7d3920278f has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7300] Worker Worker(salt=138057246, workers=1, host=HL00198, username=mikami.yuki, pid=7300) running HelloWorldTask(country=Japan)
HelloWorldTask says: Hello world from Japan!
INFO: [pid 7300] Worker Worker(salt=138057246, workers=1, host=HL00198, username=mikami.yuki, pid=7300) done HelloWorldTask(country=Japan)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task HelloWorldTask_Japan_7d3920278f has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=138057246, workers=1, host=HL00198, username=mikami.yuki, pid=7300) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 HelloWorldTask(country=Japan)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====
Conclusion: you have to run with the central scheduler (start luigid, and run the task without the --local-scheduler option)...
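Apparently the execution status of a workflow can be visualized in the browser, like this.
So I fired up Chrome and typed http://localhost:8082/ into the address bar (excited!).
→ Run the task without --local-scheduler.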
C:\Users\mikami.yuki\luigi>luigi --module hello_world_kai HelloWorldTask
DEBUG: Checking if HelloWorldTask() is complete
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\ UserWarning: Task HelloWorldTask() without outputs has no custom complete() method
  is_complete = task.complete()
DEBUG: Checking if WorldIsFineTask() is complete
ERROR: Failed connecting to remote scheduler 'http://localhost:8082'
Traceback (most recent call last):
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\", line 138, in _new_conn
    (, self.port), self.timeout, **extra_kw)
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\util\", line 98, in create_connection
    raise err
  File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\util\", line 88, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [WinError 10061] 対象のコンピューターによって拒否されたため、接続できませんでした。

During handling of the above exception, another exception occurred:
(rest omitted)
Start the scheduler with the luigid command.
C:\Users\mikami.yuki\luigi>luigid
Defaulting to basic logging; consider specifying logging_conf_file in luigi.cfg.
2017-02-02 00:55:42,910 luigi.scheduler[6328] INFO: No prior state file exists at /luigid/state/luigi-state.pickle. Starting with empty state
2017-02-02 00:55:42,923 luigi.server[6328] INFO: Scheduler starting up
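The earlier ConnectionRefusedError just meant that nothing was listening on localhost:8082 yet. If you want to check that before submitting a task, a plain TCP connection attempt is enough. This is a small sketch using only the standard library; `scheduler_is_listening` is a hypothetical helper, and the default host and port are the ones that appear in the error message above.

```python
import socket


def scheduler_is_listening(host="localhost", port=8082, timeout=1.0):
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers connection refused and timeouts
        return False


# True once luigid is up, False otherwise
print(scheduler_is_listening())
```

It only tells you that the port is open, not that the process behind it is actually luigid, so treat it as a quick sanity check.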
$ luigi --module hello_world_kai HelloWorldTask
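- With the (full set of) sample code, getting it running was surprisingly easy.
- It seems like a great fit for controlling workflows with dependencies, such as a downstream job that reads the previous job's output file, or a job that has to wait for another job to finish.
- It's all Python code, so it is customizable and very flexible!
- It is a framework for workflow control rather than a job management tool
- It feels close to the job itself? (Maybe it's fine to implement jobs on the assumption that Luigi is there?)
- I'm curious about Airflow, too... (I'd like to compare it with Luigi!)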