DatabricksのマネージドMCPサーバーを利用してみた

DatabricksのマネージドMCPサーバーを利用してみた

Clock Icon2025.06.12

データ事業本部のueharaです。

今回は、DatabricksのマネージドMCPサーバーを利用してみたいと思います。

はじめに

先日、DatabricksからマネージドMCPサーバーが発表されました。

https://docs.databricks.com/aws/en/generative-ai/agent-framework/mcp

これまでもローカルでDatabricksのMCPサーバーを実装し利用する例はあったのですが、この度プラットフォーム側でマネージドなものが利用できるようになり、より簡単に利用できるようになりました。

提供されているMCPサーバー

提供されているMCPサーバーは以下の通りです。

MCPサーバー 説明 URL
Vector search 指定したUnity Catalogのスキーマのベクトル検索インデックスをクエリできる https://<your-workspace-hostname>/api/2.0/mcp/vector-search/{catalog_name}/{schema_name}
Unity Catalog functions 指定したUnity CatalogスキーマのUnity Catalog関数の実行ができる https://<your-workspace-hostname>/api/2.0/mcp/functions/{catalog_name}/{schema_name}
Genie space 指定したGenieスペースをクエリできる https://<your-workspace-hostname>/api/2.0/mcp/genie/{genie_space_id}

事前準備

記事執筆時点ではベータ版のため、機能有効化の設定から行います。

設定にアクセスするため、Databricksのプロフィールアイコンから『Previews』を選択します。

20250612_databricks_01

Previewsの一覧に『Managed MCP Servers』があると思うので、これをONにします。

20250612_databricks_02

これで事前準備は完了です。

MCPホスト(クライアント)の作成、設定

マネージドMCPサーバーを利用する設定は済んだので、MCPホスト(クライアント)側の作成や設定を行います。

Databricksの認証設定

OAuthを使用してワークスペースへの認証を行います。

※Databricks CLIが必要になるので、インストールがまだの方はこちらを参考にインストールして下さい。

$ databricks auth login --host https://<your-workspace-hostname>

認証時にプロファイル名の指定を求められるので、任意に設定して下さい。(こちらのプロファイル名は後で利用するので控えておいて下さい)

20250612_databricks_03

認証が完了するとブラウザに以下のような画面が表示されるかと思います。

20250612_databricks_04

ライブラリのインストール

以下コマンドで、後段のプログラムに必要なDatabricks関連ライブラリをインストールします。

$ pip install -U --pre databricks-mcp "mcp>=1.9" databricks-sdk databricks-agents

クライアントスクリプトの作成

以下のようなスクリプトを作成してみました。

LLMはAmazon Bedrockで Claude 3.7 Sonnet を利用するようにしています。

databricks_mcp_test.py
import asyncio
import json
from contextlib import asynccontextmanager
from typing import Any, Dict, List, Optional

import boto3
from databricks.sdk import WorkspaceClient
from databricks_mcp import DatabricksOAuthClientProvider
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamablehttp_client

class BedrockMCPAgent:
    """
    Databricks MCPサーバーをツールとして利用するBedrock Agent。
    BedrockのConverse APIを使用して、ツール利用を含む対話を行う。
    """

    def __init__(
        self,
        databricks_profile: str,
        mcp_server_url: Optional[str] = None,
        bedrock_model_id: str = "apac.anthropic.claude-3-7-sonnet-20250219-v1:0",
        aws_region: str = "ap-northeast-1",
    ):
        self.workspace_client = WorkspaceClient(profile=databricks_profile)
        self.host = self.workspace_client.config.host
        self.mcp_server_url = (
            mcp_server_url or "{host}/api/2.0/mcp/functions/system/ai"
        ).format(host=self.host)
        self.bedrock_client = boto3.client("bedrock-runtime", region_name=aws_region)
        self.bedrock_model_id = bedrock_model_id

    @asynccontextmanager
    async def _mcp_session(self):
        """MCPサーバーとのセッションを非同期で確立"""
        async with streamablehttp_client(
            url=self.mcp_server_url,
            auth=DatabricksOAuthClientProvider(self.workspace_client),
        ) as (reader, writer, _):
            async with ClientSession(reader, writer) as session:
                await session.initialize()
                yield session

    async def _list_mcp_tools(self) -> List[Dict]:
        """MCPサーバーから利用可能なツールを取得し、Bedrockの形式に変換"""
        async with self._mcp_session() as session:
            tools_response = await session.list_tools()
            return [
                {
                    "toolSpec": {
                        "name": tool.name,
                        "description": tool.description,
                        "inputSchema": {"json": tool.inputSchema},
                    }
                }
                for tool in tools_response.tools
            ]

    async def _call_mcp_tool(self, tool_name: str, parameters: Dict[str, Any]) -> str:
        """指定されたMCPツールを非同期で実行"""
        print(f"--- MCP Tool Call ---")
        print(f"Tool: {tool_name}")
        # デバッグ用
        # print(f"Arguments: {json.dumps(parameters, indent=2)}")
        try:
            async with self._mcp_session() as session:
                response = await session.call_tool(name=tool_name, arguments=parameters)
                result = "".join([content.text for content in response.content])
                # デバッグ用
                # print(f"Result: {result}")
                print(f"-----------------------")
                return result
        except Exception as e:
            error_message = f"Error calling MCP tool {tool_name}: {str(e)}"
            print(error_message)
            return error_message

    async def chat(self, user_input: str, session_id: str = "local-session") -> Dict:
        """
        Converse APIを使用してチャットを実行する。
        ツール利用が必要な場合は、複数回のAPI呼び出しを自動的に行う。
        """
        instruction = """あなたはDatabricksのMCPツールを使用できるAIアシスタントです。
ユーザーの質問に対して、必要であれば利用可能なツールを判断して使用し、その結果に基づいて回答してください。
ツールの実行結果はユーザーに直接見せず、結果を解釈して自然な文章で回答を生成してください。"""

        try:
            # 利用可能なツールリストを取得
            mcp_tools = await self._list_mcp_tools()
            tool_config = {"tools": mcp_tools}

            # 対話履歴の初期化
            messages = [{"role": "user", "content": [{"text": user_input}]}]

            # ツール呼び出しのループ
            final_response_text = ""
            tool_calls_executed = []

            while True:
                # boto3のAPIを非同期で呼び出す
                response = await asyncio.to_thread(
                    self.bedrock_client.converse,
                    modelId=self.bedrock_model_id,
                    messages=messages,
                    system=[{"text": instruction}],
                    toolConfig=tool_config,
                )

                model_message = response["output"]["message"]
                # モデルの応答を履歴に追加
                messages.append(model_message)

                # 応答からテキスト部分とツール呼び出し部分を分離
                text_content = "".join(
                    [c["text"] for c in model_message["content"] if "text" in c]
                )
                tool_use_requests = [
                    c["toolUse"] for c in model_message["content"] if "toolUse" in c
                ]

                if text_content:
                    final_response_text += text_content

                # ツール呼び出しがなければループを終了
                if not tool_use_requests:
                    break

                # ツール実行
                tool_results = []
                for tool_request in tool_use_requests:
                    tool_name = tool_request["name"]
                    tool_input = tool_request["input"]
                    tool_use_id = tool_request["toolUseId"]

                    # MCPツールを呼び出す
                    tool_result_str = await self._call_mcp_tool(tool_name, tool_input)

                    # Bedrockに返すための結果を作成
                    tool_results.append(
                        {
                            "toolResult": {
                                "toolUseId": tool_use_id,
                                "content": [{"text": tool_result_str}],
                            }
                        }
                    )
                    tool_calls_executed.append(
                        {
                            "tool_name": tool_name,
                            "input": tool_input,
                            "result": tool_result_str,
                        }
                    )

                # ツール実行結果を履歴に追加して、次の対話の入力とする
                messages.append({"role": "user", "content": tool_results})

            return {
                "response": final_response_text,
                "tool_calls": tool_calls_executed,
                "session_id": session_id,
            }

        except Exception as e:
            return {"error": str(e), "session_id": session_id}

async def main():
    # 設定
    DATABRICKS_PROFILE = "<YOUR_DATABRICKS_CLI_PROFILE>"  # Databricksのプロファイル名

    try:
        agent = BedrockMCPAgent(
            databricks_profile=DATABRICKS_PROFILE
        )

        print("Bedrock MCP Agent を起動しました")
        print("利用可能なMCPツールを確認中...")

        tools = await agent._list_mcp_tools()
        if not tools:
            print("利用可能なMCPツールが見つかりませんでした。DatabricksのMCPサーバーの設定を確認してください。")
            return

        print(f"{len(tools)} 個のツールが利用可能です:")
        for tool in tools:
            spec = tool["toolSpec"]
            print(f"  - {spec['name']}: {spec['description']}")

    except Exception as e:
        print(f"\n[エラー] エージェントの初期化に失敗しました: {e}")
        print("Databricksのプロファイル設定 (`~/.databrickscfg`) やMCPサーバーのURLが正しいか確認してください。")
        return

    print("\nチャットを開始します (終了するには 'quit' と入力)")
    print("-" * 50)

    session_id = f"local-session-{int(asyncio.get_event_loop().time())}"

    while True:
        try:
            user_input = input("\nYou: ").strip()

            if user_input.lower() in ["quit", "exit", "q"]:
                break
            if not user_input:
                continue

            print("Assistant: (考え中...)")
            result = await agent.chat(user_input, session_id)

            if "error" in result:
                print(f"\n[エラー]: {result['error']}")
            else:
                print(f"\nAssistant: {result['response']}")
                if result["tool_calls"]:
                    print("\n[使用されたツール]")
                    for i, call in enumerate(result["tool_calls"], 1):
                        print(f"  {i}. {call['tool_name']}")

        except KeyboardInterrupt:
            print("\n\nチャットを中断しました。")
            break
        except Exception as e:
            print(f"\n[予期しないエラー]: {e}")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as e:
        print(f"プログラムの実行中にエラーが発生しました: {e}")

<YOUR_DATABRICKS_CLI_PROFILE> には、前述の手順で作成したプロファイル名を指定して下さい。

また、MCPサーバーのURLのデフォルト値は公式ドキュメントのサンプルと同様に {host}/api/2.0/mcp/functions/system/ai としています。

Databricksには system.ai.python_exec というPythonコードを動的に利用できる組み込みUnity Catalog関数があるため、特にMCPサーバーのURLが指定されていない場合は上記を利用するためのMCPサーバーにアクセスするようなイメージです。

実行してみた

以下のコマンドで作成したクライアントスクリプトを実行します。

$ python databricks_mcp_test.py

実行すると以下のような画面が表示されます。

2025-612_databricks_05

ツールとして、想定通り system__ai__python_exec が利用できることが伺えます。

説明から、Pythonはサンドボックス環境で実行されファイル等にはアクセスできないようなので、適当にFizzBuzzを解いてもらいます。

You: FizzBuzzを15まで実行して下さい。

結果は以下の通りで、 system__ai__python_exec を利用してFizzBuzzを解いてくれました。

2025-612_databricks_06

他にも試してみます。

私の手元環境にはMovieLensというデータセットをテーブル化し、それを参照するGenieスペースがあります。

2025-612_databricks_07

先のクライアントスクリプトを修正し、このGenieスペースにアクセスするよう設定します。( <GENIE_SPACE_ID> は要置き換え)

        agent = BedrockMCPAgent(
            databricks_profile=DATABRICKS_PROFILE,
            mcp_server_url="{host}/api/2.0/mcp/genie/<GENIE_SPACE_ID>",
        )

これを実行すると以下のように出力されました。

20250612_databricks_07

query_space_{genie_space_id} というツールが利用できるようで、これによりスペースにクエリできるようです。

以下のように指定してみました。

You: 評価件数の多い映画トップ5を教えて

出力結果は次の通りで、データにアクセスできていることが確認できました。

20250612_databricks_08

最後に

今回は、DatabricksのマネージドMCPサーバーを利用してみました。

MCPサーバーはDatabricksによってホストおよび保守されるため、簡単に接続を始められて良いなと感じました。

独自のカスタムまたはサードパーティのMCPサーバーをDatabricks Appsとしてホストすることもできるので、気になった方は公式ドキュメントをご確認下さい。

参考になりましたら幸いです。

参考文献

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.