Cloud Composer で TriggerDagRunOperator を使ってDAGから別のDAGを呼び出す

2023.07.31

こんにちは!エノカワです。

Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。

やりたいこと

Apache Airflowを使用することで、ワークフロー内で一連のタスクを定義し、それらのタスクの実行順序を制御することができます。
これらのタスクは特定のDAG内に存在しますが、DAG間で呼び出しが可能です。

今回は、このDAG間の呼び出しに使用できるTriggerDagRunOperatorを使ってDAGから別のDAGを呼び出す方法を試してみましたのでご紹介します。

TriggerDagRunOperator

Apache Airflowでは、DAGから別のDAGを呼び出すための基本的なOperatorとしてTriggerDagRunOperatorを用います。
以下にその使用例を示します。

trigger = TriggerDagRunOperator(
    task_id="trigger_dagrun", # タスクID
    trigger_dag_id="child_dag", # 呼び出し先のDAG ID
)

この例では、TriggerDagRunOperatorを使用して、別のDAG(この例では "child_dag")を呼び出すタスクを作成します。
これにより、parent_dagが実行されるときにchild_dagも一緒に実行されるようになります。

以降、呼び出し元のDAGを親DAG、呼び出し先のDAGを子DAGと呼びます。

それでは、TriggerDagRunOperatorを使ったDAGを作成し、Cloud Composer で動かしてみましょう。

Cloud Composer 環境を作成

DAGを動かす Cloud Composer 環境を作成します。

Google Cloud コンソールで Cloud Composer の[環境の作成]ページに移動し、Cloud Composer 環境を作成します。

test-composerという名前で、東京リージョン、最新のイメージバージョンを選択し、サービスアカウントなど他はでデフォルトのままで作成しました。

子DAGにパラメータを渡す

親DAGから子DAGにパラメータを渡すためには、TriggerDagRunOperatorconf引数を使います。
conf引数には辞書型のオブジェクトを指定します。

ここでは、親DAGから子DAGに渡したパラメータを子DAG側でログ出力し、正しくパラメータが渡されていることを確認してみます。

親DAG

parent_dag.py

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.dates import days_ago


with DAG(
    dag_id="parent_dag",
    start_date=days_ago(1),
    schedule_interval="@daily"
) as dag:

    # TriggerDagRunOperatorを使って別のDAG(ここでは"child_dag")を呼び出す
    trigger = TriggerDagRunOperator(
        task_id="trigger_dagrun",
        trigger_dag_id="child_dag",
        conf={"message": "Hello from parent DAG!"},  # パラメータを渡す
    )

    trigger

子DAG

child_dag.py

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


# パラメータをログに出力する関数
def print_message(**context):
    print(context["dag_run"].conf["message"])


with DAG(
    dag_id="child_dag",
    start_date=days_ago(1),
    schedule_interval=None
) as dag:

    # PythonOperator で print_message関数を実行
    print_task = PythonOperator(
        task_id="print_task",
        python_callable=print_message,
        provide_context=True,  # コンテキストを渡すように設定
    )

    print_task

上記DAGを Cloud Composer にデプロイ後、しばらくすると親DAG、子DAGともに成功しました。

子DAGのログを確認してみましょう。

親DAGから渡されたメッセージHello from parent DAG!が出力されています。
正しくパラメータが渡されていることが確認できました。

Airflow UI 上の見え方

DAGs 画面

親DAGから子DAGを呼び出す際、TriggerDagRunOperatorを使用すると、それぞれのDAGが独立してUI上に表示されます。
DAGs 画面では、親子DAGの関係が直接的には見えないため、これらが連携して動作することを意識する必要があります。

Task Details

親DAGのtrigger_dagrunタスク(TriggerDagRunOperator)の Task Details に [Triggered DAG] ボタン表示があります。

クリックすると子DAGの画面に遷移しました。親DAGから子DAGのトレースはできそうです。

一方、子DAG側には同様のボタンはないため、逆方向(子DAGから親DAG)のトレースはできないようです。

DAG Dependencies

Airflow UI 上部の [browse] - [DAG Dependencies] から、DAGの依存関係を示す図を確認することができます。

子DAGのスケジュール設定について

子DAGのスケジュール設定の必要性は、子DAGの使用方法に依存します。
親DAGから呼び出されることが前提の場合、子DAGのスケジュール設定は不要となります。
子DAGのschedule_intervalNoneに設定することで、スケジューリングから除外されます。

親DAG

parent_dag.py

with DAG(
    dag_id="parent_dag",
    start_date=days_ago(1),
    schedule_interval="@daily"
) as dag:

子DAG

child_dag.py

with DAG(
    dag_id="child_dag",
    start_date=days_ago(1),
    schedule_interval=None
) as dag:

ただし、子DAGが独立して実行されるケースがある場合は、子DAGのスケジュール設定を考慮する必要があります。

子DAGの完了を待つ

親DAGのtrigger_dagrunタスク(TriggerDagRunOperator)は、子DAGの呼び出し成功とともに直ちに完了します。

ここでは、1分間スリープするタスクを持つ子DAGの呼び出し例を考えてみましょう。

子DAGは1分間のスリープを終えてから完了しますが、

親DAGは子DAGの呼び出し後すぐに完了します。
全体の処理時間は約3秒であり、子DAGの完了を待たずに処理が完了していることが確認できます。

wait_for_completion

wait_for_completionパラメータを利用すると、親DAGが子DAGの完了を待つことが可能です。

以下のように、wait_for_completionパラメータにTrueを設定してみましょう。

親DAG

parent_wait_dag.py

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.dates import days_ago


with DAG(
    dag_id="parent_wait_dag",
    start_date=days_ago(1),
    schedule_interval="@daily"
) as dag:

    # TriggerDagRunOperatorを使って別のDAG(ここでは"child_dag")を呼び出す
    trigger = TriggerDagRunOperator(
        task_id="trigger_dagrun",
        trigger_dag_id="child_sleep_dag",
        wait_for_completion=True,
        retries=0,
    )

    trigger

子DAG

child_sleep_dag.py

import time

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


def sleep_for_a_while():
    time.sleep(60)  # 1分間スリープ


with DAG(
    dag_id="child_sleep_dag",
    start_date=days_ago(1),
    schedule_interval=None
) as dag:

    # スリープするタスク
    sleep_operator = PythonOperator(
        task_id="sleep",
        python_callable=sleep_for_a_while
    )

    sleep_operator

これにより、親DAGは子DAGの呼び出し後も実行中の状態が続き、

子DAGが完了してから親DAGも完了しました。
全体の処理時間は約2分で、親DAGが子DAGの完了を待って完了していることが確認できます。

子DAGが失敗した場合

通常、TriggerDagRunOperatorを使用したタスクは、子DAGの実行結果に関係なく成功します。
wait_for_completionパラメータにTrueを設定して呼び出した子DAGが失敗した場合はどうなるでしょうか?

子DAGのsleepタスクが実行中に[Mark Failed]ボタンを押下し、故意に失敗させてみます。

すると、親DAGのtrigger_dagrunタスクも失敗しました。

wait_for_completionパラメータにTrueを設定すると、親DAGのtrigger_dagrunタスク(TriggerDagRunOperator)の成功・失敗が子DAGの結果に連動することが分かります。

まとめ

以上、TriggerDagRunOperatorを使ってDAGから別のDAGを呼び出す方法をご紹介しました。

TriggerDagRunOperatorを活用することで、複雑なワークフローを独立した部分(DAG)に分割して管理することが可能になります。
これにより、各DAGが独立して動作し、それぞれのDAG間の依存関係を柔軟に制御することができます。
親DAGから子DAGに対してパラメータを渡せるので、動的なワークフロー設定にも対応できそうですね。

一方で、TriggerDagRunOperatorを使用すると Airflow UI上で親DAGと子DAGの関連性が直接的に見なくなってしまうので、その点は注意が必要そうだと感じました。

今回の検証が誰かのお役に立てれば幸いです!

参考