Cloud Composer の MCP サーバーで Gemini CLI から DAG 失敗原因を調査してみた
こんにちは!エノカワです。
Cloud Composer は、Apache Airflow をベースに構築されたフルマネージドのワークフローオーケストレーションサービスです。
2026年4月15日のリリースノートで、Cloud Composer の名称が Managed Service for Apache Airflow に変わったことがアナウンスされました。
To more strongly embrace the success and growing customer preference for OSS solutions, Cloud Composer is evolving to become Managed Service for Apache Airflow. This name change provides improved customer understanding of our portfolio while reinforcing our commitment to being the most open cloud ecosystem.
OSS ソリューションへのコミットメントをより明確にするためのリブランディングで、サービス自体の機能や仕様に破壊的な変更があるわけではありません。
実際、Google Cloud コンソールを開くと「Managed Service for Apache Airflow(旧 Cloud Composer)」という表記になっており、環境の世代も「マネージド Airflow Gen 3 / Gen 2」と表現されるようになっています。
本記事では便宜上、従来どおり「Cloud Composer」と呼んで進めます。
同じ 2026/4/15 のリリースで、もう一つ注目すべき新機能が登場しました。
Cloud Composer MCP サーバー です。
Cloud Composer remote Model Context Protocol (MCP) server is available in Preview.
You can use Cloud Composer remote Model Context Protocol (MCP) server to connect to Cloud Composer from AI applications such as Gemini CLI, ChatGPT, Claude, or in AI applications that you're developing. The Cloud Composer MCP server lets you manage Cloud Composer environments and get details about executed DAG runs and Airflow tasks.
Gemini CLI や Claude などの AI クライアントから Cloud Composer 環境を操作できるようになり、DAG 実行や Airflow タスクの詳細取得も自然言語で行えそうです。
今回は、この機能を実際に試してみましたので、その内容をご紹介します。
Cloud Composer MCP サーバーとは
Cloud Composer MCP サーバーは、Apache Airflow 環境を AI アプリケーション(Gemini CLI / Claude / ChatGPT)から操作できるリモート MCP サーバーです。
Model Context Protocol(MCP)という標準化されたプロトコルで通信するため、対応する AI クライアントから同じインターフェースで利用できます。
提供されているツール
公式ドキュメント(MCP reference)で定義されているツールは以下の通りです。
各ツールが読み取り専用かどうかは、MCP 仕様の mcp.tool.isReadOnly 属性で判別できます。
以下は公式ドキュメントの説明をもとに分類してまとめたものです。
環境管理系
| ツール名 | 用途 | 読み書き |
|---|---|---|
list_environments |
プロジェクト内の全環境をリスト表示 | 読み取り |
get_environment |
特定環境の詳細情報を取得 | 読み取り |
create_environment |
新規環境を作成(デフォルトは小規模プライベートIP環境) | 書き込み |
delete_environment |
環境を削除(不可逆) | 書き込み |
manage_pypi_packages |
PyPI パッケージの追加・削除・更新 | 書き込み |
DAG・タスク監視系
| ツール名 | 用途 | 読み書き |
|---|---|---|
get_dag_source_code |
DAG のソースコードを取得 | 読み取り |
get_dag_run |
特定の DAG 実行の詳細(状態・開始/終了時刻など)を取得 | 読み取り |
find_last_failed_dag_runs |
失敗した DAG 実行を検索(DAG 指定可) | 読み取り |
list_failed_task_instances |
DAG 実行内の失敗タスクをリスト表示 | 読み取り |
get_task_instance |
タスク実行の詳細情報を取得 | 読み取り |
その他
| ツール名 | 用途 | 読み書き |
|---|---|---|
get_operation |
長時間実行オペレーションのステータス確認 | 読み取り |
大半が読み取り専用ツールで、書き込み系は create_environment・delete_environment・manage_pypi_packages の3つだけです。
接続情報
| 項目 | 値 |
|---|---|
| サーバー名 | Managed Service for Apache Airflow MCP server |
| エンドポイント | composer.{region}.rep.googleapis.com/mcp |
| 転送方式 | HTTP |
| 認証 | Google Cloud 認証情報・OAuth Client ID・エージェント ID のいずれか |
環境作成
DAG を動かすための環境を作成します。
Google Cloud コンソールで「Cloud Composer」を開くと、Managed Service for Apache Airflow(旧 Cloud Composer) という画面が表示されます。
名称変更が UI にも反映されていますね。

「環境の作成」→「マネージド Airflow Gen 3」(旧称:Composer 3)を選択します。
環境作成フォームで以下のように設定します。
| 項目 | 値 |
|---|---|
| 名前 | test-composer |
| 場所 | asia-northeast1 |
| イメージのバージョン | composer-3-airflow-3.1.7-build.5(Airflow 3 最新ビルド) |
| サービスアカウント | デフォルト(検証用) |
| 復元力モード | 標準的な復元力 |

本番運用ではユーザーマネージドのサービスアカウントが推奨ですが、今回は検証用なのでデフォルトのものを使用します。

環境作成には 20〜30 分ほどかかります。
事前準備
必要な IAM ロール
MCP サーバーを利用するユーザー(または Gemini CLI を実行するプリンシパル)に、以下のロールを付与します。
| ロール | 用途 |
|---|---|
roles/serviceusage.serviceUsageAdmin |
Service Usage 管理 |
roles/mcp.toolUser |
MCP ツール利用 |
OAuth クライアント ID の作成
ADC(Application Default Credentials)の認証に使う OAuth クライアント ID を作成します。
なお、OAuth クライアント ID を使った MCP サーバーの認証例については、以下の記事も参考になります。
Google Cloud コンソールの「APIとサービス」→「認証情報」→「認証情報を作成」→「OAuth クライアント ID」を選択します。
- アプリケーションの種類:デスクトップアプリ
- 名前:
composer-mcp-adc-client(任意)

作成後に表示されるダイアログから JSON をダウンロード して、ローカルに保存します。
クライアントシークレットは後から取得できないので、このタイミングで必ず保存してください。

Application Default Credentials の設定
作成した OAuth クライアント ID の JSON ファイルを指定して ADC を設定します。
gcloud auth application-default login \
--client-id-file=$HOME/Downloads/client_secret_XXXX.apps.googleusercontent.com.json \
--scopes=https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/cloudcomposer.readonly
ブラウザが開いて OAuth 認証フローが走ります。
Google アカウントを選択して「許可」をクリックすると、ローカルに認証情報が保存されます。
認証後、トークンが取得できるか確認します。
gcloud auth application-default print-access-token | head -c 50
Gemini CLI への MCP サーバー登録
Gemini CLI は extension 単位で MCP サーバーを登録します。
~/.gemini/extensions/ 配下に拡張機能用のディレクトリを作成し、gemini-extension.json を配置します。
mkdir -p ~/.gemini/extensions/cloud-composer
{
"name": "cloud-composer",
"version": "1.0.0",
"mcpServers": {
"cloud-composer": {
"httpUrl": "https://composer.asia-northeast1.rep.googleapis.com/mcp",
"authProviderType": "google_credentials",
"oauth": {
"scopes": ["https://www.googleapis.com/auth/cloudcomposer.readonly"]
},
"timeout": 30000,
"headers": {
"x-goog-user-project": "PROJECT_ID"
}
}
}
}
Gemini CLI の認証モードとモデル選定
Gemini CLI 起動時に認証モードとモデルが自動で選ばれますが、企業利用では Vertex AI 認証への切り替えを検討するとよいでしょう。
Vertex AI 認証ではプロンプトや回答が Google のモデル学習に使われないため、業務データを扱う場合に安心して利用できます。
Gemini CLI の認証方式ごとのプライバシー差異や切り替え方法については、以下の記事が参考になります。
接続確認
Gemini CLI を起動します。
gemini
起動後のプロンプトで /mcp と入力すると、登録済みの MCP サーバーとツール一覧が表示されます。

cloud-composer (from cloud-composer) - Ready (11 tools) と表示されていれば接続成功です。
ログ表示で見ると以下のようになります。
Configured MCP servers:
🟢 cloud-composer (from cloud-composer) - Ready (11 tools)
Tools:
- mcp_cloud-composer_create_environment
- mcp_cloud-composer_delete_environment
- mcp_cloud-composer_find_last_failed_dag_runs
- mcp_cloud-composer_get_dag_run
- mcp_cloud-composer_get_dag_source_code
- mcp_cloud-composer_get_environment
- mcp_cloud-composer_get_operation
- mcp_cloud-composer_get_task_instance
- mcp_cloud-composer_list_environments
- mcp_cloud-composer_list_failed_task_instances
- mcp_cloud-composer_manage_pypi_packages
実際に試してみる
① 環境情報を取得する
まずは接続確認を兼ねて、環境一覧を取得してみます。
質問:
Cloud Composer の環境一覧を教えて
✦ プロジェクトIDとロケーションを教えてください。
Gemini から「プロジェクト ID とロケーションを教えてください」と聞き返しがあったので、情報を伝えます。
質問:
プロジェクト ID は PROJECT_ID、ロケーションは asia-northeast1 です
Gemini CLI がツール list_environments を呼び出そうとし、実行許可を求めるダイアログが表示されます。

許可すると、list_environments が実行され、test-composer 環境の詳細が JSON で返ってきました。

ツールが返した JSON の主要部分を抜粋すると以下の通りです。
{
"environments": [
{
"config": {
"dagGcsPrefix": "gs://asia-northeast1-test-compos-XXXXXXXX-bucket/dags",
"environmentSize": "ENVIRONMENT_SIZE_SMALL",
"softwareConfig": {
"imageVersion": "composer-3-airflow-3.1.7-build.5"
}
},
"createTime": "2026-04-21T02:15:19.731202Z",
"name": "projects/PROJECT_ID/locations/asia-northeast1/environments/test-composer",
"state": "RUNNING"
}
]
}
test-composer 環境名・ステータス(RUNNING)・Airflow バージョン(composer-3-airflow-3.1.7-build.5)などが一目で確認できますね。
最後に Gemini から「test-composer という環境があります。」というシンプルな要約も返ってきました。
② サンプル DAG を意図的に失敗させる
次に、失敗する DAG を用意して原因調査の検証を行います。
失敗する DAG を配置
存在しない BigQuery テーブルを参照する DAG を用意しました。
サンプルなので、nonexistent_dataset.nonexistent_table という存在しないテーブルを意図的に指定しています。
from datetime import datetime
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
dag_id="failing_sample_dag",
start_date=datetime(2026, 4, 1),
schedule="@daily",
catchup=False,
) as dag:
query_nonexistent_table = BigQueryInsertJobOperator(
task_id="query_nonexistent_table",
configuration={
"query": {
"query": "SELECT * FROM `nonexistent_dataset.nonexistent_table`",
"useLegacySql": False,
}
},
)
DAG を Cloud Composer 環境の DAGs フォルダにアップロードして実行させます。
アップロード後、failing_sample_dag がリトライも含めて3回失敗するまで待ちます。
Airflow UI を確認すると、failing_sample_dag が失敗になっています。

タスクの詳細画面では、BigQuery から 404 Not found エラーが返っているのが確認できます。

エラーログの要点(抜粋・マスク済み)は以下の通りです。
[2026-04-21 15:20:20] INFO - Executing: {'query': {'query': 'SELECT * FROM `nonexistent_dataset.nonexistent_table`', 'useLegacySql': False}}
[2026-04-21 15:20:21] ERROR - Task failed with exception
google.api_core.exceptions.NotFound: 404 Not found: Dataset PROJECT_ID:nonexistent_dataset was not found in location US; reason: notFound, message: Not found: Dataset PROJECT_ID:nonexistent_dataset was not found in location US
想定通り、タスクが失敗しました。
③ 失敗原因を調査する
Gemini CLI に自然言語で失敗原因を調査させてみます。
質問:
test-composer 環境で最近失敗した DAG を教えて。原因も調べて
Gemini CLI は以下の流れで、複数のツールを連鎖的に呼び出しながら調査を進めていきました。
find_last_failed_dag_runsで失敗した DAG 実行を検索 →failing_sample_dagが見つかるlist_failed_task_instancesで失敗タスクを特定 →query_nonexistent_tableが 3回リトライして失敗したことを把握get_dag_source_codeで失敗 DAG のソースコードを取得- 結果を統合して原因を推定・説明
最終的な画面はこのようになりました。
1回の質問でツールが連鎖的に呼び出され、最後に原因が自然言語で説明されています。

各ツールの呼び出しと結果は以下のように出力されていました(抜粋・マスク済み)。
① 失敗した DAG 実行の検索
✓ find_last_failed_dag_runs (cloud-composer MCP Server)
{"environment":"projects/PROJECT_ID/locations/asia-northeast1/environments/test-composer"}
{
"dagRuns": [
{
"state": "FAILED",
"executionDate": "2026-04-21T00:00:00Z",
"dagRunId": "scheduled__2026-04-21T00:00:00+00:00",
"dagId": "failing_sample_dag",
"type": "SCHEDULED"
}
]
}
✦ failing_sample_dagというDAGの実行が失敗しています。原因を調査します。
② 失敗タスクの特定
✓ list_failed_task_instances (cloud-composer MCP Server)
{
"taskInstances": [
{
"id": "query_nonexistent_table",
"taskType": "BigQueryInsertJobOperator",
"state": "FAILED",
"tryNumber": 3,
"taskId": "query_nonexistent_table",
"dagId": "failing_sample_dag"
}
]
}
✦ query_nonexistent_tableというタスクが失敗しています。DAGのソースコードを取得して、タスクの内容を調査します。
③ DAG ソースコードの取得と原因推定
✓ get_dag_source_code (cloud-composer MCP Server)
{
"code": "... SELECT * FROM `nonexistent_dataset.nonexistent_table` ..."
}
✦ 失敗したDAGは failing_sample_dag で、その中の query_nonexistent_table というタスクが失敗しています。
原因は、存在しないテーブル nonexistent_dataset.nonexistent_table にクエリを実行しようとしているためです。
DAGのソースコードでは、以下のようになっています。
1 query_nonexistent_table = BigQueryInsertJobOperator(
2 task_id="query_nonexistent_table",
3 configuration={
4 "query": {
5 "query": "SELECT * FROM `nonexistent_dataset.nonexistent_table`",
6 "useLegacySql": False,
7 }
8 },
9 )
該当箇所のソースコードも引用して、どの行に問題があるかまで示してくれました。
ちなみに、list_failed_task_instances の結果には taskType(BigQueryInsertJobOperator)や tryNumber(3回試行)などの情報が含まれていたため、Gemini は get_task_instance を呼ばずに get_dag_source_code に直接進みました。
必要最小限のツール呼び出しで原因を推定できるようです。
Airflow UI を開かずに、対話形式で DAG コードまで遡って原因を特定することができました。
④ 修正方法を質問する
会話の文脈が保持されるので、深掘りの質問もスムーズです。
質問:
どう修正すれば良い?

Gemini は単なるコード修正案ではなく、Cloud Composer の DAG ファイル更新の運用手順まで含めて回答してくれました。
✦ この問題を修正するには、DAGファイルを編集して、クエリが有効なテーブルを指すようにする必要があります。
DAGはCloud Storageバケットに保存されています。ファイルを編集するには、次の2つの方法があります。
方法1:gcloudコマンドを使用する(推奨)
1. DAGファイルをダウンロードします。
gsutil cp gs://<YOUR_BUCKET_NAME>/dags/failing_sample_dag.py .
2. ファイルを編集します。
3. ファイルをアップロードします。
gsutil cp failing_sample_dag.py gs://<YOUR_BUCKET_NAME>/dags/
方法2:GCPコンソールを使用する
1. GCPコンソールでCloud Storageブラウザに移動します。
2. お使いの環境のGCSバケットに移動します。
3. dags フォルダに移動し、failing_sample_dag.py ファイルを見つけます。
4. ファイルをクリックし、インライン編集機能を使用して、クエリを修正します。
もしGCSバケット名がわからない場合は、gcloud composer environments describe test-composer --location asia-northeast1 コマンドを実行するか、GCPコンソールで環境の詳細を確認してください。
- 方法1:
gsutilで GCS バケットから DAG ファイルをダウンロード → 編集 → アップロード - 方法2:GCP コンソールの Cloud Storage ブラウザで直接編集
GCS バケット名がわからない場合は gcloud composer environments describe で確認できることも案内してくれています。
DAG コードの修正方法だけでなく、Cloud Composer 特有の DAG 更新フロー(GCS 経由でのアップロード)まで踏まえた回答が得られる点は、運用現場でも使いやすそうです。
さらに踏み込んだ質問
コードレベルの具体的な修正案が欲しいときは、もう一歩踏み込んで質問します。
質問:
具体的にはコードをどう書き換えればいい?bigquery-public-data.samples.shakespeare を使った修正版のコードを見せて

今度は具体的な修正版コードを提示してくれました。
from datetime import datetime
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
dag_id="failing_sample_dag",
start_date=datetime(2026, 4, 1),
schedule="@daily",
catchup=False,
) as dag:
query_shakespeare_table = BigQueryInsertJobOperator(
task_id="query_shakespeare_table",
configuration={
"query": {
"query": """
SELECT word, word_count
FROM `bigquery-public-data.samples.shakespeare`
WHERE corpus = 'kinglear'
ORDER BY word_count DESC
LIMIT 10
""",
"useLegacySql": False,
}
},
)
修正版コードでは、元の DAG 構造を保持しながら、テーブル参照を bigquery-public-data.samples.shakespeare に変更し、task_id も query_shakespeare_table に更新されています。
元のコードとの差分が明確で、反映しやすい修正案になっています。
Cloud Composer には Airflow UI から使える Gemini Cloud Assist(2025/11/14 GA)という類似の調査機能もあります。
既存機能が Airflow UI 内で完結するのに対し、MCP サーバーは外部の AI クライアントから接続できる点が違いです。
検証後のクリーンアップ
Cloud Composer 環境は作成したままにしておくと料金が発生し続けるため、検証が終わったら環境を削除しておきましょう。
ADC で Google Cloud の認証情報をローカル(~/.config/gcloud/application_default_credentials.json)に保存しているので、こちらも合わせて revoke しておきます。
gcloud auth application-default revoke
まとめ
以上、Cloud Composer MCP サーバーを Gemini CLI から使って、DAG の失敗原因調査を試してみました。
- Airflow UI を開かずに対話形式で原因調査ができる
find_last_failed_dag_runs→list_failed_task_instances→get_dag_source_codeとツールが連鎖的に呼ばれる- ソースコードや修正案まで Gemini CLI 上で確認できる
今回は Gemini CLI から利用しましたが、以下の記事のように Claude からも同様に利用できます。
AI クライアント側で他の MCP サーバー(BigQuery、Cloud Storage など)と組み合わせることで、より広い範囲を自然言語で運用できそうなので、機会があれば触ってみようと思います。
DAG 運用に AI クライアントを組み合わせてみたいという方は、ぜひ皆様の業務でも活用してみてはいかがでしょうか?
参考
- Use the Cloud Composer MCP server | Google Cloud Documentation
- Managed Service for Apache Airflow MCP reference
- Configure MCP in an AI application
- Authenticate AI applications to Google Cloud MCP servers
- Google Cloud MCP servers overview
- Cloud Composer release notes April 15, 2026
- Google Cloud MCPサーバをClaudeから利用する | DevelopersIO
- Gemini CLI の無料利用でも Google の ML モデルの改善に使用されなくする | DevelopersIO
- Model Context Protocol







