Python製のワークフロー管理ツール「Prefect」を試してみた

2021.11.28

「Prefect」というPython製のワークフロー管理ツールを試したみたので、簡単に紹介いたします。

Prefectについて

概要

Prefect is a new workflow management system, designed for modern infrastructure and powered by the open-source Prefect Core workflow engine. Users organize Tasks into Flows, and Prefect takes care of the rest.

上記の公式ドキュメントの説明を機械翻訳すると下記となります。

Prefectは、最新のインフラストラクチャ用に設計され、オープンソースのPrefectCoreワークフローエンジンを搭載した新しいワークフロー管理システムです。ユーザーはTaskFlowに編成し、残りはPrefectが処理します。

3種類のプロダクトとして提供されているようです。

  • Prefect Core
    • OSS(Open Source Software)であるため無償で利用可能
    • Python製のワークフローエンジン
    • 自前でホスティングして利用することも可能
  • Prefect Cloud
    • マネージドなクラウドサービスで基本的に有償で利用可能
    • 主な用途としては本番運用
  • Prefect Orion
    • Prefectの第2世代のオーケストレーションエンジン

※本記事ではPrefect Core(OSSワークフローエンジン)について試してみた内容をまとめています。

Apache Airflow との比較

Prefectの開発者がApache Airflowのコントリビューターということもあり、PrefectはApache Airflowを意識して作られたワークフロー管理ツールのようです。

公式ドキュメントにも、Why Not Airflow? というタイトルでApache Airflowとの比較について言及されたページがあります。

下記などが主な違いとして挙げられています。

  • 動的なDAG
  • スケジュール
    • Apache Airflowではスケジュール実行前提での実装になるが、Prefectでは単発手動実行のワークフローなどが実装しやすい(スケジュール実行も可能)

またApache Airflowに比べて、下記の機能が容易に実装できるようです。

  • ローカルデバッグ
  • Slack通知

試してみた

実行環境

※Prefectを実行するには、Python 3.6以上が必須です。

  • macOS Catalina 10.15.7
  • Python 3.7.2
  • prefect 0.15.6
  • pip 21.1.3

インストール

pip にて prefect をインストールします。

$ pip install prefect

正しくインストールできると、prefect コマンドが使えるようになります。(バージョンを確認)

$ prefect version
0.15.6

Flow実行

定番の「Hello world!」を出力するだけの下記スクリプトを実行してみます。

  • @taskデコレータを関数に指定し、Taskとして実際の処理を実装
    • get_paramhelloという2つのTaskを作って順番に実行するだけのワークフロー
  • Flowの部分で、必要なTask(関数)を指定
    • Flowのrun()メソッドを呼ぶことで実際にワークフローが開始される

hello_world.py

from prefect import Flow, task

@task
def get_param():
    return "world"

@task
def hello(param):
    print(f"Hello, {param}!")

with Flow("HelloWorld") as flow:
    param = get_param()
    hello_world = hello(param)

flow.run()

Taskが順番通りに実行されていることがわかります。

$ python hello_world.py
[2021-11-28 04:26:26+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'HelloWorld'
[2021-11-28 04:26:26+0900] INFO - prefect.TaskRunner | Task 'get_param': Starting task run...
[2021-11-28 04:26:26+0900] INFO - prefect.TaskRunner | Task 'get_param': Finished task run for task with final state: 'Success'
[2021-11-28 04:26:26+0900] INFO - prefect.TaskRunner | Task 'hello': Starting task run...
Hello, world!
[2021-11-28 04:26:26+0900] INFO - prefect.TaskRunner | Task 'hello': Finished task run for task with final state: 'Success'
[2021-11-28 04:26:26+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

パラメータ指定での実行

Flow実行時にパラメータとして、タスクで使用する値を指定することができます。(Parameterクラス)

  • say_helloの引数として、Parameterクラスから取得したparamを指定
  • Flow実行時に、paramを指定して実行

prefect_hello_param.py

from prefect import task, Flow, Parameter

@task(log_stdout=True)
def say_hello(param):
    print(f"Hello, {param}!")

with Flow("My First Flow") as flow:
    param = Parameter('param')
    say_hello(param)

flow.run(param='world') # "Hello, world!"
flow.run(param='cloud') # "Hello, cloud!"

指定した順番でパラメータ指定されたFlowが実行されていることがわかります。

$ python prefect_hello_param.py
[2021-11-28 03:48:12+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'My First Flow'
[2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Task 'param': Starting task run...
[2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Task 'param': Finished task run for task with final state: 'Success'
[2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
[2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Hello, world!
[2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
[2021-11-28 03:48:12+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-11-28 03:48:12+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'My First Flow'
[2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Task 'param': Starting task run...
[2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Task 'param': Finished task run for task with final state: 'Success'
[2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
[2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Hello, cloud!
[2021-11-28 03:48:12+0900] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
[2021-11-28 03:48:12+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

動的なFlow

動的なFlowを構成するには、タスクのmapメソッドを使用することができます。

  • 実行時に上流タスクから渡された値に応じて動的にタスクを生成しFlowを構成
  • mapメソッドの引数には、上流タスクの結果(リスト)を指定

こちらのコード(dynamic_flow.py)を拝借して、実際の動きを確認してみます。

  • 実行ごとにランダムな長さのリストを生成
  • リストの値をそれぞれ2乗した値の合計を出力

リストの分だけ2乗処理が実行されていることが確認できます。

$ python dynamic_flow.py // 実行1回目
[2021-11-28 05:36:25+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'DynamicFlow'
[2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'random_list': Starting task run...
[2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'random_list': Finished task run for task with final state: 'Success'
[2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared': Starting task run...
[2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared': Finished task run for task with final state: 'Mapped'
[2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[0]': Starting task run...
[2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[0]': Finished task run for task with final state: 'Success'
[2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[1]': Starting task run...
[2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[1]': Finished task run for task with final state: 'Success'
[2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[2]': Starting task run...
[2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[2]': Finished task run for task with final state: 'Success'
[2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[3]': Starting task run...
[2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[3]': Finished task run for task with final state: 'Success'
[2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[4]': Starting task run...
[2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'squared[4]': Finished task run for task with final state: 'Success'
[2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'sum_up': Starting task run...
Squared results: [0, 1, 4, 9, 16]
Sum: 30
[2021-11-28 05:36:25+0900] INFO - prefect.TaskRunner | Task 'sum_up': Finished task run for task with final state: 'Success'
[2021-11-28 05:36:25+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

実行ごとにリストの長さが動的であることが確認できます。

$ python dynamic_flow.py // 実行2回目
[2021-11-28 05:38:13+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'DynamicFlow'
[2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'random_list': Starting task run...
[2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'random_list': Finished task run for task with final state: 'Success'
[2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'squared': Starting task run...
[2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'squared': Finished task run for task with final state: 'Mapped'
[2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'squared[0]': Starting task run...
[2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'squared[0]': Finished task run for task with final state: 'Success'
[2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'squared[1]': Starting task run...
[2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'squared[1]': Finished task run for task with final state: 'Success'
[2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'sum_up': Starting task run...
Squared results: [0, 1]
Sum: 1
[2021-11-28 05:38:13+0900] INFO - prefect.TaskRunner | Task 'sum_up': Finished task run for task with final state: 'Success'
[2021-11-28 05:38:13+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

Prefect Server

ローカル上でPrefect Serverを起動して確認しました。 Docker, Docker Compose が起動していれば、下記のコマンドで簡単に立ち上げることが可能です。

$ prefect backend server
Backend switched to server
$ prefect server start
Pulling postgres ... done
Pulling hasura   ... done
Pulling graphql  ... done
Pulling apollo   ... done
Pulling towel    ... done
Pulling ui       ... done
Creating network "prefect-server" with the default driver
Creating t_postgres_1 ... done
Creating t_hasura_1   ... done
Creating t_graphql_1  ... done
Creating t_towel_1    ... done
Creating t_apollo_1   ... done
Creating t_ui_1       ... done
Attaching to t_postgres_1, t_hasura_1, t_graphql_1, t_towel_1, t_apollo_1, t_ui_1
--- (中略) ---
apollo_1    | Building schema complete!
apollo_1    | Server ready at http://0.0.0.0:4200 🚀 (version: 2021.09.02)
apollo_1    | Sending telemetry to Prefect Technologies, Inc.: {"source":"prefect_server","type":"startup","payload":{"id":"e277c28a-a669-4b65-aec3-e911821bcf8b","prefect_server_version":"2021.09.02","api_version":"0.2.0"}}
graphql_1   | INFO:     172.20.0.5:52838 - "POST /graphql/ HTTP/1.1" 200 OK
graphql_1   | INFO:     172.20.0.5:52850 - "POST /graphql/ HTTP/1.1" 200 OK

                                            WELCOME TO

   _____  _____  ______ ______ ______ _____ _______    _____ ______ _______      ________ _____
  |  __ \|  __ \|  ____|  ____|  ____/ ____|__   __|  / ____|  ____|  __ \ \    / /  ____|  __ \
  | |__) | |__) | |__  | |__  | |__ | |       | |    | (___ | |__  | |__) \ \  / /| |__  | |__) |
  |  ___/|  _  /|  __| |  __| |  __|| |       | |     \___ \|  __| |  _  / \ \/ / |  __| |  _  /
  | |    | | \ \| |____| |    | |___| |____   | |     ____) | |____| | \ \  \  /  | |____| | \ \
  |_|    |_|  \_\______|_|    |______\_____|  |_|    |_____/|______|_|  \_\  \/   |______|_|  \_\


   Visit http://localhost:8080 to get started, or check out the docs at https://docs.prefect.io

(Prefect Server内では、PostgreSQLやGraphQLなどの複数のサーバーが起動しているようです。) ローカルでPrefect Serverが立ち上がったので、任意のプロジェクトを作成し、http://localhost:8080から確認してみました。

$ prefect create project "sample"
sample created

お試しで動的なFlowを確認した際のコード(dynamic_flow.py)を登録します。

$ prefect register --project "sample" --path dynamic_flow.py
Collecting flows...
Processing 'dynamic_flow.py':
  Building `Local` storage...
  Registering 'DynamicFlow'... Done
  └── ID: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
  └── Version: 1
======================== 1 registered ========================

Flowを実行するためにagentをローカルで起動します。

$ prefect agent local start
[2021-11-28 09:51:03,306] INFO - agent | Registering agent...
[2021-11-28 09:51:03,347] INFO - agent | Registration successful!

 ____            __           _        _                    _
|  _ \ _ __ ___ / _| ___  ___| |_     / \   __ _  ___ _ __ | |_
| |_) | '__/ _ \ |_ / _ \/ __| __|   / _ \ / _` |/ _ \ '_ \| __|
|  __/| | |  __/  _|  __/ (__| |_   / ___ \ (_| |  __/ | | | |_
|_|   |_|  \___|_|  \___|\___|\__| /_/   \_\__, |\___|_| |_|\__|
                                           |___/

[2021-11-28 09:51:03,357] INFO - agent | Starting LocalAgent with labels ['XXXXXXXXXX']
[2021-11-28 09:51:03,357] INFO - agent | Agent documentation can be found at https://docs.prefect.io/orchestration/
[2021-11-28 09:51:03,358] INFO - agent | Waiting for flow runs...

画面上から、QUICK RUNを押下します。 下記のようなログがagentの方でも流れます。

[2021-11-28 10:26:06,848] INFO - agent | Deploying flow run XXXXXXXXXXXXXXXXXXXXXXXXXXXXX to execution environment...
[2021-11-28 10:26:06,905] INFO - agent | Completed deployment of flow run XXXXXXXXXXXXXXXXXXXXXXXXXXXXX

実行結果も[Success]となっており、実行ログも確認できました。

タスクライブラリ

外部サービスと連携するタスクが標準で用意されています。
(下記は一部になります。詳細はこちらから確認できます。)

おわりに

まだ入門的な使い方しか確認できておりませんが、構文などは直感的でわかりやすかったです。
公式でもApache Airflowとの比較について言及しているので、今後はApache Airflow、Prefectそれぞれでの実装や挙動などについては改めて確認していきたいと思います。
以上、DA(データアナリティクス)事業本部のナガマサでした。

参考記事