[Airflow] オペレーターのテンプレートで独自のカスタムフィルターを使用する

こんにちは。サービスグループの武田です。オペレーターの引数に指定するテンプレート内で独自のカスタムフィルターを使う手順を確認してみました。
2020.09.30

こんにちは。サービスグループの武田です。

Apache AirflowはDAG(有向非巡回グラフ)でワークフローを表現し、ジョブのスケジューリングや監視などを行ってくれるツールです。Airflowのオペレーター定義では、引数としてテンプレートを使用できるものがあります(定義時ではなく実行時に評価される)。テンプレート内ではJinja2のフィルターが使用できるのですが、時には独自のカスタムフィルターが欲しい場面も出てきます。今回は簡単なフィルターを定義して実際に検証してみました。

今回のゴール

今回はto_jsonフィルターを作ってみます。シンプルに値を受け取ったらJSON文字列に変換するフィルターです。シチュエーションとしては、DAG実行時のconfをログに出力するオペレーターを定義したいとします。

フィルターを使わない場合

比較をするため、まずはフィルターを定義せずにシンプルに実装してみます。BashOperatorを使ってconfを出力しています。

test_custom_filter.py

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

args = {
    "owner": "airflow",
    "start_date": airflow.utils.dates.days_ago(2),
    "provide_context": True,
}


with DAG(dag_id="test_custom_filter", default_args=args, schedule_interval=None) as dag:
    BashOperator(task_id="output_conf", bash_command='echo "{{ dag_run.conf }}"')

DAG実行時にconfを指定します。

出力結果は次のようになります。

さてどうでしょう。これを見て何か気付いたことはあるでしょうか?……はい、実は出力の形式がJSONではないです。dag_run.confはdict型となっていて、それを出力する際に 辞書の文字列表現 に変換されています。JSON文字列の方が都合のいいケースもきっとあるでしょう。というわけでやっていきます。

JSONフィルターを追加してみる

オペレーターのテンプレートで独自のフィルターを定義する手順は2つです。

  1. 処理を定義する
  2. DAGに登録する

関数は普通にPythonの関数として定義してもいいですし、Lambda式として宣言してもOKです。フィルターの登録は、DAG宣言時にuser_defined_filtersというパラメーターで、名前と処理をセットにして登録します。

test_custom_filter.py

import json

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

args = {
    "owner": "airflow",
    "start_date": airflow.utils.dates.days_ago(2),
    "provide_context": True,
}


def to_json(a):
    return json.dumps(a, ensure_ascii=False)


with DAG(
    dag_id="test_custom_filter",
    default_args=args,
    schedule_interval=None,
    user_defined_filters={"to_json": to_json},
) as dag:
    BashOperator(
        task_id="output_conf", bash_command="echo '{{ dag_run.conf | to_json }}'"
    )

22行目でto_jsonという名前でフィルターを登録しています。そして25行目のテンプレートで実際にそれを使っています。

先ほどと同じconfでこのDAGを実行してタスクのログを確認してみます。

ばっちりJSON文字列で出力されていますね!

まとめ

タスクをPythonOperatorで定義する場合などは普通にタスクの中でconfを取得すればいいのですが、独自のオペレーターを作る場合などはそうもいきません。うまく組み合わせてDRYな設計にしていきたいですね。