この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、みかみです。
Python でバッチ作ってます。Python で Workflow 管理といえば、Airflow とか、Luigi とか?
だけどフレームワークって、便利な反面カスタマイズしにくかったり回りくどくなったりで、バッチ処理くらいだったら全部自分でコーディングできちゃうし。
でも自分で書いてると、だんだん見返すのが億劫なコードになってきて。。。
はじめに
やりたいこと
- バッチ Job の Workflow 管理部を簡単に実装したい
- 普通にコード書いて実装してたら if 文とかですごく見にくくなってきた warkflow 制御部を、誰でもメンテできる可読性の高いコードにしたい
- Python と親和性の高い Job 管理部にしたい(Job を Python で書いてるので
- お手軽に実装したい(GUI やスケジューラー、監視(通知)機能はなくてもいい
動作環境
- OS:Windows10(Mac VMware Fusion)
- Python 3.6.0
やってみた
workflow(2.1.2) を インストール
pip で workflow をインストールしました。
インストール時、2件ほどエラー出ましたが。。
AttributeError: module 'enum' has no attribute 'IntFlag'
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x83 in position 32: invalid start byte
- pipでUnicodeDecodeErrorが発生(Windows環境) | Qiita
- Add one more fail-safe when decoding console to str in Py3 #4310 | GitHub
\Python36\Lib\site-packages\enum
のフォルダ名変更して、pull request の変更反映して、install できました。。
C:\Users\mikami.yuki>pip install workflow
Collecting workflow
Using cached workflow-2.1.2-py2.py3-none-any.whl
(省略)
Installing collected packages: dulwich, autosemver, workflow
Successfully installed autosemver-0.5.2 dulwich-0.18.2 workflow-2.1.2
先行 Job の実行結果によって次に実行する Job を変える
実行する Job 本体を4つ準備しました。
- Job check:とりあえず、チェック結果(True or False)はコマンドラインのパラメータでもらい、そのまま返します。
- Job A:もらったパラメータを出力して、Job A 実行状態を更新します。
- Job B:もらったパラメータを出力して、Job B 実行状態を更新します。
- Job comp:もらったパラメータを出力します。
job.py
def check(obj, eng):
print('execute check')
return obj.data
def job_a(obj, eng):
print('execute Job A')
print(' Job A {}'.format(obj.job_a))
print(' Job B {}'.format(obj.job_b))
obj.job_a = 'done'
return obj
def job_b(obj, eng):
print('execute Job B')
print(' Job A : {}'.format(obj.job_a))
print(' Job B : {}'.format(obj.job_b))
obj.job_b = 'done'
return obj
def comp(obj, eng):
print('execute comp')
print(' input : {}'.format(obj.data))
print(' Job A : {}'.format(obj.job_a))
print(' Job B : {}'.format(obj.job_b))
実行するJobは、配列で定義して workflow パッケージに渡します。
制御条件も指定できます。
Job check の結果が True ならば Job Aと B を、False ならば Job A だけを実行するように定義しました。
my_flow.py
from workflow.patterns.controlflow import IF_ELSE
from job import check, comp, job_a, job_b
my_workflow = [
IF_ELSE(
check,
[ job_a, job_b ],
[ job_a ]
),
comp
]
次は workflow のインスタンスを作成して実行するメインタスクです。
Job の実行パラメータを Wrapper クラスを介して Job に渡します。
パラメータには Job A と Job B の実行状態も持たせました。
task_main.py
from workflow.engine import GenericWorkflowEngine
from my_flow import my_workflow
import sys
from distutils.util import strtobool
class ParamWrapper(object):
def __init__(self, data):
self.data = data
self.job_a = 'yet'
self.job_b = 'yet'
my_engine = GenericWorkflowEngine()
my_engine.callbacks.replace(my_workflow)
my_object = ParamWrapper(strtobool(sys.argv[1]))
my_engine.process([my_object])
コードの準備はできました。
まずはコマンドラインから True を指定して実行してみます。
Job check の実行結果が True になるはずなので、Job A と Job B が実行されるはずです。
C:\Users\mikami.yuki\work\workflow\src>py task_main.py True
execute check
execute Job A
Job A yet
Job B yet
execute Job B
Job A : done
Job B : yet
execute comp
input : 1
Job A : done
Job B : done
Job A、 Job B ともに実行されました。
次はパラメータで False を指定してみます。
Job check の結果が False になるので、Job A しか実行されないはずです。
C:\Users\mikami.yuki\work\workflow\src>py task_main.py False
execute check
execute Job A
Job A yet
Job B yet
execute comp
input : 0
Job A : done
Job B : yet
期待通り、Job A だけが実行されました。
Job が失敗したらリトライする
例えばDBコネクトエラーなどでリトライしたいケースを想定。
準備した Job は A, B, C, D の4つ。Job B に処理中断する仕込みを入れました。
コマンドラインでパラメータに True を指定した場合は中断します。
job.py
def job_a(obj, eng):
print('execute Job A')
def job_b(obj, eng):
print('execute Job B')
if obj.data:
print('Raising HaltProcessing')
eng.halt('interrupting this workflow.')
def job_c(obj, eng):
print('execute Job C')
def job_d(obj, eng):
print('execute Job D')
ワークフロー定義では特に制御条件を入れていません。
my_flow.py
from job import job_a, job_b, job_c, job_d
my_workflow = [
job_a,
job_b,
job_c,
job_d
]
メインタスクで処理中断を catch して、リトライします。
task_main.py
from workflow.errors import HaltProcessing
from workflow.engine import GenericWorkflowEngine
from my_flow import my_workflow
import sys
from distutils.util import strtobool
class ParamWrapper(object):
def __init__(self, data):
self.data = data
my_engine = GenericWorkflowEngine()
my_engine.callbacks.replace(my_workflow)
my_object = ParamWrapper(strtobool(sys.argv[1]))
try:
my_engine.process([my_object])
except HaltProcessing:
for i in range(3):
try:
my_engine.restart('current', 'current')
except HaltProcessing:
continue
else:
break
処理中断の仕込みに引っかからないパラメータ、False を指定した場合は
C:\Users\mikami.yuki\work\workflow\src\halt>py task_main.py False
execute Job A
execute Job B
execute Job C
execute Job D
Job はワークフロー定義で指定した通り順番に実行されます。
が、コマンドラインからパラメータに True を指定して、処理を中断させてみると、
C:\Users\mikami.yuki\work\workflow\src\halt>py task_main.py True
execute Job A
execute Job B
Raising HaltProcessing
execute Job B
Raising HaltProcessing
execute Job B
Raising HaltProcessing
execute Job B
Raising HaltProcessing
仕込みを入れた Job B で処理中断した後、メインタスクで指定した3回リトライしてワークフロー終了しました。
リトライで処理が正常復帰したケースを想定して、メインタスクにも仕込みを入れてみます。
task_main.py
(省略)
try:
my_engine.process([my_object])
except HaltProcessing:
for i in range(3):
try:
my_object.data = False
my_engine.restart('current', 'current')
except HaltProcessing:
continue
else:
break
実行すると
C:\Users\mikami.yuki\work\workflow\src\halt>py task_main.py True
execute Job A
execute Job B
Raising HaltProcessing
execute Job B
execute Job C
execute Job D
ちゃんと、中断した Job から先のワークフローを実行してくれました。
他にも、正常復帰したら最初の Job からやり直すことや
task_main.py
(省略)
try:
my_engine.process([my_object])
except HaltProcessing:
for i in range(3):
try:
my_object.data = False
my_engine.restart('current', 'first')
except HaltProcessing:
continue
else:
break
C:\Users\mikami.yuki\work\workflow\src\halt>py task_main.py True
execute Job A
execute Job B
Raising HaltProcessing
execute Job A
execute Job B
execute Job C
execute Job D
正常復帰しない場合に次の Job から継続することも、簡単に実装できました。
task_main.py
(省略)
try:
my_engine.process([my_object])
except HaltProcessing:
for i in range(3):
try:
my_engine.restart('current', 'current')
except HaltProcessing:
continue
else:
break
my_engine.restart('current', 'next')
C:\Users\mikami.yuki\work\workflow\src\halt>py task_main.py True
execute Job A
execute Job B
Raising HaltProcessing
execute Job B
Raising HaltProcessing
execute Job B
Raising HaltProcessing
execute Job B
Raising HaltProcessing
execute Job C
execute Job D
おわりに(まとめ・所感)
- お手軽に実装できました。
- Wrapper 作ったりすれば好きにカスタマイズできそう。
- ドキュメントは少ない?(google 先生に聞いてもあんまりいい答え返ってこない。。
- あるものは使うに限る!(自分でちまちまコーディングするより、やっぱりパッケージ使った方がらくちん