Python の workflow Package でバッチ Job を管理してみた

こんにちは、みかみです。

Python でバッチ作ってます。Python で Workflow 管理といえば、Airflow とか、Luigi とか?

だけどフレームワークって、便利な反面カスタマイズしにくかったり回りくどくなったりで、バッチ処理くらいだったら全部自分でコーディングできちゃうし。

でも自分で書いてると、だんだん見返すのが億劫なコードになってきて。。。

はじめに

やりたいこと

  • バッチ Job の Workflow 管理部を簡単に実装したい
  • 普通にコード書いて実装してたら if 文とかですごく見にくくなってきた warkflow 制御部を、誰でもメンテできる可読性の高いコードにしたい
  • Python と親和性の高い Job 管理部にしたい(Job を Python で書いてるので
  • お手軽に実装したい(GUI やスケジューラー、監視(通知)機能はなくてもいい

動作環境

  • OS:Windows10(Mac VMware Fusion)
  • Python 3.6.0

やってみた

workflow(2.1.2) を インストール

pip で workflow をインストールしました。

インストール時、2件ほどエラー出ましたが。。

AttributeError: module 'enum' has no attribute 'IntFlag'
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x83 in position 32: invalid start byte

\Python36\Lib\site-packages\enum のフォルダ名変更して、pull request の変更反映して、install できました。。

C:\Users\mikami.yuki>pip install workflow
Collecting workflow
  Using cached workflow-2.1.2-py2.py3-none-any.whl
(省略)
Installing collected packages: dulwich, autosemver, workflow
Successfully installed autosemver-0.5.2 dulwich-0.18.2 workflow-2.1.2

先行 Job の実行結果によって次に実行する Job を変える

実行する Job 本体を4つ準備しました。

  • Job check:とりあえず、チェック結果(True or False)はコマンドラインのパラメータでもらい、そのまま返します。
  • Job A:もらったパラメータを出力して、Job A 実行状態を更新します。
  • Job B:もらったパラメータを出力して、Job B 実行状態を更新します。
  • Job comp:もらったパラメータを出力します。
def check(obj, eng):
    print('execute check')
    return obj.data

def job_a(obj, eng):
    print('execute Job A')
    print('  Job A  {}'.format(obj.job_a))
    print('  Job B  {}'.format(obj.job_b))
    obj.job_a = 'done'
    return obj

def job_b(obj, eng):
    print('execute Job B')
    print('  Job A : {}'.format(obj.job_a))
    print('  Job B : {}'.format(obj.job_b))
    obj.job_b = 'done'
    return obj

def comp(obj, eng):
    print('execute comp')
    print('  input : {}'.format(obj.data))
    print('  Job A : {}'.format(obj.job_a))
    print('  Job B : {}'.format(obj.job_b))

実行するJobは、配列で定義して workflow パッケージに渡します。

制御条件も指定できます。

Job check の結果が True ならば Job Aと B を、False ならば Job A だけを実行するように定義しました。

from workflow.patterns.controlflow import IF_ELSE
from job import check, comp, job_a, job_b

my_workflow = [
    IF_ELSE(
        check,
        [ job_a, job_b ],
        [ job_a ]
    ),
    comp
]

次は workflow のインスタンスを作成して実行するメインタスクです。

Job の実行パラメータを Wrapper クラスを介して Job に渡します。

パラメータには Job A と Job B の実行状態も持たせました。

from workflow.engine import GenericWorkflowEngine
from my_flow import my_workflow

import sys
from distutils.util import strtobool

class ParamWrapper(object):
    def __init__(self, data):
        self.data = data
        self.job_a = 'yet'
        self.job_b = 'yet'

my_engine = GenericWorkflowEngine()
my_engine.callbacks.replace(my_workflow)

my_object = ParamWrapper(strtobool(sys.argv[1]))
my_engine.process([my_object])

コードの準備はできました。

まずはコマンドラインから True を指定して実行してみます。

Job check の実行結果が True になるはずなので、Job A と Job B が実行されるはずです。

C:\Users\mikami.yuki\work\workflow\src>py task_main.py True
execute check
execute Job A
  Job A  yet
  Job B  yet
execute Job B
  Job A : done
  Job B : yet
execute comp
  input : 1
  Job A : done
  Job B : done

Job A、 Job B ともに実行されました。

次はパラメータで False を指定してみます。

Job check の結果が False になるので、Job A しか実行されないはずです。

C:\Users\mikami.yuki\work\workflow\src>py task_main.py False
execute check
execute Job A
  Job A  yet
  Job B  yet
execute comp
  input : 0
  Job A : done
  Job B : yet

期待通り、Job A だけが実行されました。

Job が失敗したらリトライする

例えばDBコネクトエラーなどでリトライしたいケースを想定。

準備した Job は A, B, C, D の4つ。Job B に処理中断する仕込みを入れました。

コマンドラインでパラメータに True を指定した場合は中断します。

def job_a(obj, eng):
    print('execute Job A')

def job_b(obj, eng):
    print('execute Job B')
    if obj.data:
        print('Raising HaltProcessing')
        eng.halt('interrupting this workflow.')

def job_c(obj, eng):
    print('execute Job C')

def job_d(obj, eng):
    print('execute Job D')

ワークフロー定義では特に制御条件を入れていません。

from job import job_a, job_b, job_c, job_d

my_workflow = [
    job_a,
    job_b,
    job_c,
    job_d
]

メインタスクで処理中断を catch して、リトライします。

from workflow.errors import HaltProcessing
from workflow.engine import GenericWorkflowEngine
from my_flow import my_workflow

import sys
from distutils.util import strtobool

class ParamWrapper(object):
    def __init__(self, data):
        self.data = data

my_engine = GenericWorkflowEngine()
my_engine.callbacks.replace(my_workflow)

my_object = ParamWrapper(strtobool(sys.argv[1]))
try:
    my_engine.process([my_object])
except HaltProcessing:
    for i in range(3):
        try:
            my_engine.restart('current', 'current')
        except HaltProcessing:
            continue
        else:
            break

処理中断の仕込みに引っかからないパラメータ、False を指定した場合は

C:\Users\mikami.yuki\work\workflow\src\halt>py task_main.py False
execute Job A
execute Job B
execute Job C
execute Job D

Job はワークフロー定義で指定した通り順番に実行されます。

が、コマンドラインからパラメータに True を指定して、処理を中断させてみると、

C:\Users\mikami.yuki\work\workflow\src\halt>py task_main.py True
execute Job A
execute Job B
Raising HaltProcessing
execute Job B
Raising HaltProcessing
execute Job B
Raising HaltProcessing
execute Job B
Raising HaltProcessing

仕込みを入れた Job B で処理中断した後、メインタスクで指定した3回リトライしてワークフロー終了しました。

リトライで処理が正常復帰したケースを想定して、メインタスクにも仕込みを入れてみます。

(省略)
try:
    my_engine.process([my_object])
except HaltProcessing:
    for i in range(3):
        try:
            my_object.data = False
            my_engine.restart('current', 'current')
        except HaltProcessing:
            continue
        else:
            break

実行すると

C:\Users\mikami.yuki\work\workflow\src\halt>py task_main.py True
execute Job A
execute Job B
Raising HaltProcessing
execute Job B
execute Job C
execute Job D

ちゃんと、中断した Job から先のワークフローを実行してくれました。

他にも、正常復帰したら最初の Job からやり直すことや

(省略)
try:
    my_engine.process([my_object])
except HaltProcessing:
    for i in range(3):
        try:
            my_object.data = False
            my_engine.restart('current', 'first')
        except HaltProcessing:
            continue
        else:
            break
C:\Users\mikami.yuki\work\workflow\src\halt>py task_main.py True
execute Job A
execute Job B
Raising HaltProcessing
execute Job A
execute Job B
execute Job C
execute Job D

正常復帰しない場合に次の Job から継続することも、簡単に実装できました。

(省略)
try:
    my_engine.process([my_object])
except HaltProcessing:
    for i in range(3):
        try:
            my_engine.restart('current', 'current')
        except HaltProcessing:
            continue
        else:
            break
    my_engine.restart('current', 'next')
C:\Users\mikami.yuki\work\workflow\src\halt>py task_main.py True
execute Job A
execute Job B
Raising HaltProcessing
execute Job B
Raising HaltProcessing
execute Job B
Raising HaltProcessing
execute Job B
Raising HaltProcessing
execute Job C
execute Job D

おわりに(まとめ・所感)

  • お手軽に実装できました。
  • Wrapper 作ったりすれば好きにカスタマイズできそう。
  • ドキュメントは少ない?(google 先生に聞いてもあんまりいい答え返ってこない。。
  • あるものは使うに限る!(自分でちまちまコーディングするより、やっぱりパッケージ使った方がらくちん

参考