Cloud Composer で DAG の多重起動を防ぎたい

2023.06.04

こんにちは!エノカワです。

Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。

やりたいこと

データパイプラインの管理において、同じDAGが同時に多重に実行されると、リソースの消費が増えます。
また、データの整合性を保つための困難さが増します。
とくに処理に時間を要するタスクが含まれる場合、同じDAGが複数回実行されると予期しない結果が生じることがあります。

今回はmax_active_runsパラメータを使用してDAGの多重起動を防止する方法をご紹介します。

max_active_runs

max_active_runsはDAG設定の一部で、同時に実行できるDAGの最大数を設定します。
この値を'1'に設定すると、同時に2つ以上のDAGインスタンスが実行されることはありません。
これにより、前のDAG実行が完了するまで、次のDAG実行が待機状態になります。

それでは、max_active_runsを使ったDAGを作成し、Cloud Composer で動かしてみましょう。

Cloud Composer 環境を作成

DAGを動かす Cloud Composer 環境を作成します。

Google Cloud コンソールで Cloud Composer の[環境の作成]ページに移動し、Cloud Composer 環境を作成します。

test-composerという名前で、東京リージョン、最新のイメージバージョンを選択し、サービスアカウントなど他はでデフォルトのままで作成しました。

DAG を作成

Cloud Composer 環境で実行する DAGを作成します。

以下は、DAGが多重起動する可能性がある例です。
このDAGでは、2分ごとにスケジュールされ、スリープタスクが3分間スリープするため、次のスケジュール時に前のDAG実行がまだ実行中である可能性があります。

sleep_for_a_while.py

from datetime import timedelta
import time
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


def sleep_for_a_while():
    time.sleep(180)  # 3分間スリープ


with DAG(
    dag_id="sleep_for_a_while",
    start_date=days_ago(1),
    schedule_interval=timedelta(minutes=2),  # DAGは2分ごとに実行
    catchup=False,
) as dag:

    # DAGの開始を示すタスク
    start_operator = DummyOperator(
        task_id="run_this_first"
    )

    # スリープするタスク
    sleep_operator = PythonOperator(
        task_id='sleep',
        python_callable=sleep_for_a_while
    )

    # DAGの終了を示すタスク
    end_operator = DummyOperator(
        task_id="run_this_last"
    )

    # 開始タスク -> スリープタスク -> 終了タスク
    start_operator >> sleep_operator >> end_operator

test-composer[DAG]リンクからDAGフォルダに移動します。

DAGフォルダにファイルをアップロードしてDAGをデプロイします。

DAG を実行

先ほどデプロイしたDAGを実際に動かしてみて、DAGが多重起動する場合の挙動を確認してみましょう。

DAGが多重起動する場合

デプロイ直後、最初のDAGが動き始めました。
スリープタスクが実行中の状況です。

最初のDAG実行から2分後、前のDAGがまだ完了していないにもかかわらず新たなDAGが実行を開始しました。
これが多重起動の状況です。

さらに2分後、新たなDAGが実行を開始しました。

DAGが多重起動しない場合

次に、DAGが多重起動しない場合の挙動を確認してみましょう。

先ほどのDAGにmax_active_runs=1を設定してデプロイします。
タスクの内容は先ほどのDAGと同じです。

sleep_for_a_while_max_active_runs.py

from datetime import timedelta
import time
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


def sleep_for_a_while():
    time.sleep(180)  # 3分間スリープ


with DAG(
    dag_id="sleep_for_a_while_max_active_runs",
    start_date=days_ago(1),
    schedule_interval=timedelta(minutes=2),  # DAGは2分ごとに実行
    max_active_runs=1,
    catchup=False,
) as dag:

    # DAGの開始を示すタスク
    start_operator = DummyOperator(
        task_id="run_this_first"
    )

    # スリープするタスク
    sleep_operator = PythonOperator(
        task_id='sleep',
        python_callable=sleep_for_a_while
    )

    # DAGの終了を示すタスク
    end_operator = DummyOperator(
        task_id="run_this_last"
    )

    # 開始タスク -> スリープタスク -> 終了タスク
    start_operator >> sleep_operator >> end_operator

デプロイ直後、最初のDAGが動き始めました。
スリープタスクが実行中の状況です。先ほどのDAGと同じ挙動ですね。

最初のDAG実行から3分後、前のDAGが完了してから新たなDAGが実行を開始しました。
本来のスケジュールでは2分ごとの実行ですが、max_active_runs=1の設定により次のDAG実行が待機されていたようです。

2番目のDAGの実行が完了した後、3番目のDAGが実行を開始しました。
同時に2つ以上のDAGが実行されない制御されているようです。

DAGの設定にmax_active_runs=1を追加することで、多重起動を防止することができました!

max_active_runs_per_dag

max_active_runs_per_dagは、すべてのDAGに対する同時実行可能なDAGインスタンスの最大数を設定します。
Airflow の構成(airflow.cfg)で定義されています。確認してみましょう。

[Admin] - [Configurations] をクリックします。

max_active_runs_per_dagの値は '25' で定義されています。
先ほどのDAGで設定したmax_active_runsは、max_active_runs_per_dagの値を上書きします。
max_active_runsが未設定のDAGは、max_active_runs_per_dagの値('25')が設定されていることになります。

設定値の変更

max_active_runs_per_dagは、以下のコマンドで設定を行うことができます。

$ gcloud composer environments update test-composer \
    --location asia-northeast1 \
    --update-airflow-configs=core-max_active_runs_per_dag=1

以下のコマンドで設定を削除することができます。

$ gcloud composer environments update test-composer \
    --location asia-northeast1 \
    --remove-airflow-configs=core-max_active_runs_per_dag

まとめ

以上、max_active_runsパラメータを使用してDAGの多重起動を防止する方法をご紹介しました。

max_active_runsを使用することで、システムの過負荷を避けたり、データの整合性を保つといった課題に対処することができます。
一方で、max_active_runsの値が低く設定されていると、新しいDAG実行の開始までの待ち時間が不必要に長くなり、効率性が損なわれる可能性があります。

max_active_runsの値は、システムのリソース、タスクの実行時間、DAGのスケジューリング間隔等を考慮して適切に設定する必要がありますので、設定の意味や挙動を理解してパラメータチューニングを行うことが重要ですね。
今回の検証が誰かのお役に立てれば幸いです!

参考