Python製のワークフロー管理ツール「Prefect」を試してみた
「Prefect」というPython製のワークフロー管理ツールを試したみたので、簡単に紹介いたします。
Prefectについて
概要
Prefect is a new workflow management system, designed for modern infrastructure and powered by the open-source Prefect Core workflow engine. Users organize Tasks into Flows, and Prefect takes care of the rest.
上記の公式ドキュメントの説明を機械翻訳すると下記となります。
Prefectは、最新のインフラストラクチャ用に設計され、オープンソースのPrefectCoreワークフローエンジンを搭載した新しいワークフロー管理システムです。ユーザーは
Task
をFlow
に編成し、残りはPrefectが処理します。
3種類のプロダクトとして提供されているようです。
- Prefect Core
- OSS(Open Source Software)であるため無償で利用可能
- Python製のワークフローエンジン
- 自前でホスティングして利用することも可能
- Prefect Cloud
- マネージドなクラウドサービスで基本的に有償で利用可能
- 主な用途としては本番運用
- Prefect Orion
- Prefectの第2世代のオーケストレーションエンジン
※本記事ではPrefect Core(OSSワークフローエンジン)について試してみた内容をまとめています。
Apache Airflow との比較
Prefectの開発者がApache Airflowのコントリビューターということもあり、PrefectはApache Airflowを意識して作られたワークフロー管理ツールのようです。
公式ドキュメントにも、Why Not Airflow? というタイトルでApache Airflowとの比較について言及されたページがあります。
下記などが主な違いとして挙げられています。
- 動的なDAG
- Apache Airflowで動的なDAGを対応するには、タスク内でのループ処理を実装するなどの必要がある (例: Airflowで動的なワークフローを構成する )
- Prefect ではmapで動的なDAGを対応可能
- スケジュール
- Apache Airflowではスケジュール実行前提での実装になるが、Prefectでは単発手動実行のワークフローなどが実装しやすい(スケジュール実行も可能)
またApache Airflowに比べて、下記の機能が容易に実装できるようです。
- ローカルデバッグ
- Slack通知
試してみた
実行環境
※Prefectを実行するには、Python 3.6以上が必須です。
- macOS Catalina 10.15.7
- Python 3.7.2
- prefect 0.15.6
- pip 21.1.3
インストール
pip にて prefect をインストールします。
$ pip install prefect
正しくインストールできると、prefect コマンドが使えるようになります。(バージョンを確認)
$ prefect version 0.15.6
Flow実行
定番の「Hello world!」を出力するだけの下記スクリプトを実行してみます。
@task
デコレータを関数に指定し、Taskとして実際の処理を実装get_param
とhello
という2つのTaskを作って順番に実行するだけのワークフロー
Flow
の部分で、必要なTask(関数)を指定- Flowの
run()
メソッドを呼ぶことで実際にワークフローが開始される
- Flowの
from prefect import Flow, task @task def get_param(): return "world" @task def hello(param): print(f"Hello, {param}!") with Flow("HelloWorld") as flow: param = get_param() hello_world = hello(param) flow.run()
Taskが順番通りに実行されていることがわかります。
$ python hello_world.py [2021-11-28 04:26:26+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'HelloWorld' [2021-11-28 04:26:26+0900] INFO - prefect.TaskRunner | Task 'get_param': Starting task run... [2021-11-28 04:26:26+0900] INFO - prefect.TaskRunner | Task 'get_param': Finished task run for task with final state: 'Success' [2021-11-28 04:26:26+0900] INFO - prefect.TaskRunner | Task 'hello': Starting task run... Hello, world! [2021-11-28 04:26:26+0900] INFO - prefect.TaskRunner | Task 'hello': Finished task run for task with final state: 'Success' [2021-11-28 04:26:26+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
パラメータ指定での実行
Flow実行時にパラメータとして、タスクで使用する値を指定することができます。(Parameter
クラス)
say_hello
の引数として、Parameterクラスから取得したparam
を指定- Flow実行時に、
param
を指定して実行
from prefect import task, Flow, Parameter @task(log_stdout=True) def say_hello(param): print(f"Hello, {param}!") with Flow("My First Flow") as flow: param = Parameter('param') say_hello(param) flow.run(param='world') # "Hello, world!" flow.run(param='cloud') # "Hello, cloud!"
指定した順番でパラメータ指定されたFlowが実行されていることがわかります。
$ python prefect_hello_param.py [2021-11-28 03:48:12+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'My First Flow' [2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Task 'param': Starting task run... [2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Task 'param': Finished task run for task with final state: 'Success' [2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run... [2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Hello, world! [2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success' [2021-11-28 03:48:12+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded [2021-11-28 03:48:12+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'My First Flow' [2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Task 'param': Starting task run... [2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Task 'param': Finished task run for task with final state: 'Success' [2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run... [2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Hello, cloud! [2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success' [2021-11-28 03:48:12+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
動的なFlow
動的なFlowを構成するには、タスクのmap
メソッドを使用することができます。
- 実行時に上流タスクから渡された値に応じて動的にタスクを生成しFlowを構成
map
メソッドの引数には、上流タスクの結果(リスト)を指定
こちらのコード(dynamic_flow.py)を拝借して、実際の動きを確認してみます。
- 実行ごとにランダムな長さのリストを生成
- リストの値をそれぞれ2乗した値の合計を出力
リストの分だけ2乗処理が実行されていることが確認できます。
$ python dynamic_flow.py // 実行1回目 [2021-11-28 05:36:25+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'DynamicFlow' [2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'random_list': Starting task run... [2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'random_list': Finished task run for task with final state: 'Success' [2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared': Starting task run... [2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared': Finished task run for task with final state: 'Mapped' [2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[0]': Starting task run... [2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[0]': Finished task run for task with final state: 'Success' [2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[1]': Starting task run... [2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[1]': Finished task run for task with final state: 'Success' [2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[2]': Starting task run... [2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[2]': Finished task run for task with final state: 'Success' [2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[3]': Starting task run... [2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[3]': Finished task run for task with final state: 'Success' [2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[4]': Starting task run... [2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[4]': Finished task run for task with final state: 'Success' [2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'sum_up': Starting task run... Squared results: [0, 1, 4, 9, 16] Sum: 30 [2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'sum_up': Finished task run for task with final state: 'Success' [2021-11-28 05:36:25+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
実行ごとにリストの長さが動的であることが確認できます。
$ python dynamic_flow.py // 実行2回目 [2021-11-28 05:38:13+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'DynamicFlow' [2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'random_list': Starting task run... [2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'random_list': Finished task run for task with final state: 'Success' [2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'squared': Starting task run... [2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'squared': Finished task run for task with final state: 'Mapped' [2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'squared[0]': Starting task run... [2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'squared[0]': Finished task run for task with final state: 'Success' [2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'squared[1]': Starting task run... [2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'squared[1]': Finished task run for task with final state: 'Success' [2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'sum_up': Starting task run... Squared results: [0, 1] Sum: 1 [2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'sum_up': Finished task run for task with final state: 'Success' [2021-11-28 05:38:13+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Prefect Server
ローカル上でPrefect Serverを起動して確認しました。 Docker, Docker Compose が起動していれば、下記のコマンドで簡単に立ち上げることが可能です。
$ prefect backend server Backend switched to server
$ prefect server start Pulling postgres ... done Pulling hasura ... done Pulling graphql ... done Pulling apollo ... done Pulling towel ... done Pulling ui ... done Creating network "prefect-server" with the default driver Creating t_postgres_1 ... done Creating t_hasura_1 ... done Creating t_graphql_1 ... done Creating t_towel_1 ... done Creating t_apollo_1 ... done Creating t_ui_1 ... done Attaching to t_postgres_1, t_hasura_1, t_graphql_1, t_towel_1, t_apollo_1, t_ui_1 --- (中略) --- apollo_1 | Building schema complete! apollo_1 | Server ready at http://0.0.0.0:4200 ? (version: 2021.09.02) apollo_1 | Sending telemetry to Prefect Technologies, Inc.: {"source":"prefect_server","type":"startup","payload":{"id":"e277c28a-a669-4b65-aec3-e911821bcf8b","prefect_server_version":"2021.09.02","api_version":"0.2.0"}} graphql_1 | INFO: 172.20.0.5:52838 - "POST /graphql/ HTTP/1.1" 200 OK graphql_1 | INFO: 172.20.0.5:52850 - "POST /graphql/ HTTP/1.1" 200 OK WELCOME TO _____ _____ ______ ______ ______ _____ _______ _____ ______ _______ ________ _____ | __ \| __ \| ____| ____| ____/ ____|__ __| / ____| ____| __ \ \ / / ____| __ \ | |__) | |__) | |__ | |__ | |__ | | | | | (___ | |__ | |__) \ \ / /| |__ | |__) | | ___/| _ /| __| | __| | __|| | | | \___ \| __| | _ / \ \/ / | __| | _ / | | | | \ \| |____| | | |___| |____ | | ____) | |____| | \ \ \ / | |____| | \ \ |_| |_| \_\______|_| |______\_____| |_| |_____/|______|_| \_\ \/ |______|_| \_\ Visit http://localhost:8080 to get started, or check out the docs at https://docs.prefect.io
(Prefect Server内では、PostgreSQLやGraphQLなどの複数のサーバーが起動しているようです。)
ローカルでPrefect Serverが立ち上がったので、任意のプロジェクトを作成し、http://localhost:8080
から確認してみました。
$ prefect create project "sample" sample created
お試しで動的なFlowを確認した際のコード(dynamic_flow.py)を登録します。
$ prefect register --project "sample" --path dynamic_flow.py Collecting flows... Processing 'dynamic_flow.py': Building `Local` storage... Registering 'DynamicFlow'... Done └── ID: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX └── Version: 1 ======================== 1 registered ========================
Flowを実行するためにagentをローカルで起動します。
$ prefect agent local start [2021-11-28 09:51:03,306] INFO - agent | Registering agent... [2021-11-28 09:51:03,347] INFO - agent | Registration successful! ____ __ _ _ _ | _ \ _ __ ___ / _| ___ ___| |_ / \ __ _ ___ _ __ | |_ | |_) | '__/ _ \ |_ / _ \/ __| __| / _ \ / _` |/ _ \ '_ \| __| | __/| | | __/ _| __/ (__| |_ / ___ \ (_| | __/ | | | |_ |_| |_| \___|_| \___|\___|\__| /_/ \_\__, |\___|_| |_|\__| |___/ [2021-11-28 09:51:03,357] INFO - agent | Starting LocalAgent with labels ['XXXXXXXXXX'] [2021-11-28 09:51:03,357] INFO - agent | Agent documentation can be found at https://docs.prefect.io/orchestration/ [2021-11-28 09:51:03,358] INFO - agent | Waiting for flow runs...
画面上から、QUICK RUN
を押下します。
下記のようなログがagentの方でも流れます。
[2021-11-28 10:26:06,848] INFO - agent | Deploying flow run XXXXXXXXXXXXXXXXXXXXXXXXXXXXX to execution environment... [2021-11-28 10:26:06,905] INFO - agent | Completed deployment of flow run XXXXXXXXXXXXXXXXXXXXXXXXXXXXX
実行結果も[Success]となっており、実行ログも確認できました。
タスクライブラリ
外部サービスと連携するタスクが標準で用意されています。 (下記は一部になります。詳細はこちらから確認できます。)
おわりに
まだ入門的な使い方しか確認できておりませんが、構文などは直感的でわかりやすかったです。 公式でもApache Airflowとの比較について言及しているので、今後はApache Airflow、Prefectそれぞれでの実装や挙動などについては改めて確認していきたいと思います。 以上、DA(データアナリティクス)事業本部のナガマサでした。