LuigiでバッチJobワークフローを実行してみた
こんにちは、三上です。
気付けば2017年も1ヶ月が過ぎ去り。。(日付とか、気を抜いてるとまだ "2016" などと書いてる自分が。。。
はじめに
Python勉強中です。
Pythonのスクリプト言語的な手軽さが楽しいです!v
今回、Pythonで書いたバッチのJobワークフローエンジンを検討するにあたり、Luigiをさわってみました。
Luigiって何?
- PythonでバッチJobのワークフロー管理してくれるヒト(全部 Pythonコード
- スケジューラー機能はない(ので、cronからluigiを実行する
- Jobに依存関係を持たせたい場合、if文ばしばしなコード書かなくて良いのでコードの可読性が上がる
- Luigi Advent Calendar 2016 | シリーズ | Developers.IO
- OSSのワークフローエンジンを使ってみた感想 | Qiita
- データフロー制御フレームワークLuigiを使ってビッグデータ解析をする | Qiita
さわってみた
環境準備
Python for Windows
今回は、Windows10(Mac Sierra VMware Fusion)に Python 3.6.0 をインストールしました。
まずは、WindowsでPythonを使えるようにします。
Pythonの ダウンロードページ からインストーラーをダウンロードして実行します。
インストールが終わったら、Pythonとpip、setuptoolsのパスを環境変数に追加します。
Pythonはデフォルトでは、
C:\Users\[ユーザー名]\AppData\Local\Program
配下にインストールされるので、以下の2つをシステム環境変数のPathに追加しました。
C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36 C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\Scripts
コマンドプロンプトを立ち上げなおして、実行できるか確認してみます。
C:\Users\mikami.yuki>py -V Python 3.6.0 C:\Users\mikami.yuki>pip -V pip 9.0.1 from c:\users\mikami.yuki\appdata\local\programs\python\python36\lib\site-packages (python 3.6) C:\Users\mikami.yuki>easy_install --version setuptools 28.8.0 from c:\users\mikami.yuki\appdata\local\programs\python\python36\lib\site-packages (Python 3.6)
→WindowsでPythonが使えるようになりました。
Luigiサンプルコード
Git からLuigiのサンプルコードを取得して、動かしてみます。
コードを取得します。
C:\Users\mikami.yuki\luigi>git clone https://github.com/spotify/luigi.git Cloning into 'luigi'... remote: Counting objects: 16180, done. remote: Total 16180 (delta 0), reused 0 (delta 0), pack-reused 16180 Receiving objects: 100% (16180/16180), 8.32 MiB | 2.10 MiB/s, done. Resolving deltas: 100% (11208/11208), done. Checking out files: 100% (331/331), done.
セットアップスクリプト(setup.py)が入っているので、実行して必要なモジュールをビルド&インストールします。
C:\Users\mikami.yuki\luigi\luigi>py setup.py install running install running bdist_egg running egg_info creating luigi.egg-info writing luigi.egg-info\PKG-INFO writing dependency_links to luigi.egg-info\dependency_links.txt (中略) Using c:\users\mikami.yuki\appdata\local\programs\python\python36\lib\site-packages Finished processing dependencies for luigi==2.5.0
サンプルコードは \examples 配下にあるので、hello_world.py を実行してみます。
C:\Users\mikami.yuki\luigi\luigi\examples>py hello_world.py C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\configuration.py:54: UserWarning: LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: C:\Users\mikami.yuki\SBJ\src\work\ldb\ldb\workflow\luigi.cfg warnings.warn("LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: {path}".format(path=config_file)) 利用できるインスタンスがありません。 DEBUG: Checking if examples.HelloWorldTask() is complete C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\worker.py:305: UserWarning: Task examples.HelloWorldTask() without outputs has no custom complete() method is_complete = task.complete() INFO: Informed scheduler that task examples.HelloWorldTask__99914b932b has status PENDING INFO: Done scheduling tasks INFO: Running Worker with 1 processes DEBUG: Asking scheduler for work... DEBUG: Pending tasks: 1 INFO: [pid 4372] Worker Worker(salt=556257757, workers=1, host=HL00198, username=mikami.yuki, pid=4372) running examples.HelloWorldTask() HelloWorldTask says: Hello world! INFO: [pid 4372] Worker Worker(salt=556257757, workers=1, host=HL00198, username=mikami.yuki, pid=4372) done examples.HelloWorldTask() DEBUG: 1 running tasks, waiting for next task to finish INFO: Informed scheduler that task examples.HelloWorldTask__99914b932b has status DONE DEBUG: Asking scheduler for work... DEBUG: Done DEBUG: There are no more tasks to run at this time INFO: Worker Worker(salt=556257757, workers=1, host=HL00198, username=mikami.yuki, pid=4372) was stopped. Shutting down Keep-Alive thread INFO: ===== Luigi Execution Summary ===== Scheduled 1 tasks of which: * 1 ran successfully: - 1 examples.HelloWorldTask() This progress looks :) because there were no failed tasks or missing external dependencies ===== Luigi Execution Summary =====
→ successfully!v
Luigiでコードを書いてみる
LuigiのTaskクラスを継承して、自分で作りたいクラスに以下の3つのメソッドを書きます
- requires():依存関係の指定
- run():処理本体
- output():タスク処理のアウトプット
requires()で指定した先行タスクの処理完了後に自タスクが実行されますが、先行タスクの処理が完了しない(失敗した)場合、自タスクも実行されません。
タスク処理の完了(成功)はアウトプットがあるかどうかで判断されるので、アウトプットがなければ後行タスクは実行されず、ワークフローは中断されます。
中断された後にもう一度最初から実行すると、前回の処理が成功した(アウトプットがある)タスクは実行されず、失敗した(アウトプットがない)タスクから先が実行されます。
個人的には、こちら の記事が、クラスのイメージ図もあり、非常にイメージしやすかったですmm
依存関係を指定してみる
先ほど実行したサンプルコードの中身を見てみます。
import luigi class HelloWorldTask(luigi.Task): task_namespace = 'examples' def run(self): print("{task} says: Hello world!".format(task=self.__class__.__name__)) if __name__ == '__main__': luigi.run(['examples.HelloWorldTask', '--workers', '1', '--local-scheduler'])
処理実行部のみが記載されています。
これに、もう一つタスクを追加して、順番に実行するようにしてみます。
import luigi class HelloWorldTask(luigi.Task): task_namespace = 'examples' def requires(self): return WorldIsFineTask() def run(self): print("{task} says: Hello world!".format(task=self.__class__.__name__)) class WorldIsFineTask(luigi.Task): task_namespace = 'examples' def run(self): with self.output().open('w') as output: output.write('{task} says: The world is a fine place!\n'.format(task=self.__class__.__name__)) def output(self): return luigi.LocalTarget('world.txt') if __name__ == '__main__': luigi.run(['examples.HelloWorldTask', '--workers', '1', '--local-scheduler'])
12~20行目に、最初に実行するタスクを追加しました。
先行タスクはないので requires() はありませんが、19~20行目に output() メソッドを追加して、world.txt があれば処理完了(成功)と判断できるようにしています。
15~17行目の run() メソッドで、world.txt を出力しています。
6, 7行目の requires() メソッドで、追加したタスクを先行タスクに指定しています。
実行してみます。
C:\Users\mikami.yuki\luigi\luigi\examples>py hello_world_kai.py 利用できるインスタンスがありません。 DEBUG: Checking if examples.HelloWorldTask() is complete C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\worker.py:305: UserWarning: Task examples.HelloWorldTask() without outputs has no custom complete() method is_complete = task.complete() DEBUG: Checking if examples.WorldIsFineTask() is complete INFO: Informed scheduler that task examples.HelloWorldTask__99914b932b has status PENDING INFO: Informed scheduler that task examples.WorldIsFineTask__99914b932b has status PENDING INFO: Done scheduling tasks INFO: Running Worker with 1 processes DEBUG: Asking scheduler for work... DEBUG: Pending tasks: 2 INFO: [pid 10568] Worker Worker(salt=514878905, workers=1, host=HL00198, username=mikami.yuki, pid=10568) running examples.WorldIsFineTask() INFO: [pid 10568] Worker Worker(salt=514878905, workers=1, host=HL00198, username=mikami.yuki, pid=10568) done examples.WorldIsFineTask() DEBUG: 1 running tasks, waiting for next task to finish INFO: Informed scheduler that task examples.WorldIsFineTask__99914b932b has status DONE DEBUG: Asking scheduler for work... DEBUG: Pending tasks: 1 INFO: [pid 10568] Worker Worker(salt=514878905, workers=1, host=HL00198, username=mikami.yuki, pid=10568) running examples.HelloWorldTask() HelloWorldTask says: Hello world! INFO: [pid 10568] Worker Worker(salt=514878905, workers=1, host=HL00198, username=mikami.yuki, pid=10568) done examples.HelloWorldTask() DEBUG: 1 running tasks, waiting for next task to finish INFO: Informed scheduler that task examples.HelloWorldTask__99914b932b has status DONE DEBUG: Asking scheduler for work... DEBUG: Done DEBUG: There are no more tasks to run at this time INFO: Worker Worker(salt=514878905, workers=1, host=HL00198, username=mikami.yuki, pid=10568) was stopped. Shutting down Keep-Alive thread INFO: ===== Luigi Execution Summary ===== Scheduled 2 tasks of which: * 2 ran successfully: - 1 examples.HelloWorldTask() - 1 examples.WorldIsFineTask() This progress looks :) because there were no failed tasks or missing external dependencies ===== Luigi Execution Summary =====
→ちゃんと順番に実行できました。
オンメモリのアウトプットを指定してみる
先ほど追加した先行タスクのアウトプットにはファイルを指定しているため、繰り返し実行する場合は出力したファイルを消す必要があります。。
ファイルがある状態で実行するとすでに処理が完了しているとみなされ、以下のように先行タスクの run() メソッドが実行されないためです。。。
C:\Users\mikami.yuki\luigi\luigi\examples>py hello_world_kai.py (中略) INFO: ===== Luigi Execution Summary ===== Scheduled 2 tasks of which: * 1 present dependencies were encountered: - 1 examples.WorldIsFineTask() * 1 ran successfully: - 1 examples.HelloWorldTask() This progress looks :) because there were no failed tasks or missing external dependencies ===== Luigi Execution Summary =====
動作確認時など、毎回ファイルを消すのは手間なので、MockTargetを使ってメモリにアウトプットするよう変更してみます。
from luigi.mock import MockTarget (中略) class WorldIsFineTask(luigi.Task): task_namespace = 'examples' def run(self): with self.output().open('w') as output: print("{task} says: The world is a fine place!".format(task=self.__class__.__name__)) def output(self): return MockTarget("output")
1行目のimport文を追加し、13行目のアウトプットを MockTarget に変更しました。
10行目でprint文を出力しています。
実行してみます。
C:\Users\mikami.yuki\luigi\luigi\examples>py hello_world_mock.py DEBUG: Checking if examples.HelloWorldTask() is complete C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\worker.py:305: UserWarning: Task examples.HelloWorldTask() without outputs has no custom complete() method is_complete = task.complete() DEBUG: Checking if examples.WorldIsFineTask() is complete INFO: Informed scheduler that task examples.HelloWorldTask__99914b932b has status PENDING INFO: Informed scheduler that task examples.WorldIsFineTask__99914b932b has status PENDING INFO: Done scheduling tasks INFO: Running Worker with 1 processes DEBUG: Asking scheduler for work... DEBUG: Pending tasks: 2 INFO: [pid 10204] Worker Worker(salt=524562969, workers=1, host=HL00198, username=mikami.yuki, pid=10204) running examples.WorldIsFineTask() WorldIsFineTask says: The world is a fine place! INFO: [pid 10204] Worker Worker(salt=524562969, workers=1, host=HL00198, username=mikami.yuki, pid=10204) done examples.WorldIsFineTask() DEBUG: 1 running tasks, waiting for next task to finish INFO: Informed scheduler that task examples.WorldIsFineTask__99914b932b has status DONE DEBUG: Asking scheduler for work... DEBUG: Pending tasks: 1 INFO: [pid 10204] Worker Worker(salt=524562969, workers=1, host=HL00198, username=mikami.yuki, pid=10204) running examples.HelloWorldTask() HelloWorldTask says: Hello world! INFO: [pid 10204] Worker Worker(salt=524562969, workers=1, host=HL00198, username=mikami.yuki, pid=10204) done examples.HelloWorldTask() DEBUG: 1 running tasks, waiting for next task to finish INFO: Informed scheduler that task examples.HelloWorldTask__99914b932b has status DONE DEBUG: Asking scheduler for work... DEBUG: Done DEBUG: There are no more tasks to run at this time INFO: Worker Worker(salt=524562969, workers=1, host=HL00198, username=mikami.yuki, pid=10204) was stopped. Shutting down Keep-Alive thread INFO: ===== Luigi Execution Summary ===== Scheduled 2 tasks of which: * 2 ran successfully: - 1 examples.HelloWorldTask() - 1 examples.WorldIsFineTask() This progress looks :) because there were no failed tasks or missing external dependencies ===== Luigi Execution Summary =====
→何度でも連続して実行できるようになりました。
※自タスクのアウトプットを必要としない(他タスクの実行管理などの)ダミータスクを作りたい場合、これでOKですね。
Luigiでつまづいたこと
ModuleNotFoundError
結論:luigiコマンドで実行する場合、環境変数:PYTHONPATHの追加が必要。。
サンプルコード hello_world.py には
if __name__ == '__main__': luigi.run(['examples.HelloWorldTask', '--workers', '1', '--local-scheduler'])
が書いてあるるので、pythonコマンドでファイル実行すると、luigi.run() が実行されます。
この、luigi.run() の部分を消してみると・・・
C:\Users\mikami.yuki\luigi>py hello_world.py C:\Users\mikami.yuki\luigi>
うんともすんとも。。。
実行する関数ない、ですものね。。(「おまじない」くらいにしか思ってなかった。。(あせ
コマンドラインからLuigi実行する場合、
luigi --module [パッケージ名] [クラス名] [オプション]
で実行するとのことで。。
ではコマンドラインから、luigiコマンドで実行します。
C:\Users\mikami.yuki\luigi>luigi --module hello_world examples.HelloWorldTask --local-scheduler Traceback (most recent call last): File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\Scripts\luigi-script.py", line 11, in <module> load_entry_point('luigi==2.5.0', 'console_scripts', 'luigi')() File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\cmdline.py", line 11, in luigi_run run_with_retcodes(argv) File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\retcodes.py", line 69, in run_with_retcodes with luigi.cmdline_parser.CmdlineParser.global_instance(argv): File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\contextlib.py", line 82, in __enter__ return next(self.gen) File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\cmdline_parser.py", line 52, in global_instance new_value = CmdlineParser(cmdline_args) File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\cmdline_parser.py", line 64, in __init__ self._attempt_load_module(known_args) File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\cmdline_parser.py", line 144, in _attempt_load_module __import__(module) ModuleNotFoundError: No module named 'hello_world'
あれ?エラーとな?
→すみません、ちゃんと読んでませんでした。。
システム環境変数にPYTHONPATHを追加し、コマンドプロンプト立ち上げなおして再実行。
C:\Users\mikami.yuki\luigi>luigi --module hello_world examples.HelloWorldTask --local-scheduler (中略) INFO: ===== Luigi Execution Summary ===== Scheduled 1 tasks of which: * 1 ran successfully: - 1 examples.HelloWorldTask() This progress looks :) because there were no failed tasks or missing external dependencies ===== Luigi Execution Summary =====
→ちゃんと実行できました!v
環境変数で指定したconfigファイルが読み込まれない
結論:環境変数:LUIGI_CONFIG_PATH には、ファイル名まで指定する必要あり。。
Luigiでは、タスク実行時のパラメータをconfigファイルで指定できます。
以下のコードは、パラメータで指定された country をprintする処理です。
import luigi class HelloWorldTask(luigi.Task): country = luigi.Parameter() def run(self): print("{task} says: Hello world from {ctry}!".format(task=self.__class__.__name__, ctry=self.country)) if __name__ == '__main__': luigi.run(['HelloWorldTask', '--workers', '1', '--local-scheduler'])
このパラメータは下記 config ファイルで指定できます。
[HelloWorldTask] country = Japan
configファイル(luigi.cfg)を同じディレクトリにおいて実行すると、ちゃんとパラメータを読み込んでくれます。
C:\Users\mikami.yuki\luigi>py hello_world_jp.py DEBUG: Checking if HelloWorldTask(country=Japan) is complete (中略) INFO: ===== Luigi Execution Summary ===== Scheduled 1 tasks of which: * 1 ran successfully: - 1 HelloWorldTask(country=Japan) This progress looks :) because there were no failed tasks or missing external dependencies ===== Luigi Execution Summary =====
Luigiのconfigファイルですが、置き場所は環境変数で指定可能とのことなので、システム環境変数に指定してみました。
実行してみます。
C:\Users\mikami.yuki\luigi>py hello_world_jp.py C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\configuration.py:54: UserWarning: LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: C:\Users\mikami.yuki\luigi\conf warnings.warn("LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: {path}".format(path=config_file)) Traceback (most recent call last): File "hello_world_jp.py", line 10, in <module> luigi.run(['HelloWorldTask', '--workers', '1', '--local-scheduler']) File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\interface.py", line 209, in run return _run(*args, **kwargs)['success'] File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\interface.py", line 237, in _run return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory) File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\cmdline_parser.py", line 116, in get_task_obj return self._get_task_cls()(**self._get_task_kwargs()) File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\task_register.py", line 88, in __call__ param_values = cls.get_param_values(params, args, kwargs) File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\task.py", line 360, in get_param_values raise parameter.MissingParameterException("%s: requires the '%s' parameter to be set" % (exc_desc, param_name)) luigi.parameter.MissingParameterException: HelloWorldTask[args=(), kwargs={}]: requires the 'country' parameter to be set
configファイルがないとか、パラメータがないとか、怒られとる。。
Luigi本体のコード確認してみたところ、どうやら、環境変数にファイル名まで指定する必要があるようで。。
(前略) class LuigiConfigParser(ConfigParser): NO_DEFAULT = object() _instance = None _config_paths = [ '/etc/luigi/client.cfg', # Deprecated old-style global luigi config '/etc/luigi/luigi.cfg', 'client.cfg', # Deprecated old-style local luigi config 'luigi.cfg', ] if 'LUIGI_CONFIG_PATH' in os.environ: config_file = os.environ['LUIGI_CONFIG_PATH'] if not os.path.isfile(config_file): warnings.warn("LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: {path}".format(path=config_file)) else: _config_paths.append(config_file) (後略)
環境変数のパスにファイル名を追加して再実行してみます。
C:\Users\mikami.yuki\luigi>py hello_world_jp.py 利用できるインスタンスがありません。 DEBUG: Checking if HelloWorldTask(country=Japan) is complete C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\worker.py:305: UserWarning: Task HelloWorldTask(country=Japan) without outputs has no custom complete() method is_complete = task.complete() INFO: Informed scheduler that task HelloWorldTask_Japan_7d3920278f has status PENDING INFO: Done scheduling tasks INFO: Running Worker with 1 processes DEBUG: Asking scheduler for work... DEBUG: Pending tasks: 1 INFO: [pid 7300] Worker Worker(salt=138057246, workers=1, host=HL00198, username=mikami.yuki, pid=7300) running HelloWorldTask(country=Japan) HelloWorldTask says: Hello world from Japan! INFO: [pid 7300] Worker Worker(salt=138057246, workers=1, host=HL00198, username=mikami.yuki, pid=7300) done HelloWorldTask(country=Japan) DEBUG: 1 running tasks, waiting for next task to finish INFO: Informed scheduler that task HelloWorldTask_Japan_7d3920278f has status DONE DEBUG: Asking scheduler for work... DEBUG: Done DEBUG: There are no more tasks to run at this time INFO: Worker Worker(salt=138057246, workers=1, host=HL00198, username=mikami.yuki, pid=7300) was stopped. Shutting down Keep-Alive thread INFO: ===== Luigi Execution Summary ===== Scheduled 1 tasks of which: * 1 ran successfully: - 1 HelloWorldTask(country=Japan) This progress looks :) because there were no failed tasks or missing external dependencies ===== Luigi Execution Summary =====
→ちゃんと読み込んでくれました!
ブラウザで実行状況が表示されない
結論:Central Scheduler(luigid &オプション:--local-scheduler なし)で実行する必要があります。。
ブラウザで、こんな 感じに、ワークフローの実行状況が可視化できるらしい。
で、Chrome立ち上げて、URLに http://localhost:8082/ を入力(わくわく
あれ?何も表示されない。。
→ --local-schedulerなしで実行します。
C:\Users\mikami.yuki\luigi>luigi --module hello_world_kai HelloWorldTask DEBUG: Checking if HelloWorldTask() is complete C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\luigi\worker.py:305: UserWarning: Task HelloWorldTask() without outputs has no custom complete() method is_complete = task.complete() DEBUG: Checking if WorldIsFineTask() is complete ERROR: Failed connecting to remote scheduler 'http://localhost:8082' Traceback (most recent call last): File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\connection.py", line 138, in _new_conn (self.host, self.port), self.timeout, **extra_kw) File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\util\connection.py", line 98, in create_connection raise err File "C:\Users\mikami.yuki\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\util\connection.py", line 88, in create_connection sock.connect(sa) ConnectionRefusedError: [WinError 10061] 対象のコンピューターによって拒否されたため、接続できませんでした。 During handling of the above exception, another exception occurred: (後略)
なんか、リトライしはじめた・・・?
→すみません、ちゃんと読んで(ry
luigid コマンドでスケジューラーを起動。
C:\Users\mikami.yuki\luigi>luigid Defaulting to basic logging; consider specifying logging_conf_file in luigi.cfg. 2017-02-02 00:55:42,910 luigi.scheduler[6328] INFO: No prior state file exists at /luigid/state/luigi-state.pickle. Starting with empty state 2017-02-02 00:55:42,923 luigi.server[6328] INFO: Scheduler starting up
再度Luigiタスクを実行して、ブラウザ表示。
$ luigi --module hello_world_kai HelloWorldTask
→実行状態が表示できました!v
おわりに(所感)
- サンプルコード(一式)を使えば、意外とお手軽に動かすことができました。
- 前Jobの出力ファイルを後Jobが取り込む、別Job完了の待ち合わせが必要、など、依存関係のあるワークフローの制御には最適かと思いました。
- Pythonコードなので、カスタマイズ可能で自由度が高い!
- Job管理ツールではなく、ワークフロー制御のフレームワーク
- Jobそのものに近い?(LuigiありきでJobを実装してもいいかも?
- Airflowも気になる。。(Luigiと比較してみたい!