Apache Airflowで任意のタイムゾーンのds形式日時を取得する方法

2022.10.04

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、八木です。

Airflowには、Jinjaテンプレート変数の1つとして {{ ds }} があり、 YYYY-MM-DD 形式の日付を取得することができます。
この変数を利用して、SQLオペレータなどで日付によるフィルタリングを行うことができます。

SELECT *
FROM users
WHERE registered_at >= {{ ds }}

しかし {{ ds }} で取得する値はUTC時間のため、日本時間の午前6時にDAGを実行した場合、 {{ ds }} は前日の値になってしまいます。
DBに保存しているレコードの値が日本時間の場合、 {{ ds }} では正しい比較ができません。

では、タイムゾーンを指定して日付を取得するにはどうしたらいいでしょうか?

TL;DR

{{ ds }} の代わりに {{ execution_date.in_tz('Asia/Tokyo') | ds }} を使います。

{{ ds }}ではローカルタイムの日付を取得できない

Airflowの設定を変更することで、{{ ds }} などの変数をローカルタイムにできれば楽なのですが、現状はできません。
Airflowでは、タイムゾーンを跨いでチームが存在する場合や、サマータイムによる煩雑性、そこから生まれるバグを考慮し、UTC時刻で取り扱う思想のようです。Web UIの表示タイムゾーンを設定することはできますが、システム内のデータは全てUTCで保存しています。

テンプレート内での日時取扱いに関しても、「Airflowは日時をローカルタイムに変換しない」とドキュメントに書かれています。

Airflow returns time zone aware datetimes in templates, but does not convert them to local time so they remain in UTC. It is left up to the DAG to handle this.
(Airflowはテンプレートではタイムゾーンawareな日付時刻を返しますが、ローカルタイムに変換しないため、UTCのままです。日時の処理はDAGに任されています。)
Time Zones - Templates

実際に動かしてみましたが、タイムゾーンawareなオブジェクトでも、取得できる {{ ds }}, {{ ts }} の値はUTCでした。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
import pendulum

with DAG(
    dag_id='check_timezone',
    schedule_interval=None,
    start_date=pendulum.datetime(2022, 1, 1, tz="Asia/Tokyo"),
    catchup=False
) as dag:
    task = BashOperator(
        task_id='check_time',
        dag=dag,
        bash_command="\
echo \"Current Time = $(date)\n\
DAG Timezone = {{ dag.timezone }}\n\
ds = {{ ds }}\n\
ts = {{ ts }}\""
    )
[2022-10-03T18:00:44.806+0000] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'echo "Current Time = $(date)\nDAG Timezone = Timezone(\'Asia/Tokyo\')\nds = 2022-10-03\nts = 2022-10-03T18:00:43.217129+00:00"']
[2022-10-03T18:00:44.818+0000] {subprocess.py:86} INFO - Output:
[2022-10-03T18:00:44.826+0000] {subprocess.py:93} INFO - Current Time = Mon Oct  3 18:00:44 UTC 2022
[2022-10-03T18:00:44.827+0000] {subprocess.py:93} INFO - DAG Timezone = Timezone('Asia/Tokyo')
[2022-10-03T18:00:44.828+0000] {subprocess.py:93} INFO - ds = 2022-10-03
[2022-10-03T18:00:44.829+0000] {subprocess.py:93} INFO - ts = 2022-10-03T18:00:43.217129+00:00
[2022-10-03T18:00:44.830+0000] {subprocess.py:97} INFO - Command exited with return code 0

ちなみにタイムゾーンawareとは、オブジェクトがタイムゾーンの情報を持っている状態です。
以下のように、 start_date でタイムゾーンを指定してDAGを初期化することで、タイムゾーンawareなオプジェクトを作成することができます。

import pendulum

dag = DAG("my_tz_dag", start_date=pendulum.datetime(2016, 1, 1, tz="Asia/Tokyo"))

{{ execution_date }} を使う

設定で取得する値を変えられないのであれば、値の変換を考えます。
{{ ds }}からうまく変換できればいいのですが、時間情報を含まないので難しいです。

そこで利用するのが {{ execution_date }}です。

実は {{ ds }} = {{ dag_run.logical_date | ds }} であり、 dag_run.logical_dateexecution_date と同じ値を持ちます。1(ただし型が異なる。)
つまり dsexecution_date に対して、 ds フィルターをかけた値です。
また、 execution_date オブジェクトは pendulum.Pendulum 型のため、 in_tzメソッド または in_timezoneメソッド を使うことにより、任意のタイムゾーンへ変換が可能です。

よって、 execution_datein_tzメソッド でローカルタイムへ変換し、 ds フィルターをかけることで、 ds 形式のローカルタイムを取得することができます。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
import pendulum

with DAG(
    dag_id='get_jst_date',
    schedule_interval=None,
    start_date=pendulum.datetime(2022, 1, 1, tz="Asia/Tokyo"),
    catchup=False
) as dag:
    task = BashOperator(
        task_id='get_jst_date',
        dag=dag,
        bash_command="\
echo \"Current Time = $(date)\n\
DAG Timezone = {{ dag.timezone }}\n\
ds = {{ ds }}\n\
ts = {{ ts }}\n\
execution_date.in_tz('Asia/Tokyo') = {{ execution_date.in_tz('Asia/Tokyo') }}\n\
execution_date.in_tz('Asia/Tokyo') | ds = {{ execution_date.in_tz('Asia/Tokyo') | ds }}\""
    )
[2022-10-03T18:01:10.692+0000] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'echo "Current Time = $(date)\nDAG Timezone = Timezone(\'Asia/Tokyo\')\nds = 2022-10-03\nts = 2022-10-03T18:01:09.393003+00:00\nexecution_date.in_tz(\'Asia/Tokyo\') = 2022-10-04T03:01:09.393003+09:00\nexecution_date.in_tz(\'Asia/Tokyo\') | ds = 2022-10-04"']
[2022-10-03T18:01:10.705+0000] {subprocess.py:86} INFO - Output:
[2022-10-03T18:01:10.710+0000] {subprocess.py:93} INFO - Current Time = Mon Oct  3 18:01:10 UTC 2022
[2022-10-03T18:01:10.712+0000] {subprocess.py:93} INFO - DAG Timezone = Timezone('Asia/Tokyo')
[2022-10-03T18:01:10.713+0000] {subprocess.py:93} INFO - ds = 2022-10-03
[2022-10-03T18:01:10.715+0000] {subprocess.py:93} INFO - ts = 2022-10-03T18:01:09.393003+00:00
[2022-10-03T18:01:10.716+0000] {subprocess.py:93} INFO - execution_date.in_tz('Asia/Tokyo') = 2022-10-04T03:01:09.393003+09:00
[2022-10-03T18:01:10.717+0000] {subprocess.py:93} INFO - execution_date.in_tz('Asia/Tokyo') | ds = 2022-10-04
[2022-10-03T18:01:10.718+0000] {subprocess.py:97} INFO - Command exited with return code 0

{{ ds }} はUTCの日付が取得されるのに対し、 {{ execution_date.in_tz('Asia/Tokyo') | ds }} は指定したタイムゾーンの日付を取得することができました。

余談ですが、 execution_date は手動実行の場合とスケジュール実行の場合で受け取る値が異なります。詳しくは以下の記事をご覧ください。
Airflowのstart_dateとexecution_date (logical date)を把握する|Dentsu Digital Tech Blog|note

最後に

今回はJinjaテンプレートを活用して、ローカルタイムの日付を取得する方法を紹介しました。

Jinjaテンプレートはシンプルに記述できる一方、機能は非常に強力です。
AirflowのJinjaテンプレートには、デフォルトでさまざまな変数、マクロ、フィルターが実装されています。また、テンプレートに存在しない場合には自身でマクロを作成することもできます。

Jinjaテンプレートを最大限活用していきましょう。

以上、八木でした!