Apache Airflowで任意のタイムゾーンのds形式日時を取得する方法
こんにちは、八木です。
Airflowには、Jinjaテンプレート変数の1つとして {{ ds }}
があり、 YYYY-MM-DD
形式の日付を取得することができます。
この変数を利用して、SQLオペレータなどで日付によるフィルタリングを行うことができます。
SELECT * FROM users WHERE registered_at >= {{ ds }}
しかし {{ ds }}
で取得する値はUTC時間のため、日本時間の午前6時にDAGを実行した場合、 {{ ds }}
は前日の値になってしまいます。
DBに保存しているレコードの値が日本時間の場合、 {{ ds }}
では正しい比較ができません。
では、タイムゾーンを指定して日付を取得するにはどうしたらいいでしょうか?
TL;DR
{{ ds }}
の代わりに {{ execution_date.in_tz('Asia/Tokyo') | ds }}
を使います。
{{ ds }}ではローカルタイムの日付を取得できない
Airflowの設定を変更することで、{{ ds }}
などの変数をローカルタイムにできれば楽なのですが、現状はできません。
Airflowでは、タイムゾーンを跨いでチームが存在する場合や、サマータイムによる煩雑性、そこから生まれるバグを考慮し、UTC時刻で取り扱う思想のようです。Web UIの表示タイムゾーンを設定することはできますが、システム内のデータは全てUTCで保存しています。
テンプレート内での日時取扱いに関しても、「Airflowは日時をローカルタイムに変換しない」とドキュメントに書かれています。
Airflow returns time zone aware datetimes in templates, but does not convert them to local time so they remain in UTC. It is left up to the DAG to handle this.
(Airflowはテンプレートではタイムゾーンawareな日付時刻を返しますが、ローカルタイムに変換しないため、UTCのままです。日時の処理はDAGに任されています。)
Time Zones - Templates
実際に動かしてみましたが、タイムゾーンawareなオブジェクトでも、取得できる {{ ds }}
, {{ ts }}
の値はUTCでした。
from airflow import DAG from airflow.operators.bash_operator import BashOperator import pendulum with DAG( dag_id='check_timezone', schedule_interval=None, start_date=pendulum.datetime(2022, 1, 1, tz="Asia/Tokyo"), catchup=False ) as dag: task = BashOperator( task_id='check_time', dag=dag, bash_command="\ echo \"Current Time = $(date)\n\ DAG Timezone = {{ dag.timezone }}\n\ ds = {{ ds }}\n\ ts = {{ ts }}\"" )
[2022-10-03T18:00:44.806+0000] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'echo "Current Time = $(date)\nDAG Timezone = Timezone(\'Asia/Tokyo\')\nds = 2022-10-03\nts = 2022-10-03T18:00:43.217129+00:00"'] [2022-10-03T18:00:44.818+0000] {subprocess.py:86} INFO - Output: [2022-10-03T18:00:44.826+0000] {subprocess.py:93} INFO - Current Time = Mon Oct 3 18:00:44 UTC 2022 [2022-10-03T18:00:44.827+0000] {subprocess.py:93} INFO - DAG Timezone = Timezone('Asia/Tokyo') [2022-10-03T18:00:44.828+0000] {subprocess.py:93} INFO - ds = 2022-10-03 [2022-10-03T18:00:44.829+0000] {subprocess.py:93} INFO - ts = 2022-10-03T18:00:43.217129+00:00 [2022-10-03T18:00:44.830+0000] {subprocess.py:97} INFO - Command exited with return code 0
ちなみにタイムゾーンawareとは、オブジェクトがタイムゾーンの情報を持っている状態です。
以下のように、 start_date
でタイムゾーンを指定してDAGを初期化することで、タイムゾーンawareなオプジェクトを作成することができます。
import pendulum dag = DAG("my_tz_dag", start_date=pendulum.datetime(2016, 1, 1, tz="Asia/Tokyo"))
{{ execution_date }} を使う
設定で取得する値を変えられないのであれば、値の変換を考えます。
{{ ds }}
からうまく変換できればいいのですが、時間情報を含まないので難しいです。
そこで利用するのが {{ execution_date }}
です。
実は {{ ds }}
= {{ dag_run.logical_date | ds }}
であり、 dag_run.logical_date
は execution_date
と同じ値を持ちます。1(ただし型が異なる。)
つまり ds
は execution_date
に対して、 ds
フィルターをかけた値です。
また、 execution_date
オブジェクトは pendulum.Pendulum
型のため、 in_tzメソッド
または in_timezoneメソッド
を使うことにより、任意のタイムゾーンへ変換が可能です。
よって、 execution_date
を in_tzメソッド
でローカルタイムへ変換し、 ds
フィルターをかけることで、 ds
形式のローカルタイムを取得することができます。
from airflow import DAG from airflow.operators.bash_operator import BashOperator import pendulum with DAG( dag_id='get_jst_date', schedule_interval=None, start_date=pendulum.datetime(2022, 1, 1, tz="Asia/Tokyo"), catchup=False ) as dag: task = BashOperator( task_id='get_jst_date', dag=dag, bash_command="\ echo \"Current Time = $(date)\n\ DAG Timezone = {{ dag.timezone }}\n\ ds = {{ ds }}\n\ ts = {{ ts }}\n\ execution_date.in_tz('Asia/Tokyo') = {{ execution_date.in_tz('Asia/Tokyo') }}\n\ execution_date.in_tz('Asia/Tokyo') | ds = {{ execution_date.in_tz('Asia/Tokyo') | ds }}\"" )
[2022-10-03T18:01:10.692+0000] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'echo "Current Time = $(date)\nDAG Timezone = Timezone(\'Asia/Tokyo\')\nds = 2022-10-03\nts = 2022-10-03T18:01:09.393003+00:00\nexecution_date.in_tz(\'Asia/Tokyo\') = 2022-10-04T03:01:09.393003+09:00\nexecution_date.in_tz(\'Asia/Tokyo\') | ds = 2022-10-04"'] [2022-10-03T18:01:10.705+0000] {subprocess.py:86} INFO - Output: [2022-10-03T18:01:10.710+0000] {subprocess.py:93} INFO - Current Time = Mon Oct 3 18:01:10 UTC 2022 [2022-10-03T18:01:10.712+0000] {subprocess.py:93} INFO - DAG Timezone = Timezone('Asia/Tokyo') [2022-10-03T18:01:10.713+0000] {subprocess.py:93} INFO - ds = 2022-10-03 [2022-10-03T18:01:10.715+0000] {subprocess.py:93} INFO - ts = 2022-10-03T18:01:09.393003+00:00 [2022-10-03T18:01:10.716+0000] {subprocess.py:93} INFO - execution_date.in_tz('Asia/Tokyo') = 2022-10-04T03:01:09.393003+09:00 [2022-10-03T18:01:10.717+0000] {subprocess.py:93} INFO - execution_date.in_tz('Asia/Tokyo') | ds = 2022-10-04 [2022-10-03T18:01:10.718+0000] {subprocess.py:97} INFO - Command exited with return code 0
{{ ds }}
はUTCの日付が取得されるのに対し、 {{ execution_date.in_tz('Asia/Tokyo') | ds }}
は指定したタイムゾーンの日付を取得することができました。
余談ですが、 execution_date
は手動実行の場合とスケジュール実行の場合で受け取る値が異なります。詳しくは以下の記事をご覧ください。
Airflowのstart_dateとexecution_date (logical date)を把握する|Dentsu Digital Tech Blog|note
最後に
今回はJinjaテンプレートを活用して、ローカルタイムの日付を取得する方法を紹介しました。
Jinjaテンプレートはシンプルに記述できる一方、機能は非常に強力です。
AirflowのJinjaテンプレートには、デフォルトでさまざまな変数、マクロ、フィルターが実装されています。また、テンプレートに存在しない場合には自身でマクロを作成することもできます。
Jinjaテンプレートを最大限活用していきましょう。
以上、八木でした!