Cloud Composer で Airflow タスクの SLA を設定してみた

Cloud Composer で Airflow タスクの SLA を設定してみた

Clock Icon2025.02.06

こんにちは!エノカワです。

Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。

今回は、Cloud Composer で Airflow タスクに SLA(Service Level Agreement)を設定する方法を試してみました。

Airflow の SLA とは

SLA は、ワークフローの各タスクが所定の時間内に完了することを保証するための仕組みです。
Airflow では、各タスクに SLA を設定し、時間内に完了しなかった場合に通知を受け取ることができます。

Google Cloud 公式ブログの以下のドキュメントでも、SLA の活用について触れられています。

Airflow DAG の改良による Cloud Composer の最適化 | Google Cloud 公式ブログ

  1. タスクに SLA を追加します。
    a. SLA をタスクレベルに設定し、タスクの実行時間が想定より長い場合は通知を受け取るようにします。

SLA の目的

SLA を設定することで、以下のようなメリットがあります。

  • タスクの実行時間を監視し、期待通りの時間内に完了したかを確認できる
  • SLA 違反時にアラートを受け取ることで、問題を迅速に検知し対応できる
  • ビジネスプロセスやサービス品質の維持・向上につながる

環境作成

まず、DAG を実行するための Cloud Composer 環境を作成します。
Google Cloud コンソールの「Cloud Composer の環境作成」ページから Composer 2 を選択します。

cloud-composer-airflow-task-sla_01-1.png

環境名はtest-composer、リージョンはasia-northeast1(東京)を選択し、サービスアカウントはデフォルトのものを使用します。
(本番運用では、適切な権限を持つユーザーマネージドサービスアカウントを推奨)

cloud-composer-airflow-task-sla_02-1.png

環境の作成が完了すると、Airflow UI から DAG を管理できるようになります。

cloud-composer-airflow-task-sla_03.png

DAG を作成する

SLA 違反をシミュレートするための DAG を作成します。

以下のコードでは、sleep_20タスクに SLA を設定し、10 秒以内に完了しなかった場合にエラーログを出力するようにしています。

example_sla_dag.py
import datetime
import time
import pendulum
import logging
from airflow.decorators import dag, task


def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    logging.error(
        "SLA Miss detected!"
        f"dag={dag}, task_list={task_list}, "
        f"blocking_task_list={blocking_task_list}, slas={slas}, blocking_tis={blocking_tis}"
    )


@dag(
    schedule="*/5 * * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
    sla_miss_callback=sla_callback,
)
def example_sla_dag():
    @task(sla=datetime.timedelta(seconds=10))
    def sleep_20():
        """Sleep for 20 seconds"""
        time.sleep(20)

    @task
    def sleep_30():
        """Sleep for 30 seconds"""
        time.sleep(30)

    sleep_20() >> sleep_30()

example_dag = example_sla_dag()
  • @dagデコレータで DAG を定義し、5 分ごとに実行されるようにスケジュール
  • @taskデコレータでタスクを定義し、slaパラメータを指定(sleep_20に 10 秒の SLA)
  • sla_miss_callbackを設定し、SLA 違反時にエラーログを出力

DAG を実行する

DAG を実行し、SLA 設定が適用されることを確認します。

sleep_20タスクが 22 秒かかっています。
10 秒のSLAを設定しているので、SLA 違反が発生していると想定されます。

cloud-composer-airflow-task-sla_04.png

SLA 違反の確認

SLA 違反が発生すると、DAG で設定したsla_miss_callbackにより、Airflow のログにエラーメッセージが記録されます。

Airflow のログで確認する

ログを確認すると、以下のようなエラーメッセージが出力されていました。

[2025-02-05 09:05:25,608] {example_sla_dag.py:9} ERROR - SLA Miss detected!
  dag=<DAG: example_sla_dag>, 
  task_list=sleep_20 on 2025-02-05T09:05:00+00:00, 
  blocking_task_list=, 
  slas=[('example_sla_dag', 'sleep_20', '2025-02-05T09:05:00+00:00')], 
  blocking_tis=[]

このログは、DAG のsla_miss_callbackで出力されるカスタムメッセージです。
エラーログの詳細を把握することで、どのタスクが SLA 違反を起こしたのかを特定できます。

cloud-composer-airflow-task-sla_05.png

Airflow Web UI で確認する

SLA 違反の発生状況は、Airflow の Web UI からも確認できます。

[Browse] > [SLA Misses] をクリックします。

cloud-composer-airflow-task-sla_06.png

List Sla Miss のページが表示されます。
ここで、発生した SLA 違反の一覧を確認できます。

cloud-composer-airflow-task-sla_07.png

SLA 違反時の通知

今回は DAG 内でon_sla_missを使い、自前でログを出力する形で SLA 違反を検知しました。
Cloud Composer の Airflow では、以下のような方法でSLA 違反時の通知を設定できます。

  • メール通知: email_on_sla_missを設定し、SLA 違反時にメールを送信
  • Slack 通知: Airflow のSlackAPIPostOperatorSlackWebhookOperatorを使用してSlack通知を送る
  • Google Cloud Logging: ログを Cloud Logging に送信し、アラートポリシーを設定

注意点

SLA 設定を行う際には、以下の点に注意が必要です。

SLA 違反が発生してもタスクの実行は継続される

SLA 違反が発生しても、該当タスクはキャンセルされず、完了するまで実行されます。
そのため、一定時間を超えた場合にタスクを強制終了したい場合は、SLA ではなく timeout を使用する必要があります。

手動トリガーされた DAG 実行には SLA が適用されない

SLA のチェックは、スケジュールされた DAG 実行のexecution_dateを基準に行われます。
具体的には、DAG のスケジュールに従って決定された予定実行日時を基準に SLA が評価されるため、手動でトリガーされた DAG 実行では SLA チェックが行われません。

手動実行でも SLA を適用したい場合は、カスタムのモニタリングを実装するか、スケジュール実行を前提としたワークフロー設計が必要です。

各タスクごとに SLA を設定する必要がある

Airflow では、SLA を DAG 全体ではなく 各タスク単位 で設定する必要があります。
そのため、SLA を適用したいタスクごとに sla パラメータを指定する必要があります。

以前はdefault_slaという設定オプションがあり、DAG 全体にデフォルトの SLA を適用することが可能のようでしたが、現在はこの機能は提供されていませんでした。
Configuration Reference に当該プションなし)

Airflow SLA Setup. Airflow allows you to set Service Level… | by Prem Vishnoi(cloudvala) | Medium

  1. Global SLA Configuration:
  • Set a default SLA for all tasks in the [core] section of your airflow.cfg file:
[core]
default_sla = timedelta(hours=1)

なお、Airflow の設定オプションcheck_slasを無効にすることで、SLA チェック自体を無効化することは可能です。

[core]
check_slas = False

まとめ

今回は、Cloud Composer 上で Airflow タスクに SLA を設定する方法を試してみました。

SLA を活用することで、所定時間以上タスクが実行し続けていた場合にアラートを受け取り、問題をいち早く検知できることを実感しました。

ただし、以下の点には注意が必要です。

  • SLA 違反時もタスクそのものは強制終了されない
  • 手動実行では SLA が適用されない仕様
  • 監視したいタスクごとに SLA を設定する必要がある

SLA 自体はタスクのトラッキングと通知が主眼であるため、タスクを途中停止するには timeout など別の機能を組み合わせる必要があります。

実際のワークフローにも簡単に導入できるため、「長時間実行しがちなタスク」への対処として、ぜひ SLA の設定を検討してみてください。

本記事が、ワークフロー管理の効率化に少しでも役立てば幸いです。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.