[Airflow] オペレーターのテンプレートで独自のカスタムフィルターを使用する
こんにちは。サービスグループの武田です。
Apache AirflowはDAG(有向非巡回グラフ)でワークフローを表現し、ジョブのスケジューリングや監視などを行ってくれるツールです。Airflowのオペレーター定義では、引数としてテンプレートを使用できるものがあります(定義時ではなく実行時に評価される)。テンプレート内ではJinja2のフィルターが使用できるのですが、時には独自のカスタムフィルターが欲しい場面も出てきます。今回は簡単なフィルターを定義して実際に検証してみました。
今回のゴール
今回はto_json
フィルターを作ってみます。シンプルに値を受け取ったらJSON文字列に変換するフィルターです。シチュエーションとしては、DAG実行時のconf
をログに出力するオペレーターを定義したいとします。
フィルターを使わない場合
比較をするため、まずはフィルターを定義せずにシンプルに実装してみます。BashOperatorを使ってconf
を出力しています。
import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator args = { "owner": "airflow", "start_date": airflow.utils.dates.days_ago(2), "provide_context": True, } with DAG(dag_id="test_custom_filter", default_args=args, schedule_interval=None) as dag: BashOperator(task_id="output_conf", bash_command='echo "{{ dag_run.conf }}"')
DAG実行時にconf
を指定します。
出力結果は次のようになります。
さてどうでしょう。これを見て何か気付いたことはあるでしょうか?……はい、実は出力の形式がJSONではないです。dag_run.conf
はdict型となっていて、それを出力する際に 辞書の文字列表現 に変換されています。JSON文字列の方が都合のいいケースもきっとあるでしょう。というわけでやっていきます。
JSONフィルターを追加してみる
オペレーターのテンプレートで独自のフィルターを定義する手順は2つです。
- 処理を定義する
- DAGに登録する
関数は普通にPythonの関数として定義してもいいですし、Lambda式として宣言してもOKです。フィルターの登録は、DAG宣言時にuser_defined_filters
というパラメーターで、名前と処理をセットにして登録します。
import json import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator args = { "owner": "airflow", "start_date": airflow.utils.dates.days_ago(2), "provide_context": True, } def to_json(a): return json.dumps(a, ensure_ascii=False) with DAG( dag_id="test_custom_filter", default_args=args, schedule_interval=None, user_defined_filters={"to_json": to_json}, ) as dag: BashOperator( task_id="output_conf", bash_command="echo '{{ dag_run.conf | to_json }}'" )
22行目でto_json
という名前でフィルターを登録しています。そして25行目のテンプレートで実際にそれを使っています。
先ほどと同じconf
でこのDAGを実行してタスクのログを確認してみます。
ばっちりJSON文字列で出力されていますね!
まとめ
タスクをPythonOperatorで定義する場合などは普通にタスクの中でconf
を取得すればいいのですが、独自のオペレーターを作る場合などはそうもいきません。うまく組み合わせてDRYな設計にしていきたいですね。