Cloud Composer で DAG 解析時間アラートを設定してみた
こんにちは!エノカワです。
Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。
今回は、Cloud Composer で DAG 解析時間のアラートを設定する方法についてご紹介します。
DAG 解析時間とは
DAG 解析時間(DAG Parse Time)とは、Airflow が DAG ファイルを解析し、スケジューラに登録するまでにかかる時間を指します。
この時間が長くなると、DAG のスケジューリングが遅延し、ワークフローの実行に影響を及ぼす可能性があります。
最悪の場合、DAG がスケジュールされず、ワークフローが実行されないという事態も発生しかねません。
理想的な解析時間の指標
Google Cloud 公式ブログでは、DAG 解析時間が 10秒を超えるとスケジューラが過負荷になる可能性がある と記載されています。
DAG 解析時間が長くなると、Airflow のパフォーマンスに影響が出る可能性があるため、適切な監視と対策が重要です。
Cloud Composer で Airflow DAG 解析時間を短縮す | Google Cloud 公式ブログ
理想的な解析時間の指標とは
モニタリング ダッシュボードの DAG の統計情報 セクションで、合計 DAG 解析時間のグラフを確認します。数値が約 10 秒を超える場合、スケジューラが DAG 解析により過負荷となり、DAG を効果的に実行できない可能性があります。
Cloud Composer 環境の作成
まず、DAG を実行するための Cloud Composer 環境を作成します。
Google Cloud コンソールの「Cloud Composer の環境作成」ページから Composer 3 を選択します。
Composer のバージョンによって画面や設定項目が異なる場合があるため、本記事では Composer 3 を使用します。
環境名はtest-composer
、リージョンはasia-northeast1
(東京)を選択し、サービスアカウントはデフォルトのものを使用します。
(本番運用では、最小限の権限を持つユーザー管理のサービスアカウントを作成し、それを指定することを推奨。)
環境の作成が完了すると、Airflow UI から DAG を管理できるようになります。
DAG 解析時間のアラートポリシーを作成
Cloud Monitoring を使用して、DAG 解析時間のアラートポリシーを作成します。
Google Cloud 公式ブログでは、以下のように述べられています。
Cloud Composer で Airflow DAG 解析時間を短縮す | Google Cloud 公式ブログ
長い解析時間のアラートを受け取る方法
アラート ポリシーを作成して指標の値をモニタリングすると、条件に違反した場合に通知できます。これは Composer モニタリング ダッシュボードで行うことも可能です。
この公式ブログの推奨に従い、Cloud Monitoring を活用して DAG 解析時間を監視し、異常な状態が発生した場合にアラートを受け取れるように設定します。
DAG 解析時間の指標を選択
Cloud Composer のモニタリング画面で「すべての DAG ファイルの合計解析時間」指標を確認し、右上のベルアイコンをクリックしてアラート設定を行います。
通知ポリシーの作成
[Select a metric] で Total Parse Time
を選択し、 [Next] をクリックします。
Total Parse Time
は、Cloud Composer 環境全体の DAG 解析時間の合計を表すメトリクスです。
トリガーの設定
DAG 解析時間が 10 秒を超えた場合にアラートをトリガーするよう設定します。
このしきい値は、Google Cloud 公式ドキュメントの推奨値に基づいています。
- Condition types:
Threshold
- しきい値:10秒
アラートの通知と名前
通知を受け取るためのチャンネルを設定します。
今回は メール通知 を利用しました。
メール通知以外にも、Slack や PagerDuty などの様々な通知チャンネルを利用できます。
アラートの確認
設定を確認し、 [ポリシーを作成] をクリックします。
作成したポリシーが一覧画面に表示されます。
アラートポリシーが有効になっていることを確認してください。
これで、DAG 解析時間のアラートポリシーが完成しました。
アラートが正常に動作するかどうかをテストするために、次のセクションで DAG 解析時間を意図的に増大させる方法を説明します。
DAG 解析時間を意図的に増大させる
DAG 解析時間のしきい値超過をシミュレートするため、大量のタスクを含む DAG を作成します。
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
# 50個の DAG を作成し、それぞれに1000個のタスクを追加
for i in range(50):
dag_id = f"large_dag3_{i}"
with DAG(
dag_id=dag_id,
schedule_interval=timedelta(days=1),
start_date=datetime(2025, 1, 1),
catchup=False,
) as dag:
start = DummyOperator(task_id='start')
previous_task = start
# 1000個のタスクを追加
for j in range(1000):
task = DummyOperator(task_id=f"task_{j}")
previous_task >> task
previous_task = task
# DAG をグローバル変数として登録
globals()[dag_id] = dag
この DAG は、以下の処理を行います。
- 50 個の DAG を作成し、それぞれに 1000 個のタスクを含めることで、DAG 解析時間を意図的に増大させます。
DummyOperator
を使用し、タスクの実行負荷を最小限に抑えつつ、DAG の解析負荷を高めます。- DAG をグローバル変数として登録することで、Airflow が DAG を認識できるようにします。
DAG ファイルを Cloud Composer 環境の DAG フォルダにアップロードすると、Airflow が DAG を解析し、スケジューラに登録します。
このとき、DAG の解析に時間がかかり、設定したアラートのしきい値を超えるはずです。
DAG 解析時間アラートの確認
DAG をデプロイし、しばらくするとアラートメールが届きました。
メール本文中の [VIEW INCIDENT] をクリックすると、インシデント画面が表示されます。
インシデント画面では、アラートの詳細や発生時刻、影響を受けたリソースなどを確認できます。
DAG 解析時間がしきい値の 10秒 を超過したため、DAG 解析時間アラートがトリガーされていることが確認できました。
まとめ
以上、Cloud Composer で DAG 解析時間アラートを設定する方法をご紹介しました。
DAG 解析時間を監視し、アラートを設定することで、Cloud Composer 環境の安定稼働に繋げることができます。
DAG 解析時間が増大するケースとしては、以下のようなものが考えられます。
- 新しい DAG を追加した場合
- DAG のタスク数を大幅に増やした場合
- DAG 内で外部 API 呼び出しを頻繁に行うようにした場合
- Python のトップレベルコードで重い処理を実行するようにした場合
今回、DAG 解析時間を意図的に増大させるために、大量のタスクを含む DAG を作成する方法に加え、上記のケースも試してみました。
**しかし、DAG の構造やタスクの処理内容によって DAG 解析時間に与える影響は異なり、意図通りに解析時間を増大させることの難しさを実感しました。
**
また、DAG 解析時間が長くなりすぎると、以下のようなエラーが発生しました。
DAG の破損(dags/dag_parse_time_simulation.py):
Traceback (most recent call last):
File "/home/airflow/gcs/dags/dag_parse_time_simulation.py", line 12, in simulate_dag_parse_time
time.sleep(100) # 100秒固定のスリープ
^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout
raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /home/airflow/gcs/dags/dag_parse_time_simulation.py after 300.0s.
Please take a look at these docs to improve your DAG import time:
* https://airflow.apache.org/docs/apache-airflow/2.10.2/best-practices.html#top-level-python-code
* https://airflow.apache.org/docs/apache-airflow/2.10.2/best-practices.html#reducing-dag-complexity, PID: 1370
上記のエラー例からもわかるように、DAG 解析時間の増大は Cloud Composer 環境に悪影響を及ぼす可能性があります。
そのため、DAG 解析時間アラートを設定し、事前に兆候を察知できるようにしておくことが重要です。
Cloud Composer 環境の安定稼働のために、ぜひ DAG 解析時間アラートを設定し、活用してみてください!
参考
- Cloud Composer で Airflow DAG 解析時間を短縮す | Google Cloud 公式ブログhttps://cloud.google.com/blog/ja/products/data-analytics/reduce-airflow-dag-parse-times-in-cloud-composer
- 環境のパフォーマンスと費用を最適化する | Cloud Composer | Google Cloud
- 一般的なアラート ポリシーの設定 | Cloud Monitoring | Google Cloud
- 指標しきい値のアラート ポリシーを作成する | Cloud Monitoring | Google Cloud