Apache Airflowの並列性と並行性について理解する

こんにちは。サービスグループの武田です。Airflowは同時実行性能についてチューニングできるようになっているのですが、パラメーターの意味がよくわからなかったので調べてまとめてみました。
2020.10.28

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは。サービスグループの武田です。

Apache AirflowはDAG(有向非巡回グラフ)でワークフローを表現し、ジョブのスケジューリングや監視などを行ってくれるツールです。Airflowはスタンドアローンでの構成もできますし、クラスターもサポートしています。設定で同時実行性能についてチューニングできるようになっているのですが、パラメーターの意味がよくわからなかったので調べてまとめてみました。

Airflowのバージョン

Airflowはバージョンによって設定可能なパラメーターが変わっていることがあります。今回は1.10.12を対象とします。

Executorの理解

Airflowの同時実行性を理解するためには、まずはExecutorの理解が必要です。

ExecutorはAirflowの各タスクをどのように処理するのかを決定します。上記リンク先のドキュメントを見てもらえればわかりますが、組込みでいくつかのExecutorが提供されています。まずは次の3種類を抑えておきましょう。

  • SequentialExecutor
    • AirflowのデフォルトExecutorで、一度にひとつのタスクしか実行できない
    • 一切スケールしない
  • LocalExecutor
    • 1台のAirflow(少なくともSchedulerとWorkerが同じノード)で構成する場合に使用するExecutor
    • マルチプロセスで動作し、スケールアップ によってスケール可能
  • CeleryExecutor
    • Workerをクラスターで構成する場合に使用するExecutor
    • スケールアップおよびスケールアウト でスケール可能

Airflowのアーキテクチャの理解についてはみかみのエントリが役立ちます。SchedulerとWorkerの仲立ちをするのがExecutorの役割です。

並列性および並行性のパラメーター

Airflowの全体像がざっくりつかめたところで、同時実行性能に関するパラメーターを確認していきます。ここでは次のパラメーターを取り上げます。なおxxx.yyy形式のパラメーターはairflow.cfgの「xxxセクションのyyy」という意味です。

  • core.parallelism
  • core.max_active_runs_per_dag / max_active_runs
  • core.dag_concurrency / concurrency
  • scheduler.max_threads
  • celery.worker_concurrency
  • pool / pool_slots
  • task_concurrency

core.parallelism

Airflowクラスター全体の並列数を指定します。デフォルト値は32です。そのままだとハイスペックな環境を用意しても32タスクしか同時に処理しません。LocalExecutorであればプロセス数の上限を意味します。SequentialExecutorはparallelism=1のLocalExecutorとみることができます。

CeleryExecutorを使用する場合は変更が必須のパラメーターです。というのも、Workerを増やしたところでparallelismが低いままだと並列性が変わらないためです。celery.worker_concurrency * worker数が推奨値です。

core.max_active_runs_per_dag / max_active_runs

あるDAGについて同時に実行できるDAGインスタンス数を指定します。デフォルト値は16です。DAG_A、DAG_Bが定義してあった場合、それぞれ16個まで実行できます。max_active_runsはDAG定義時に指定できるパラメーターで、core.max_active_runs_per_dagをDAGごとに上書きできます。core.max_active_runs_per_dagは変更せず、DAGの特性に合わせてmax_active_runsで調整することが推奨です。

core.dag_concurrency / concurrency

あるDAGについてのタスクの並行数を指定します。デフォルト値は16です。たとえば並行なタスクを5個もつDAG_Aを10個実行した場合、最大で50個のタスクを実行できることになりますが、17個目以降のタスクは待たされることになります。concurrencyはDAG定義時に指定できるパラメーターで、core.dag_concurrencyをDAGごとに上書きできます。core.dag_concurrencyにはcore.max_active_runs_per_dag以上の値を設定します。また個別のDAGの特性(長期実行されるタスクの数など)に合わせてconcurrencyで調整することが推奨です。

scheduler.max_threads

SchedulerノードのSchedulerプロセス数を指定します(threadsだけどプロセス)。デフォルト値は2です。DAGの解析およびタスク生成、タスクのスケジューリングを行うプロセスです。Schedulerノードのcore数 〜 2 * core数程度が推奨値です。

celery.worker_concurrency

Workerノード(1台あたり)のWorkerプロセス数を指定します。Workerが同時に処理できるタスク数を意味します。デフォルト値は16です。Workerノードの6-8 * core数または6-8 * memory/3.75GB程度が推奨値です。2coreであればデフォルトの設定でもおおむね問題なく、4core/16GBあたりのマシンであれば32くらいに設定します。

pool / pool_slots

ここまでのパラメーターと異なりpoolおよびpool_slotsは全体の設定項目ではなくタスク(Operator)に設定します。そもそもプールという概念が何かという話ですが、Airflowではあらかじめスロットサイズをもったプールを定義しておきます。default_poolという名前でスロットサイズが128のプールがデフォルトで用意されます。各タスクは実行時およびキューイング時に指定のプールに割り当てられ、またプールはタスクのpool_slots分だけスロットを消費します。つまり前述したparallelismconcurrencyなどに余裕があってもこのプールのスロットが空いていないとタスクが実行されません。

具体的にどう使うのかというと、たとえば外部サービスのAPIを重度に呼び出すようなタスクがあった場合です。何も考えずに大量のタスクを実行するとレートリミットに引っかかってしまう可能性があります。そこで通常タスクとは異なるプールを用意し、それらのタスクをそのプールに割り当てておくことで、それらのタスクレベルでの同時実行制御が実現できます。pool_slotsは消費するスロット数で、重い処理をするようなタスクは多めに設定するなど細かい調整が可能となります。

task_concurrency

task_concurrencyもタスク(Operator)に設定できる項目で、タスクレベルの並行数を指定します。たとえばtask_concurrency=3を設定したtask_1をもつDAG_Aを10個実行した場合を考えます。全体で16個までのタスク(concurrencyのデフォルト)を同時に実行できるわけですが、task_1だけは3個までしか実行できません。

まとめ

Airflowは大規模なバッチ処理やジョブフロー管理も行えるソフトウェアです。一方で、とりあえずWorker増やしておけば勝手にスケールするというわけでもありません。各設定項目の意味を理解し適切なパラメーターチューニングをして、しっかりと性能を引き出していきたいですね。

参考リンク