[Airflow]provide_contextとtask_instanceに関する備忘録

Apache AirflowでPythonOperator利用時にtask_instanceのkey指定を正常に行うためにはどうすべきか、只管追ってみたことを書き出してみました。
2020.10.20

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

便利なフレームワークやツールは色々な利用例が溢れていますが、たまにあるのが「そのパラメータはどこからきたのか」というやつです。普通に突然現れるものの解説はなく、公式ドキュメントを検索しても記述が見当たらない。しかし平然とそれが使い回されている各種サンプル。

AirflowでPythonOperatorを使っていたところ、それに相当するcontext['task_instance']に遭遇しました。ネタがわかれば「なるほど」となりますが、そこに到れるまで少しかかったこともあり、説明用備忘録として書いておくことにしました。

task_instanceの使い方

一例としてjob間でパラメータを引き渡しあう際に用いるxcom_pull()xcom_push()の実行時があります。

def function(**kwargs):
    context['task_instance'].xcom_push(key='value', value='abc')

def receiver(**kwargs):
    context['task_instance'].xcom_pull(task_id='call', key='value')

幾つか端折っていますが、xcom_push()したデータをxcom_pullで取り寄せる形です。

task_instanceをcontextで有効にする

では単純にcontextへkey指定にて呼び出せばよいかというと、KeyError: 'task_instance'が発生する未来が待ち受けています。

関数を呼び出すPythonOperator上でprovide_contextを有効にしておきましょう。

func = PythonOperator(
    task_id='call',
    python_callable=function,
    provide_context=True)
receiver = PythonOperator(
    task_id='rece',
    python_callable=receiver,
    provide_context=True)

func >> receiver

provide_context自体はドキュメントに記述があります。

When you set the provide_context argument to True, Airflow passes in an additional set of keyword arguments: one for each of the Jinja template variables and a templates_dict argument.

task_instanceがどのように渡ってくるのかを理解しておく

provide_contextがその役目を果たしてくれることが分かっていても、task_instanceを取得したいという目的で調べている間はなかなか辿り着けないわけです。task_instanceの役割について説明しているドキュメントはあるのですが、該当ページ内にprovide_contextの説明が見当たらないため、認知の紐付けが辛いかもしれません。

以下、provide_contextがTrueである時の動作です。

    def execute(self, context):
        # Export context to make it available for callables to use.
        airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
        self.log.debug("Exporting the following env vars:\n%s",
                       '\n'.join(["{}={}".format(k, v)
                                  for k, v in airflow_context_vars.items()]))
        os.environ.update(airflow_context_vars)


        if self.provide_context:
            context.update(self.op_kwargs)
            context['templates_dict'] = self.templates_dict
            self.op_kwargs = context

環境変数となって溶けて消えているようにみえますが、Jinja2を通して環境変数を用いてop_kwargsから渡されています。おかげで物凄く実装を追いにくいと思えるのも事実です。

Airflow Variables can also be created and managed using Environment Variables. The environment variable naming convention is AIRFLOW_VAR_, all uppercase. So if your variable key is FOO then the variable name should be AIRFLOW_VAR_FOO.

そして、provide_contextが使われているサンプルを見て、いずれも引数が全て**kwargsとなっていることに気がつくかもしれませんが、これについてはソースコード内にそういった仕様であることがコメントされています。

        :param provide_context: if set to true, Airflow will pass a set of
        keyword arguments that can be used in your function. This set of
        kwargs correspond exactly to what you can use in your jinja
        templates. For this to work, you need to define `**kwargs` in your
        function header.

また、githubリポジトリを検索すると以下の記述に巡り合うかもしれませんが、これは2.0.0aの仕様であり、記事執筆時点でのリリースバージョンは1.10.13です。お間違いのないように。

provide_context argument on the PythonOperator was removed. The signature of the callable passed to the PythonOperator is now inferred and argument values are always automatically provided. There is no need to explicitly provide or not provide the context anymore.

あとがき

分かってしまえば何の問題もないのですが、気がつくまでには相当な時間を要しました。なお、きっかけになった記事は以下のstackoverflow質疑ツリーです。

現時点で動作についてソースコードを追って確認したい場合は、masterブランチではなくv1-10-stableを見るようにしてください。provide_contextの扱いが全く異なっています。

同じような悩みで躓いている方の参考になれば幸いです。