Airflowで動的なワークフローを構成する

Airflowで動的なワークフローを構成するために、SubDagOperatorとtrigger_dag APIを使ってみました。
2019.01.31

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

課題

ある一連の処理を何らかのリストに対して行いたいという場合があります。 ただしそのリストは固定ではなく毎回変わるものとします。図に書くとこんな感じです。

              |--> 処理1 for id1 --> 処理2 for id1 --> 処理3 for id1
              |--> 処理1 for id2 --> 処理2 for id2 --> 処理3 for id2
idリスト取得  ---|     :
              |     :
              |--> 処理1 for idN --> 処理2 for idN --> 処理3 for idN

このような処理フローをAirflowで構成するにはどうすれば良いのか?というのが本稿のテーマです。 以下、自分がやってみた方法を2つご紹介します。

方法1: SubDagOperatorの利用

一つ目の方法はSubDagOperatorを使うというものです。 各idを処理するDAGを返す関数を用意して、それをSubDagOperatorに渡します。以下のような感じで記述します。

def subdag(parent_dag_name, child_dag_name, args):
    """ 各idに対して実行する処理フローを記述したDAGを返す """
    sub_dag = DAG(dag_id="{}.{}".format(parent_dag_name, child_dag_name), default_args=args, schedule_interval="@once")
    for id in get_id_list():
        t1 = BashOperator(
            task_id='{}-task-1'.format(id),
            bash_command='echo task1: {}'.format(id),
            default_args=args,
            dag=sub_dag,
        )
        t2 = BashOperator(
            task_id='{}-task-2'.format(id),
            bash_command='echo task2: {}'.format(id),
            default_args=args,
            dag=sub_dag,
        )
        t1 >> t2
    return sub_dag

t2 = SubDagOperator(
    task_id='subdag',
    executor=CeleryExecutor(),  # デフォルトはSequentialExecutorで並列実行されない
    subdag=subdag(DAG_NAME, 'subdag', args),
    default_args=args,
    dag=dag,
)

コード全体はこちら

実行すると、以下のようなTreeViewになります。

subdag(SubDagOperatorのタスク)の右側の四角をクリックすると、以下のようなポップアップが表示されます。

ここで"Zoom into Sub DAG"をクリックすると、subdagの中身が表示されます。

なお、図からお分かりかもしれませんが、この方法はリストが大きくなると破綻します。 SubDAGの中身が大きくなりすぎてUIでは表示しきれなくなるためです。

案2: trigger_dag APIの利用

二つ目の方法は、trigger_dagというAPIを利用して他のDAGを動的に起動するというものです。 trigger_dagで他のDAGを起動する関数を用意し、これをPythonOperatorで実行します。

def trigger(**kwargs):
    dag_id = kwargs['dag_id']  # triggerするDAG idを引数から取得
    execution_date = kwargs['ti'].execution_date.isoformat()
    for id in get_id_list():
        run_id = 'trig__{}_{}'.format(id, execution_date)
        trigger_dag(dag_id=dag_id,
                    run_id=run_id,
                    conf=json.dumps({'id': id}),
                    execution_date=None,
                    replace_microseconds=False)

t2 = PythonOperator(
    task_id='trigger_account_dag',
    python_callable=trigger,
    op_kwargs={'dag_id': 'triggered_dag_sample'},
    default_args=args,
    dag=dag,
)

コード全体はこちら

実行すると、メインのダッシュボードでは2つのDAGが動いているのがわかります。

triggerされる側のDag Runsから、以下のように実行結果の一覧を見ることができます。この表はページングされるため、リストが大きくなっても大丈夫です。 ここでリストの中のどのIDに対する実行なのかをRun Idで区別できるように、trigger_dagのrun_idパラメータを適切に指定しておくと良さそうです。

ちなみに、trigger_dagを呼び出すOperatorとしてTriggerDagOperatorというものもあります。場面によってはそれを使う方が簡単かもしれません。

※なお、現時点ではtrigger_dagを含め各種APIはexperimentalです。利用にはご注意ください。

まとめ

Airflowで動的なリストに対するフローを構成してみました。 ここ2〜3週間Airflowを触っていますが、チュートリアルを超えて少し凝ったことをやろうとすると色々とハマるポイントが多くて苦労しています。 同じようにハマっているどなたかの助けになれば幸いです。

参考

同種のお悩み、解決案など

公式のサンプルDAG