Cloud Composer で DAG 解析時間アラートを設定してみた

Cloud Composer で DAG 解析時間アラートを設定してみた

Clock Icon2025.02.28

こんにちは!エノカワです。

Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。

今回は、Cloud Composer で DAG 解析時間のアラートを設定する方法についてご紹介します。

DAG 解析時間とは

DAG 解析時間(DAG Parse Time)とは、Airflow が DAG ファイルを解析し、スケジューラに登録するまでにかかる時間を指します。
この時間が長くなると、DAG のスケジューリングが遅延し、ワークフローの実行に影響を及ぼす可能性があります。
最悪の場合、DAG がスケジュールされず、ワークフローが実行されないという事態も発生しかねません。

理想的な解析時間の指標

Google Cloud 公式ブログでは、DAG 解析時間が 10秒を超えるとスケジューラが過負荷になる可能性がある と記載されています。
DAG 解析時間が長くなると、Airflow のパフォーマンスに影響が出る可能性があるため、適切な監視と対策が重要です。

Cloud Composer で Airflow DAG 解析時間を短縮す | Google Cloud 公式ブログ

理想的な解析時間の指標とは
モニタリング ダッシュボードの DAG の統計情報 セクションで、合計 DAG 解析時間のグラフを確認します。数値が約 10 秒を超える場合、スケジューラが DAG 解析により過負荷となり、DAG を効果的に実行できない可能性があります。

Cloud Composer 環境の作成

まず、DAG を実行するための Cloud Composer 環境を作成します。
Google Cloud コンソールの「Cloud Composer の環境作成」ページから Composer 3 を選択します。
Composer のバージョンによって画面や設定項目が異なる場合があるため、本記事では Composer 3 を使用します。

cloud-composer-dag-parse-times-alerts_01.png

環境名はtest-composer、リージョンはasia-northeast1(東京)を選択し、サービスアカウントはデフォルトのものを使用します。
(本番運用では、最小限の権限を持つユーザー管理のサービスアカウントを作成し、それを指定することを推奨。)

cloud-composer-dag-parse-times-alerts_02.png

環境の作成が完了すると、Airflow UI から DAG を管理できるようになります。

cloud-composer-dag-parse-times-alerts_03.png

DAG 解析時間のアラートポリシーを作成

Cloud Monitoring を使用して、DAG 解析時間のアラートポリシーを作成します。

Google Cloud 公式ブログでは、以下のように述べられています。

Cloud Composer で Airflow DAG 解析時間を短縮す | Google Cloud 公式ブログ

長い解析時間のアラートを受け取る方法
アラート ポリシーを作成して指標の値をモニタリングすると、条件に違反した場合に通知できます。これは Composer モニタリング ダッシュボードで行うことも可能です。

この公式ブログの推奨に従い、Cloud Monitoring を活用して DAG 解析時間を監視し、異常な状態が発生した場合にアラートを受け取れるように設定します。

DAG 解析時間の指標を選択

Cloud Composer のモニタリング画面で「すべての DAG ファイルの合計解析時間」指標を確認し、右上のベルアイコンをクリックしてアラート設定を行います。

cloud-composer-dag-parse-times-alerts_04.png

通知ポリシーの作成

[Select a metric]Total Parse Time を選択し、 [Next] をクリックします。
Total Parse Time は、Cloud Composer 環境全体の DAG 解析時間の合計を表すメトリクスです。

cloud-composer-dag-parse-times-alerts_05.png

トリガーの設定

DAG 解析時間が 10 秒を超えた場合にアラートをトリガーするよう設定します。
このしきい値は、Google Cloud 公式ドキュメントの推奨値に基づいています。

  • Condition typesThreshold
  • しきい値:10秒

cloud-composer-dag-parse-times-alerts_06.png

アラートの通知と名前

通知を受け取るためのチャンネルを設定します。
今回は メール通知 を利用しました。
メール通知以外にも、Slack や PagerDuty などの様々な通知チャンネルを利用できます。

cloud-composer-dag-parse-times-alerts_07.png

アラートの確認

設定を確認し、 [ポリシーを作成] をクリックします。

cloud-composer-dag-parse-times-alerts_08.png

作成したポリシーが一覧画面に表示されます。
アラートポリシーが有効になっていることを確認してください。

cloud-composer-dag-parse-times-alerts_09.png

これで、DAG 解析時間のアラートポリシーが完成しました。
アラートが正常に動作するかどうかをテストするために、次のセクションで DAG 解析時間を意図的に増大させる方法を説明します。

DAG 解析時間を意図的に増大させる

DAG 解析時間のしきい値超過をシミュレートするため、大量のタスクを含む DAG を作成します。

dag_parse_time_simulation.py
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

# 50個の DAG を作成し、それぞれに1000個のタスクを追加
for i in range(50):
    dag_id = f"large_dag3_{i}"

    with DAG(
        dag_id=dag_id,
        schedule_interval=timedelta(days=1),
        start_date=datetime(2025, 1, 1),
        catchup=False,
    ) as dag:

        start = DummyOperator(task_id='start')
        previous_task = start

        # 1000個のタスクを追加
        for j in range(1000):
            task = DummyOperator(task_id=f"task_{j}")
            previous_task >> task
            previous_task = task

    # DAG をグローバル変数として登録
    globals()[dag_id] = dag

この DAG は、以下の処理を行います。

  • 50 個の DAG を作成し、それぞれに 1000 個のタスクを含めることで、DAG 解析時間を意図的に増大させます。
  • DummyOperator を使用し、タスクの実行負荷を最小限に抑えつつ、DAG の解析負荷を高めます。
  • DAG をグローバル変数として登録することで、Airflow が DAG を認識できるようにします。

DAG ファイルを Cloud Composer 環境の DAG フォルダにアップロードすると、Airflow が DAG を解析し、スケジューラに登録します。
このとき、DAG の解析に時間がかかり、設定したアラートのしきい値を超えるはずです。

DAG 解析時間アラートの確認

DAG をデプロイし、しばらくするとアラートメールが届きました。

cloud-composer-dag-parse-times-alerts_11.png

メール本文中の [VIEW INCIDENT] をクリックすると、インシデント画面が表示されます。
インシデント画面では、アラートの詳細や発生時刻、影響を受けたリソースなどを確認できます。

cloud-composer-dag-parse-times-alerts_12.png

DAG 解析時間がしきい値の 10秒 を超過したため、DAG 解析時間アラートがトリガーされていることが確認できました。

まとめ

以上、Cloud Composer で DAG 解析時間アラートを設定する方法をご紹介しました。
DAG 解析時間を監視し、アラートを設定することで、Cloud Composer 環境の安定稼働に繋げることができます。

DAG 解析時間が増大するケースとしては、以下のようなものが考えられます。

  • 新しい DAG を追加した場合
  • DAG のタスク数を大幅に増やした場合
  • DAG 内で外部 API 呼び出しを頻繁に行うようにした場合
  • Python のトップレベルコードで重い処理を実行するようにした場合

今回、DAG 解析時間を意図的に増大させるために、大量のタスクを含む DAG を作成する方法に加え、上記のケースも試してみました。
**しかし、DAG の構造やタスクの処理内容によって DAG 解析時間に与える影響は異なり、意図通りに解析時間を増大させることの難しさを実感しました。
**
また、DAG 解析時間が長くなりすぎると、以下のようなエラーが発生しました。

cloud-composer-dag-parse-times-alerts_13.png

DAG の破損(dags/dag_parse_time_simulation.py):
Traceback (most recent call last):
  File "/home/airflow/gcs/dags/dag_parse_time_simulation.py", line 12, in simulate_dag_parse_time
    time.sleep(100)  # 100秒固定のスリープ
    ^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /home/airflow/gcs/dags/dag_parse_time_simulation.py after 300.0s.
Please take a look at these docs to improve your DAG import time:
* https://airflow.apache.org/docs/apache-airflow/2.10.2/best-practices.html#top-level-python-code
* https://airflow.apache.org/docs/apache-airflow/2.10.2/best-practices.html#reducing-dag-complexity, PID: 1370

上記のエラー例からもわかるように、DAG 解析時間の増大は Cloud Composer 環境に悪影響を及ぼす可能性があります。
そのため、DAG 解析時間アラートを設定し、事前に兆候を察知できるようにしておくことが重要です。

Cloud Composer 環境の安定稼働のために、ぜひ DAG 解析時間アラートを設定し、活用してみてください!

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.