Cloud Composer の MCP サーバーで Gemini CLI から DAG 失敗原因を調査してみた

Cloud Composer の MCP サーバーで Gemini CLI から DAG 失敗原因を調査してみた

2026.04.21

こんにちは!エノカワです。

Cloud Composer は、Apache Airflow をベースに構築されたフルマネージドのワークフローオーケストレーションサービスです。

2026年4月15日のリリースノートで、Cloud Composer の名称が Managed Service for Apache Airflow に変わったことがアナウンスされました。

To more strongly embrace the success and growing customer preference for OSS solutions, Cloud Composer is evolving to become Managed Service for Apache Airflow. This name change provides improved customer understanding of our portfolio while reinforcing our commitment to being the most open cloud ecosystem.

OSS ソリューションへのコミットメントをより明確にするためのリブランディングで、サービス自体の機能や仕様に破壊的な変更があるわけではありません。

実際、Google Cloud コンソールを開くと「Managed Service for Apache Airflow(旧 Cloud Composer)」という表記になっており、環境の世代も「マネージド Airflow Gen 3 / Gen 2」と表現されるようになっています。

本記事では便宜上、従来どおり「Cloud Composer」と呼んで進めます。

同じ 2026/4/15 のリリースで、もう一つ注目すべき新機能が登場しました。
Cloud Composer MCP サーバー です。

Cloud Composer remote Model Context Protocol (MCP) server is available in Preview.

You can use Cloud Composer remote Model Context Protocol (MCP) server to connect to Cloud Composer from AI applications such as Gemini CLI, ChatGPT, Claude, or in AI applications that you're developing. The Cloud Composer MCP server lets you manage Cloud Composer environments and get details about executed DAG runs and Airflow tasks.

Gemini CLI や Claude などの AI クライアントから Cloud Composer 環境を操作できるようになり、DAG 実行や Airflow タスクの詳細取得も自然言語で行えそうです。
今回は、この機能を実際に試してみましたので、その内容をご紹介します。

Cloud Composer MCP サーバーとは

Cloud Composer MCP サーバーは、Apache Airflow 環境を AI アプリケーション(Gemini CLI / Claude / ChatGPT)から操作できるリモート MCP サーバーです。

Model Context Protocol(MCP)という標準化されたプロトコルで通信するため、対応する AI クライアントから同じインターフェースで利用できます。

提供されているツール

公式ドキュメント(MCP reference)で定義されているツールは以下の通りです。

各ツールが読み取り専用かどうかは、MCP 仕様の mcp.tool.isReadOnly 属性で判別できます。
以下は公式ドキュメントの説明をもとに分類してまとめたものです。

環境管理系

ツール名 用途 読み書き
list_environments プロジェクト内の全環境をリスト表示 読み取り
get_environment 特定環境の詳細情報を取得 読み取り
create_environment 新規環境を作成(デフォルトは小規模プライベートIP環境) 書き込み
delete_environment 環境を削除(不可逆) 書き込み
manage_pypi_packages PyPI パッケージの追加・削除・更新 書き込み

DAG・タスク監視系

ツール名 用途 読み書き
get_dag_source_code DAG のソースコードを取得 読み取り
get_dag_run 特定の DAG 実行の詳細(状態・開始/終了時刻など)を取得 読み取り
find_last_failed_dag_runs 失敗した DAG 実行を検索(DAG 指定可) 読み取り
list_failed_task_instances DAG 実行内の失敗タスクをリスト表示 読み取り
get_task_instance タスク実行の詳細情報を取得 読み取り

その他

ツール名 用途 読み書き
get_operation 長時間実行オペレーションのステータス確認 読み取り

大半が読み取り専用ツールで、書き込み系は create_environmentdelete_environmentmanage_pypi_packages の3つだけです。

接続情報

項目
サーバー名 Managed Service for Apache Airflow MCP server
エンドポイント composer.{region}.rep.googleapis.com/mcp
転送方式 HTTP
認証 Google Cloud 認証情報・OAuth Client ID・エージェント ID のいずれか

環境作成

DAG を動かすための環境を作成します。

Google Cloud コンソールで「Cloud Composer」を開くと、Managed Service for Apache Airflow(旧 Cloud Composer) という画面が表示されます。
名称変更が UI にも反映されていますね。

cloud-composer-mcp_01.png

「環境の作成」→「マネージド Airflow Gen 3」(旧称:Composer 3)を選択します。

環境作成フォームで以下のように設定します。

項目
名前 test-composer
場所 asia-northeast1
イメージのバージョン composer-3-airflow-3.1.7-build.5(Airflow 3 最新ビルド)
サービスアカウント デフォルト(検証用)
復元力モード 標準的な復元力

cloud-composer-mcp_02.png

本番運用ではユーザーマネージドのサービスアカウントが推奨ですが、今回は検証用なのでデフォルトのものを使用します。

cloud-composer-mcp_03.png

環境作成には 20〜30 分ほどかかります。

事前準備

必要な IAM ロール

MCP サーバーを利用するユーザー(または Gemini CLI を実行するプリンシパル)に、以下のロールを付与します。

ロール 用途
roles/serviceusage.serviceUsageAdmin Service Usage 管理
roles/mcp.toolUser MCP ツール利用

OAuth クライアント ID の作成

ADC(Application Default Credentials)の認証に使う OAuth クライアント ID を作成します。

なお、OAuth クライアント ID を使った MCP サーバーの認証例については、以下の記事も参考になります。

https://dev.classmethod.jp/articles/google-cloud-mcp-server-with-claude/

Google Cloud コンソールの「APIとサービス」→「認証情報」→「認証情報を作成」→「OAuth クライアント ID」を選択します。

  • アプリケーションの種類:デスクトップアプリ
  • 名前composer-mcp-adc-client(任意)

cloud-composer-mcp_oauth_01.png

作成後に表示されるダイアログから JSON をダウンロード して、ローカルに保存します。
クライアントシークレットは後から取得できないので、このタイミングで必ず保存してください。

cloud-composer-mcp_oauth_02.png

Application Default Credentials の設定

作成した OAuth クライアント ID の JSON ファイルを指定して ADC を設定します。

gcloud auth application-default login \
  --client-id-file=$HOME/Downloads/client_secret_XXXX.apps.googleusercontent.com.json \
  --scopes=https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/cloudcomposer.readonly

ブラウザが開いて OAuth 認証フローが走ります。
Google アカウントを選択して「許可」をクリックすると、ローカルに認証情報が保存されます。

認証後、トークンが取得できるか確認します。

gcloud auth application-default print-access-token | head -c 50

Gemini CLI への MCP サーバー登録

Gemini CLI は extension 単位で MCP サーバーを登録します。
~/.gemini/extensions/ 配下に拡張機能用のディレクトリを作成し、gemini-extension.json を配置します。

mkdir -p ~/.gemini/extensions/cloud-composer
~/.gemini/extensions/cloud-composer/gemini-extension.json
{
  "name": "cloud-composer",
  "version": "1.0.0",
  "mcpServers": {
    "cloud-composer": {
      "httpUrl": "https://composer.asia-northeast1.rep.googleapis.com/mcp",
      "authProviderType": "google_credentials",
      "oauth": {
        "scopes": ["https://www.googleapis.com/auth/cloudcomposer.readonly"]
      },
      "timeout": 30000,
      "headers": {
        "x-goog-user-project": "PROJECT_ID"
      }
    }
  }
}

Gemini CLI の認証モードとモデル選定

Gemini CLI 起動時に認証モードとモデルが自動で選ばれますが、企業利用では Vertex AI 認証への切り替えを検討するとよいでしょう。
Vertex AI 認証ではプロンプトや回答が Google のモデル学習に使われないため、業務データを扱う場合に安心して利用できます。

Gemini CLI の認証方式ごとのプライバシー差異や切り替え方法については、以下の記事が参考になります。

https://dev.classmethod.jp/articles/opt-out-gemini-cli-privacy/

接続確認

Gemini CLI を起動します。

gemini

起動後のプロンプトで /mcp と入力すると、登録済みの MCP サーバーとツール一覧が表示されます。

cloud-composer-mcp_04.png

cloud-composer (from cloud-composer) - Ready (11 tools) と表示されていれば接続成功です。

ログ表示で見ると以下のようになります。

Configured MCP servers:

🟢 cloud-composer (from cloud-composer) - Ready (11 tools)
  Tools:
  - mcp_cloud-composer_create_environment
  - mcp_cloud-composer_delete_environment
  - mcp_cloud-composer_find_last_failed_dag_runs
  - mcp_cloud-composer_get_dag_run
  - mcp_cloud-composer_get_dag_source_code
  - mcp_cloud-composer_get_environment
  - mcp_cloud-composer_get_operation
  - mcp_cloud-composer_get_task_instance
  - mcp_cloud-composer_list_environments
  - mcp_cloud-composer_list_failed_task_instances
  - mcp_cloud-composer_manage_pypi_packages

実際に試してみる

① 環境情報を取得する

まずは接続確認を兼ねて、環境一覧を取得してみます。

質問:

Cloud Composer の環境一覧を教えて

✦ プロジェクトIDとロケーションを教えてください。

Gemini から「プロジェクト ID とロケーションを教えてください」と聞き返しがあったので、情報を伝えます。

質問:

プロジェクト ID は PROJECT_ID、ロケーションは asia-northeast1 です

Gemini CLI がツール list_environments を呼び出そうとし、実行許可を求めるダイアログが表示されます。

cloud-composer-mcp_05.png

許可すると、list_environments が実行され、test-composer 環境の詳細が JSON で返ってきました。

cloud-composer-mcp_06.png

ツールが返した JSON の主要部分を抜粋すると以下の通りです。

{
  "environments": [
    {
      "config": {
        "dagGcsPrefix": "gs://asia-northeast1-test-compos-XXXXXXXX-bucket/dags",
        "environmentSize": "ENVIRONMENT_SIZE_SMALL",
        "softwareConfig": {
          "imageVersion": "composer-3-airflow-3.1.7-build.5"
        }
      },
      "createTime": "2026-04-21T02:15:19.731202Z",
      "name": "projects/PROJECT_ID/locations/asia-northeast1/environments/test-composer",
      "state": "RUNNING"
    }
  ]
}

test-composer 環境名・ステータス(RUNNING)・Airflow バージョン(composer-3-airflow-3.1.7-build.5)などが一目で確認できますね。
最後に Gemini から「test-composer という環境があります。」というシンプルな要約も返ってきました。

② サンプル DAG を意図的に失敗させる

次に、失敗する DAG を用意して原因調査の検証を行います。

失敗する DAG を配置

存在しない BigQuery テーブルを参照する DAG を用意しました。
サンプルなので、nonexistent_dataset.nonexistent_table という存在しないテーブルを意図的に指定しています。

failing_sample_dag.py
from datetime import datetime
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
    dag_id="failing_sample_dag",
    start_date=datetime(2026, 4, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    query_nonexistent_table = BigQueryInsertJobOperator(
        task_id="query_nonexistent_table",
        configuration={
            "query": {
                "query": "SELECT * FROM `nonexistent_dataset.nonexistent_table`",
                "useLegacySql": False,
            }
        },
    )

DAG を Cloud Composer 環境の DAGs フォルダにアップロードして実行させます。
アップロード後、failing_sample_dag がリトライも含めて3回失敗するまで待ちます。

Airflow UI を確認すると、failing_sample_dag が失敗になっています。

cloud-composer-mcp_07.png

タスクの詳細画面では、BigQuery から 404 Not found エラーが返っているのが確認できます。

cloud-composer-mcp_07_detail.png

エラーログの要点(抜粋・マスク済み)は以下の通りです。

[2026-04-21 15:20:20] INFO - Executing: {'query': {'query': 'SELECT * FROM `nonexistent_dataset.nonexistent_table`', 'useLegacySql': False}}
[2026-04-21 15:20:21] ERROR - Task failed with exception
google.api_core.exceptions.NotFound: 404 Not found: Dataset PROJECT_ID:nonexistent_dataset was not found in location US; reason: notFound, message: Not found: Dataset PROJECT_ID:nonexistent_dataset was not found in location US

想定通り、タスクが失敗しました。

③ 失敗原因を調査する

Gemini CLI に自然言語で失敗原因を調査させてみます。

質問:

test-composer 環境で最近失敗した DAG を教えて。原因も調べて

Gemini CLI は以下の流れで、複数のツールを連鎖的に呼び出しながら調査を進めていきました。

  1. find_last_failed_dag_runs で失敗した DAG 実行を検索 → failing_sample_dag が見つかる
  2. list_failed_task_instances で失敗タスクを特定 → query_nonexistent_table が 3回リトライして失敗したことを把握
  3. get_dag_source_code で失敗 DAG のソースコードを取得
  4. 結果を統合して原因を推定・説明

最終的な画面はこのようになりました。
1回の質問でツールが連鎖的に呼び出され、最後に原因が自然言語で説明されています。

cloud-composer-mcp_08.png

各ツールの呼び出しと結果は以下のように出力されていました(抜粋・マスク済み)。

① 失敗した DAG 実行の検索

✓  find_last_failed_dag_runs (cloud-composer MCP Server)
   {"environment":"projects/PROJECT_ID/locations/asia-northeast1/environments/test-composer"}

{
  "dagRuns": [
    {
      "state": "FAILED",
      "executionDate": "2026-04-21T00:00:00Z",
      "dagRunId": "scheduled__2026-04-21T00:00:00+00:00",
      "dagId": "failing_sample_dag",
      "type": "SCHEDULED"
    }
  ]
}

✦ failing_sample_dagというDAGの実行が失敗しています。原因を調査します。

② 失敗タスクの特定

✓  list_failed_task_instances (cloud-composer MCP Server)

{
  "taskInstances": [
    {
      "id": "query_nonexistent_table",
      "taskType": "BigQueryInsertJobOperator",
      "state": "FAILED",
      "tryNumber": 3,
      "taskId": "query_nonexistent_table",
      "dagId": "failing_sample_dag"
    }
  ]
}

✦ query_nonexistent_tableというタスクが失敗しています。DAGのソースコードを取得して、タスクの内容を調査します。

③ DAG ソースコードの取得と原因推定

✓  get_dag_source_code (cloud-composer MCP Server)

{
  "code": "... SELECT * FROM `nonexistent_dataset.nonexistent_table` ..."
}

✦ 失敗したDAGは failing_sample_dag で、その中の query_nonexistent_table というタスクが失敗しています。

  原因は、存在しないテーブル nonexistent_dataset.nonexistent_table にクエリを実行しようとしているためです。

  DAGのソースコードでは、以下のようになっています。

   1 query_nonexistent_table = BigQueryInsertJobOperator(
   2     task_id="query_nonexistent_table",
   3     configuration={
   4         "query": {
   5             "query": "SELECT * FROM `nonexistent_dataset.nonexistent_table`",
   6             "useLegacySql": False,
   7         }
   8     },
   9 )

該当箇所のソースコードも引用して、どの行に問題があるかまで示してくれました。

ちなみに、list_failed_task_instances の結果には taskTypeBigQueryInsertJobOperator)や tryNumber(3回試行)などの情報が含まれていたため、Gemini は get_task_instance を呼ばずに get_dag_source_code に直接進みました。
必要最小限のツール呼び出しで原因を推定できるようです。

Airflow UI を開かずに、対話形式で DAG コードまで遡って原因を特定することができました。

④ 修正方法を質問する

会話の文脈が保持されるので、深掘りの質問もスムーズです。

質問:

どう修正すれば良い?

cloud-composer-mcp_09.png

Gemini は単なるコード修正案ではなく、Cloud Composer の DAG ファイル更新の運用手順まで含めて回答してくれました。

✦ この問題を修正するには、DAGファイルを編集して、クエリが有効なテーブルを指すようにする必要があります。

  DAGはCloud Storageバケットに保存されています。ファイルを編集するには、次の2つの方法があります。

  方法1:gcloudコマンドを使用する(推奨)
   1. DAGファイルをダウンロードします。
        gsutil cp gs://<YOUR_BUCKET_NAME>/dags/failing_sample_dag.py .
   2. ファイルを編集します。
   3. ファイルをアップロードします。
        gsutil cp failing_sample_dag.py gs://<YOUR_BUCKET_NAME>/dags/

  方法2:GCPコンソールを使用する
   1. GCPコンソールでCloud Storageブラウザに移動します。
   2. お使いの環境のGCSバケットに移動します。
   3. dags フォルダに移動し、failing_sample_dag.py ファイルを見つけます。
   4. ファイルをクリックし、インライン編集機能を使用して、クエリを修正します。

  もしGCSバケット名がわからない場合は、gcloud composer environments describe test-composer --location asia-northeast1 コマンドを実行するか、GCPコンソールで環境の詳細を確認してください。
  • 方法1:gsutil で GCS バケットから DAG ファイルをダウンロード → 編集 → アップロード
  • 方法2:GCP コンソールの Cloud Storage ブラウザで直接編集

GCS バケット名がわからない場合は gcloud composer environments describe で確認できることも案内してくれています。

DAG コードの修正方法だけでなく、Cloud Composer 特有の DAG 更新フロー(GCS 経由でのアップロード)まで踏まえた回答が得られる点は、運用現場でも使いやすそうです。

さらに踏み込んだ質問

コードレベルの具体的な修正案が欲しいときは、もう一歩踏み込んで質問します。

質問:

具体的にはコードをどう書き換えればいい?bigquery-public-data.samples.shakespeare を使った修正版のコードを見せて

cloud-composer-mcp_10.png

今度は具体的な修正版コードを提示してくれました。

from datetime import datetime
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
    dag_id="failing_sample_dag",
    start_date=datetime(2026, 4, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    query_shakespeare_table = BigQueryInsertJobOperator(
        task_id="query_shakespeare_table",
        configuration={
            "query": {
                "query": """
                    SELECT word, word_count
                    FROM `bigquery-public-data.samples.shakespeare`
                    WHERE corpus = 'kinglear'
                    ORDER BY word_count DESC
                    LIMIT 10
                """,
                "useLegacySql": False,
            }
        },
    )

修正版コードでは、元の DAG 構造を保持しながら、テーブル参照を bigquery-public-data.samples.shakespeare に変更し、task_idquery_shakespeare_table に更新されています。
元のコードとの差分が明確で、反映しやすい修正案になっています。

Cloud Composer には Airflow UI から使える Gemini Cloud Assist(2025/11/14 GA)という類似の調査機能もあります。
既存機能が Airflow UI 内で完結するのに対し、MCP サーバーは外部の AI クライアントから接続できる点が違いです。

検証後のクリーンアップ

Cloud Composer 環境は作成したままにしておくと料金が発生し続けるため、検証が終わったら環境を削除しておきましょう。

ADC で Google Cloud の認証情報をローカル(~/.config/gcloud/application_default_credentials.json)に保存しているので、こちらも合わせて revoke しておきます。

gcloud auth application-default revoke

まとめ

以上、Cloud Composer MCP サーバーを Gemini CLI から使って、DAG の失敗原因調査を試してみました。

  • Airflow UI を開かずに対話形式で原因調査ができる
  • find_last_failed_dag_runslist_failed_task_instancesget_dag_source_code とツールが連鎖的に呼ばれる
  • ソースコードや修正案まで Gemini CLI 上で確認できる

今回は Gemini CLI から利用しましたが、以下の記事のように Claude からも同様に利用できます。

https://dev.classmethod.jp/articles/google-cloud-mcp-server-with-claude/

AI クライアント側で他の MCP サーバー(BigQuery、Cloud Storage など)と組み合わせることで、より広い範囲を自然言語で運用できそうなので、機会があれば触ってみようと思います。

DAG 運用に AI クライアントを組み合わせてみたいという方は、ぜひ皆様の業務でも活用してみてはいかがでしょうか?

参考

この記事をシェアする

関連記事