Cloud Composer で Airflow タスクの SLA を設定してみた
こんにちは!エノカワです。
Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。
今回は、Cloud Composer で Airflow タスクに SLA(Service Level Agreement)を設定する方法を試してみました。
Airflow の SLA とは
SLA は、ワークフローの各タスクが所定の時間内に完了することを保証するための仕組みです。
Airflow では、各タスクに SLA を設定し、時間内に完了しなかった場合に通知を受け取ることができます。
Google Cloud 公式ブログの以下のドキュメントでも、SLA の活用について触れられています。
Airflow DAG の改良による Cloud Composer の最適化 | Google Cloud 公式ブログ
- タスクに SLA を追加します。
a. SLA をタスクレベルに設定し、タスクの実行時間が想定より長い場合は通知を受け取るようにします。
SLA の目的
SLA を設定することで、以下のようなメリットがあります。
- タスクの実行時間を監視し、期待通りの時間内に完了したかを確認できる
- SLA 違反時にアラートを受け取ることで、問題を迅速に検知し対応できる
- ビジネスプロセスやサービス品質の維持・向上につながる
環境作成
まず、DAG を実行するための Cloud Composer 環境を作成します。
Google Cloud コンソールの「Cloud Composer の環境作成」ページから Composer 2 を選択します。
環境名はtest-composer
、リージョンはasia-northeast1
(東京)を選択し、サービスアカウントはデフォルトのものを使用します。
(本番運用では、適切な権限を持つユーザーマネージドサービスアカウントを推奨)
環境の作成が完了すると、Airflow UI から DAG を管理できるようになります。
DAG を作成する
SLA 違反をシミュレートするための DAG を作成します。
以下のコードでは、sleep_20
タスクに SLA を設定し、10 秒以内に完了しなかった場合にエラーログを出力するようにしています。
import datetime
import time
import pendulum
import logging
from airflow.decorators import dag, task
def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
logging.error(
"SLA Miss detected!"
f"dag={dag}, task_list={task_list}, "
f"blocking_task_list={blocking_task_list}, slas={slas}, blocking_tis={blocking_tis}"
)
@dag(
schedule="*/5 * * * *",
start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
catchup=False,
sla_miss_callback=sla_callback,
)
def example_sla_dag():
@task(sla=datetime.timedelta(seconds=10))
def sleep_20():
"""Sleep for 20 seconds"""
time.sleep(20)
@task
def sleep_30():
"""Sleep for 30 seconds"""
time.sleep(30)
sleep_20() >> sleep_30()
example_dag = example_sla_dag()
@dag
デコレータで DAG を定義し、5 分ごとに実行されるようにスケジュール@task
デコレータでタスクを定義し、sla
パラメータを指定(sleep_20
に 10 秒の SLA)sla_miss_callback
を設定し、SLA 違反時にエラーログを出力
DAG を実行する
DAG を実行し、SLA 設定が適用されることを確認します。
sleep_20
タスクが 22 秒かかっています。
10 秒のSLAを設定しているので、SLA 違反が発生していると想定されます。
SLA 違反の確認
SLA 違反が発生すると、DAG で設定したsla_miss_callback
により、Airflow のログにエラーメッセージが記録されます。
Airflow のログで確認する
ログを確認すると、以下のようなエラーメッセージが出力されていました。
[2025-02-05 09:05:25,608] {example_sla_dag.py:9} ERROR - SLA Miss detected!
dag=<DAG: example_sla_dag>,
task_list=sleep_20 on 2025-02-05T09:05:00+00:00,
blocking_task_list=,
slas=[('example_sla_dag', 'sleep_20', '2025-02-05T09:05:00+00:00')],
blocking_tis=[]
このログは、DAG のsla_miss_callback
で出力されるカスタムメッセージです。
エラーログの詳細を把握することで、どのタスクが SLA 違反を起こしたのかを特定できます。
Airflow Web UI で確認する
SLA 違反の発生状況は、Airflow の Web UI からも確認できます。
[Browse] > [SLA Misses] をクリックします。
List Sla Miss のページが表示されます。
ここで、発生した SLA 違反の一覧を確認できます。
SLA 違反時の通知
今回は DAG 内でon_sla_miss
を使い、自前でログを出力する形で SLA 違反を検知しました。
Cloud Composer の Airflow では、以下のような方法でSLA 違反時の通知を設定できます。
- メール通知:
email_on_sla_miss
を設定し、SLA 違反時にメールを送信 - Slack 通知: Airflow の
SlackAPIPostOperator
やSlackWebhookOperator
を使用してSlack通知を送る - Google Cloud Logging: ログを Cloud Logging に送信し、アラートポリシーを設定
注意点
SLA 設定を行う際には、以下の点に注意が必要です。
SLA 違反が発生してもタスクの実行は継続される
SLA 違反が発生しても、該当タスクはキャンセルされず、完了するまで実行されます。
そのため、一定時間を超えた場合にタスクを強制終了したい場合は、SLA ではなく timeout を使用する必要があります。
手動トリガーされた DAG 実行には SLA が適用されない
SLA のチェックは、スケジュールされた DAG 実行のexecution_date
を基準に行われます。
具体的には、DAG のスケジュールに従って決定された予定実行日時を基準に SLA が評価されるため、手動でトリガーされた DAG 実行では SLA チェックが行われません。
手動実行でも SLA を適用したい場合は、カスタムのモニタリングを実装するか、スケジュール実行を前提としたワークフロー設計が必要です。
各タスクごとに SLA を設定する必要がある
Airflow では、SLA を DAG 全体ではなく 各タスク単位 で設定する必要があります。
そのため、SLA を適用したいタスクごとに sla パラメータを指定する必要があります。
以前はdefault_sla
という設定オプションがあり、DAG 全体にデフォルトの SLA を適用することが可能のようでしたが、現在はこの機能は提供されていませんでした。
(Configuration Reference に当該プションなし)
Airflow SLA Setup. Airflow allows you to set Service Level… | by Prem Vishnoi(cloudvala) | Medium
- Global SLA Configuration:
- Set a default SLA for all tasks in the
[core]
section of yourairflow.cfg
file:[core] default_sla = timedelta(hours=1)
なお、Airflow の設定オプションcheck_slas
を無効にすることで、SLA チェック自体を無効化することは可能です。
[core]
check_slas = False
まとめ
今回は、Cloud Composer 上で Airflow タスクに SLA を設定する方法を試してみました。
SLA を活用することで、所定時間以上タスクが実行し続けていた場合にアラートを受け取り、問題をいち早く検知できることを実感しました。
ただし、以下の点には注意が必要です。
- SLA 違反時もタスクそのものは強制終了されない
- 手動実行では SLA が適用されない仕様
- 監視したいタスクごとに SLA を設定する必要がある
SLA 自体はタスクのトラッキングと通知が主眼であるため、タスクを途中停止するには timeout など別の機能を組み合わせる必要があります。
実際のワークフローにも簡単に導入できるため、「長時間実行しがちなタスク」への対処として、ぜひ SLA の設定を検討してみてください。
本記事が、ワークフロー管理の効率化に少しでも役立てば幸いです。
参考
- Airflow DAG の改良による Cloud Composer の最適化 | Google Cloud 公式ブログ
- Airflow SLA Setup. Airflow allows you to set Service Level… | by Prem Vishnoi(cloudvala) | Medium
- Airflow の SLA設定方法 | フューチャー技術ブログ
- バッチジョブワークフロー実行基盤としての「Google Cloud Composer」実用性検証【後編】 | TECH | NRI Digital
- Cloud Composerにデータマート集計基盤を移行しました - ZOZO TECH BLOG
- Tasks — Airflow Documentation
- Callbacks — Airflow Documentation
- airflow.models.slamiss — Airflow Documentation
- Configuration Reference — Airflow Documentation