Apache AirflowのTriggerDagRunOperatorをバージョン2対応する

2021.08.31

Apache AirflowのAWSマネージドサービスであるAmazon MWAAが、少し前にバージョン2対応しました
その際 TriggerDagRunOperator の仕様が変更されていて、動作の違いについて調べて対応したのでまとめておきます。

TriggerDagRunOperatorとは

バージョン1の場合

下記の例のように trigger_dag_id にDAGのIDを指定することで、そのDAGを起動することができます。
その際 python_callable に関数を指定することで、関数内で payload として作成した任意の値を dag_run.conf から参照することが可能です。
この値は、別のタスクで使用すること可能です。

1.10.15のドキュメントはこちら

def trigger_task_func(context, object):
    object.payload = {"message": "Hello world"}
    return object

trigger_task = TriggerDagRunOperator(
    task_id='trigger_task',
    trigger_dag_id="target_dag",
    python_callable=trigger_task_func,
)

バージョン2の場合

バージョン1で使用できた python_callable が使用できなくなっています。
その代わり、conf というパラメータが指定可能となり、直接任意の値を渡すことが可能です。

2.0.2のドキュメントはこちら

trigger_task = TriggerDagRunOperator(
    task_id='trigger_task',
    trigger_dag_id="target_dag",
    conf={"message": "Hello world"},
)

仕様の変更については、こちらのPull Requestに記されています。

It removes the python_callable argument and is thus not backwards compatible so should be merged in Airflow 2.0. Also (I think), people might have "abused" this weird DagRunOrder class to set their dagrun id. This PR removes that possibility.

↓翻訳

これは python_callable 引数を削除するもので、後方互換性がないため、Airflow 2.0 にマージされるべきです。また、(私が思うに)人々はこの奇妙な DagRunOrder クラスを "悪用して" 彼らの dagrun id を設定していたかもしれません。このPRはその可能性を取り除きます。

そのため、バージョン1で python_callable に関数を渡して何かしらの処理をしていた場合、PythonOperator などで処理を分けて事前に実行する必要があります。
Pull Request内でも例として紹介されています

python_callableに指定した関数内で作成した値を他のタスクで使用する

今回の移行対象のプロジェクトでは、python_callable に指定した関数内で動的に作成した値が、他のタスクで dag_run.conf を通して使用されていました。

first_dag

def initialize(context, object):
    hoge = "渡したい値1"
    fuga = "渡したい値2"
    
    object.payload = {
        "hoge": hoge,
        "fuga": fuga
    }
    return object
    
trigger_task = TriggerDagRunOperator(
    task_id='trigger_task',
    trigger_dag_id="second_dag",
    python_callable=initialize,
)

second_dag

another_task = PythonOperator(
    task_id='another_task',
    op_kwargs={
       'hoge': "{{ dag_run.conf['hoge'] }}",
       'fuga': "{{ dag_run.conf['fuga'] }}"
    }
)

バージョン2では python_callable は使えないため、別の方法で他のタスクに値を渡す必要がありました。

対応方法はいくつかあるかと思いますが、今回は xcom の機能を利用する方法を紹介します。

利用方法は下記の通りです。

first_dag

def push(**context):
    hoge = "渡したい値1"
    fuga = "渡したい値2"
    
    return {
        "hoge": hoge,
        "fuga": fuga
    }
    
initialize = PythonOperator(
    task_id='initialize',
    python_callable=push,
    provide_context=True,
)

trigger_task = TriggerDagRunOperator(
    task_id='trigger_task',
    trigger_dag_id="second_dag",
    conf={
        'hoge': "{{ ti.xcom_pull(task_ids='initialize')['hoge'] }}",
        'fuga': "{{ ti.xcom_pull(task_ids='initialize')['fuga'] }}"
    }
)

initialize >> trigger_task

second_dag

another_task = PythonOperator(
    task_id='another_task',
    op_kwargs={
       'hoge': "{{ dag_run.conf['hoge'] }}",
       'fuga': "{{ dag_run.conf['fuga'] }}"
    }
)

バージョン1の python_callable に渡した関数で作成するような動的な値を、PythonOperator で別のタスクに分けて作成します。
例では initialize タスクの push 関数で作成し、戻り値としています。
次の trigger_task タスクから xcom_pull を使って取得しています。

以上です!