[Airflow]provide_contextとtask_instanceに関する備忘録
はじめに
便利なフレームワークやツールは色々な利用例が溢れていますが、たまにあるのが「そのパラメータはどこからきたのか」というやつです。普通に突然現れるものの解説はなく、公式ドキュメントを検索しても記述が見当たらない。しかし平然とそれが使い回されている各種サンプル。
AirflowでPythonOperatorを使っていたところ、それに相当するcontext['task_instance']
に遭遇しました。ネタがわかれば「なるほど」となりますが、そこに到れるまで少しかかったこともあり、説明用備忘録として書いておくことにしました。
task_instanceの使い方
一例としてjob間でパラメータを引き渡しあう際に用いるxcom_pull()
とxcom_push()
の実行時があります。
def function(**kwargs): context['task_instance'].xcom_push(key='value', value='abc') def receiver(**kwargs): context['task_instance'].xcom_pull(task_id='call', key='value')
幾つか端折っていますが、xcom_push()
したデータをxcom_pull
で取り寄せる形です。
task_instanceをcontextで有効にする
では単純にcontextへkey指定にて呼び出せばよいかというと、KeyError: 'task_instance'
が発生する未来が待ち受けています。
関数を呼び出すPythonOperator
上でprovide_context
を有効にしておきましょう。
func = PythonOperator( task_id='call', python_callable=function, provide_context=True) receiver = PythonOperator( task_id='rece', python_callable=receiver, provide_context=True) func >> receiver
provide_context
自体はドキュメントに記述があります。
When you set the provide_context argument to True, Airflow passes in an additional set of keyword arguments: one for each of the Jinja template variables and a templates_dict argument.
task_instanceがどのように渡ってくるのかを理解しておく
provide_context
がその役目を果たしてくれることが分かっていても、task_instance
を取得したいという目的で調べている間はなかなか辿り着けないわけです。task_instance
の役割について説明しているドキュメントはあるのですが、該当ページ内にprovide_context
の説明が見当たらないため、認知の紐付けが辛いかもしれません。
以下、provide_context
がTrueである時の動作です。
def execute(self, context): # Export context to make it available for callables to use. airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True) self.log.debug("Exporting the following env vars:\n%s", '\n'.join(["{}={}".format(k, v) for k, v in airflow_context_vars.items()])) os.environ.update(airflow_context_vars) if self.provide_context: context.update(self.op_kwargs) context['templates_dict'] = self.templates_dict self.op_kwargs = context
環境変数となって溶けて消えているようにみえますが、Jinja2を通して環境変数を用いてop_kwargs
から渡されています。おかげで物凄く実装を追いにくいと思えるのも事実です。
Airflow Variables can also be created and managed using Environment Variables. The environment variable naming convention is AIRFLOW_VAR_
, all uppercase. So if your variable key is FOO then the variable name should be AIRFLOW_VAR_FOO.
そして、provide_context
が使われているサンプルを見て、いずれも引数が全て**kwargs
となっていることに気がつくかもしれませんが、これについてはソースコード内にそういった仕様であることがコメントされています。
:param provide_context: if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates. For this to work, you need to define `**kwargs` in your function header.
また、githubリポジトリを検索すると以下の記述に巡り合うかもしれませんが、これは2.0.0aの仕様であり、記事執筆時点でのリリースバージョンは1.10.13です。お間違いのないように。
provide_context argument on the PythonOperator was removed. The signature of the callable passed to the PythonOperator is now inferred and argument values are always automatically provided. There is no need to explicitly provide or not provide the context anymore.
あとがき
分かってしまえば何の問題もないのですが、気がつくまでには相当な時間を要しました。なお、きっかけになった記事は以下のstackoverflow質疑ツリーです。
現時点で動作についてソースコードを追って確認したい場合は、masterブランチではなくv1-10-stableを見るようにしてください。provide_context
の扱いが全く異なっています。
同じような悩みで躓いている方の参考になれば幸いです。