こんにちは。サービス開発室の武田です。
Apache Airflowを利用してパイプライン処理を実施しているとき、タスクが失敗した際の通知は重要です。Airflowでは簡単な設定をすることでメール通知を行えます。
たとえばSMTPサーバーを経由してメールを送信する場合、まずサーバーの設定をします。airflow.cfg
で指定するなら次のように指定します。
[smtp]
smtp_host = email-smtp.ap-northeast-1.amazonaws.com
smtp_user = XXXX
smtp_password = qwerty
smtp_port = 587
smtp_mail_from = airflow@example.com
または環境変数を使用するなら次のように指定します。
AIRFLOW__SMTP__SMTP_HOST = email-smtp.ap-northeast-1.amazonaws.com
AIRFLOW__SMTP__SMTP_USER = XXXX
AIRFLOW__SMTP__SMTP_PASSWORD = qwerty
AIRFLOW__SMTP__SMTP_PORT = 587
AIRFLOW__SMTP__SMTP_MAIL_FROM = airflow@example.com
動作確認として、次のようなDAGを作ってみます。
import airflow
from airflow.decorators import task, dag
args = {
"owner": "airflow",
"start_date": airflow.utils.dates.days_ago(2),
"provide_context": True,
"email": ["takeda.takashi@example.com"],
"email_on_failure": True,
"email_on_retry": True,
}
@task
def test_task():
raise Exception("Email notify test.")
@dag(default_args=args, schedule_interval=None, catchup=False)
def email_test():
test_task()
email_test()
このDAGを実行するとタスクは失敗し、通知を受け取れます。
件名。
Airflow alert: <TaskInstance: email_test.test_task manual__2024-04-08T00:00:00.00000+00:00 [failed]>
本文。
Try 1 out of 1
Exception:
Email notify test.
Log: Link
Host: e86e79358238
Mark success: Link
Airflowの管理UIへのリンクも入っており、最低限の要件は満たしていそうです。
メールをカスタマイズしてみる
さてデフォルトでは上記のようなメール通知を受け取れるわけですが、運用に合わせて変更したいこともあるでしょう。Airflowではメールの件名と本文をカスタマイズできます。まずデフォルトのテンプレートは次のようになっています。
件名。
Airflow alert: {{ti}}
本文。
Try {{try_number}} out of {{max_tries + 1}}<br>
Exception:<br>{{exception_html}}<br>
Log: <a href="{{ti.log_url}}">Link</a><br>
Host: {{ti.hostname}}<br>
Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>
テンプレートからアクセスできる変数は次の5つです。
- ti
- exception
- exception_html
- try_number
- max_tries
exception
とexception_html
の違いは、\n
が<br>
に変換されている点です。またti
はtask_instanceですが、ti
経由でアクセスできる属性はドキュメントを参照してください。
今回はメール本文に、タスクの開始時刻、終了時刻、間隔を追加してみましょう。
次のようなファイルを用意します。ファイルを置く場所はAirflowがアクセスできれば任意です。
dags/content_template.txt
Try {{try_number}} out of {{max_tries + 1}}<br>
Exception:<br>{{exception_html}}<br>
Log: <a href="{{ti.log_url}}">Link</a><br>
Host: {{ti.hostname}}<br>
Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>
<br>
Start date: {{ti.start_date}}<br>
End date: {{ti.end_date}}<br>
Duration: {{ti.duration}}<br>
続いて、そのファイルの場所を指定します。
設定ファイルの場合。
[email]
html_content_template = dags/content_template.txt
環境変数の場合。
AIRFLOW__EMAIL__HTML_CONTENT_TEMPLATE = dags/content_template.txt
指定できたら一度Airflowを再起動して、再度DAGを実行してみます。修正された内容にメールが受信できました。
Try 1 out of 1
Exception:
Email notify test.
Log: Link
Host: c4f993fd4535
Mark success: Link
Start date: 2024-04-08 00:00:57.938069+00:00
End date: 2024-04-08 00:00:59.162104+00:00
Duration: 1.224035
まとめ
Airflowがしくみとして持っているメール通知の内容を確認してみました。簡単にセットアップでき、またカスタマイズもできるため最初の選択肢としていいのではないでしょうか。
この方法の欠点としては、並行して実行されているタスクがエラーになると、それぞれについてメールが発報される点でしょうか。ひとつのDAGでエラー通知は1通のみ、などにしたい場合には別途エラー通知を作り込む必要があります。それはまたの機会ということで。