Apache Airflowの並列性と並行性について理解する
こんにちは。サービスグループの武田です。
Apache AirflowはDAG(有向非巡回グラフ)でワークフローを表現し、ジョブのスケジューリングや監視などを行ってくれるツールです。Airflowはスタンドアローンでの構成もできますし、クラスターもサポートしています。設定で同時実行性能についてチューニングできるようになっているのですが、パラメーターの意味がよくわからなかったので調べてまとめてみました。
Airflowのバージョン
Airflowはバージョンによって設定可能なパラメーターが変わっていることがあります。今回は1.10.12
を対象とします。
Executorの理解
Airflowの同時実行性を理解するためには、まずはExecutorの理解が必要です。
ExecutorはAirflowの各タスクをどのように処理するのかを決定します。上記リンク先のドキュメントを見てもらえればわかりますが、組込みでいくつかのExecutorが提供されています。まずは次の3種類を抑えておきましょう。
- SequentialExecutor
- AirflowのデフォルトExecutorで、一度にひとつのタスクしか実行できない
- 一切スケールしない
- LocalExecutor
- 1台のAirflow(少なくともSchedulerとWorkerが同じノード)で構成する場合に使用するExecutor
- マルチプロセスで動作し、スケールアップ によってスケール可能
- CeleryExecutor
- Workerをクラスターで構成する場合に使用するExecutor
- スケールアップおよびスケールアウト でスケール可能
Airflowのアーキテクチャの理解についてはみかみのエントリが役立ちます。SchedulerとWorkerの仲立ちをするのがExecutorの役割です。
並列性および並行性のパラメーター
Airflowの全体像がざっくりつかめたところで、同時実行性能に関するパラメーターを確認していきます。ここでは次のパラメーターを取り上げます。なおxxx.yyy
形式のパラメーターはairflow.cfg
の「xxx
セクションのyyy
」という意味です。
- core.parallelism
- core.max_active_runs_per_dag / max_active_runs
- core.dag_concurrency / concurrency
- scheduler.max_threads
- celery.worker_concurrency
- pool / pool_slots
- task_concurrency
core.parallelism
Airflowクラスター全体の並列数を指定します。デフォルト値は32です。そのままだとハイスペックな環境を用意しても32タスクしか同時に処理しません。LocalExecutorであればプロセス数の上限を意味します。SequentialExecutorはparallelism=1
のLocalExecutorとみることができます。
CeleryExecutorを使用する場合は変更が必須のパラメーターです。というのも、Workerを増やしたところでparallelism
が低いままだと並列性が変わらないためです。celery.worker_concurrency * worker数
が推奨値です。
core.max_active_runs_per_dag / max_active_runs
あるDAGについて同時に実行できるDAGインスタンス数を指定します。デフォルト値は16です。DAG_A、DAG_Bが定義してあった場合、それぞれ16個まで実行できます。max_active_runs
はDAG定義時に指定できるパラメーターで、core.max_active_runs_per_dag
をDAGごとに上書きできます。core.max_active_runs_per_dag
は変更せず、DAGの特性に合わせてmax_active_runs
で調整することが推奨です。
core.dag_concurrency / concurrency
あるDAGについてのタスクの並行数を指定します。デフォルト値は16です。たとえば並行なタスクを5個もつDAG_Aを10個実行した場合、最大で50個のタスクを実行できることになりますが、17個目以降のタスクは待たされることになります。concurrency
はDAG定義時に指定できるパラメーターで、core.dag_concurrency
をDAGごとに上書きできます。core.dag_concurrency
にはcore.max_active_runs_per_dag
以上の値を設定します。また個別のDAGの特性(長期実行されるタスクの数など)に合わせてconcurrency
で調整することが推奨です。
scheduler.max_threads
SchedulerノードのSchedulerプロセス数を指定します(threads
だけどプロセス)。デフォルト値は2です。DAGの解析およびタスク生成、タスクのスケジューリングを行うプロセスです。Schedulerノードのcore数 〜 2 * core数
程度が推奨値です。
celery.worker_concurrency
Workerノード(1台あたり)のWorkerプロセス数を指定します。Workerが同時に処理できるタスク数を意味します。デフォルト値は16です。Workerノードの6-8 * core数
または6-8 * memory/3.75GB
程度が推奨値です。2coreであればデフォルトの設定でもおおむね問題なく、4core/16GBあたりのマシンであれば32
くらいに設定します。
pool / pool_slots
ここまでのパラメーターと異なりpool
およびpool_slots
は全体の設定項目ではなくタスク(Operator)に設定します。そもそもプールという概念が何かという話ですが、Airflowではあらかじめスロットサイズをもったプールを定義しておきます。default_pool
という名前でスロットサイズが128
のプールがデフォルトで用意されます。各タスクは実行時およびキューイング時に指定のプールに割り当てられ、またプールはタスクのpool_slots
分だけスロットを消費します。つまり前述したparallelism
やconcurrency
などに余裕があってもこのプールのスロットが空いていないとタスクが実行されません。
具体的にどう使うのかというと、たとえば外部サービスのAPIを重度に呼び出すようなタスクがあった場合です。何も考えずに大量のタスクを実行するとレートリミットに引っかかってしまう可能性があります。そこで通常タスクとは異なるプールを用意し、それらのタスクをそのプールに割り当てておくことで、それらのタスクレベルでの同時実行制御が実現できます。pool_slots
は消費するスロット数で、重い処理をするようなタスクは多めに設定するなど細かい調整が可能となります。
task_concurrency
task_concurrency
もタスク(Operator)に設定できる項目で、タスクレベルの並行数を指定します。たとえばtask_concurrency=3
を設定したtask_1をもつDAG_Aを10個実行した場合を考えます。全体で16個までのタスク(concurrencyのデフォルト)を同時に実行できるわけですが、task_1だけは3個までしか実行できません。
まとめ
Airflowは大規模なバッチ処理やジョブフロー管理も行えるソフトウェアです。一方で、とりあえずWorker増やしておけば勝手にスケールするというわけでもありません。各設定項目の意味を理解し適切なパラメーターチューニングをして、しっかりと性能を引き出していきたいですね。