メインコンテンツまでスキップ

Databricksにおけるモデルコンテキストプロトコル (MCP)

このページでは、Databricks で MCP を使用する方法について説明します。MCP は、AI エージェントをツール、リソース、プロンプト、およびその他のコンテキスト情報に接続するオープンソース標準です。

MCP の主な利点は標準化です。ツールを一度作成すれば、自分で作成したエージェントでも、サードパーティのエージェントでも、どのエージェントでも使用できます。同様に、チームまたは組織外の他のユーザーが開発したツールを使用できます。

Databricks には、次の MCP オプションが用意されています。

  • マネージド MCP サーバー: Databricks には、エージェントがデータをクエリしたり、Unity Catalog のツールにアクセスしたりできる、すぐに使用できるサーバーがあります。Unity Catalog の権限は常に適用されるため、エージェントとユーザーは許可されたツールとデータにのみアクセスできます。

  • カスタム MCP サーバー: 独自の MCP サーバーを Databricks アプリ として安全にホストし、独自のサーバーを持ち込んだり、サードパーティの MCP サーバーを実行したりします。

マネージド MCP サーバー

備考

ベータ版

この機能は ベータ版です。

Databricks には、エージェントをエンタープライズ データに簡単に接続するために、次のマネージド MCP サーバーが用意されています。これらのサーバーはすぐに使用でき、Databricks によってホストおよび保守されます。

MCPサーバ

説明

URL

ベクトル検索

エージェントは、特定のUnity Catalogのスキーマのベクトル検索インデックス をクエリできます。

https://<your-workspace-hostname>/api/2.0/mcp/vector-search/{catalog_name}/{schema_name}

Unity Catalog の関数

エージェントが指定した Unity Catalog スキーマの Unity Catalog 関数 を実行できるようにします。

https://<your-workspace-hostname>/api/2.0/mcp/functions/{catalog_name}/{schema_name}

Genieスペース

エージェントは、指定された Genieスペース をクエリして、(Unity Catalog内のテーブルの)構造化データから知見を得ることができます

https://<your-workspace-hostname>/api/2.0/mcp/genie/{genie_space_id}

エージェントに複数のサーバー URL を指定して、複数のツールとデータに接続できます。たとえば、次の URL を指定して、エージェントが顧客サポート チケット、請求テーブルをクエリし、請求関連機能を実行できるようにすることができます。

  • https://<your-workspace-hostname>/api/2.0/mcp/vector-search/prod/customer_support

    • エージェントは、 prod.customer_support スキーマのベクトル検索インデックスを使用して非構造化データを検索できます
  • https://<your-workspace-hostname>/api/2.0/mcp/genie/{genie_space_id}

    • エージェントは、 prod.billing スキーマに接続されたGenieスペースを使用して構造化データを検索できます
  • https://<your-workspace-hostname>/api/2.0/mcp/functions/prod/billing

    • エージェントは、Unity Catalog 関数 (カスタム Python または SQL データ取得 UDF) を実行できるようにします。 prod.billing

Databricks アプリを使用して MCP サーバーをホストする

また、独自のカスタムまたはサードパーティの MCP サーバーを Databricks アプリとしてホストすることもできます。これは、デプロイして組織内の他のユーザーと共有する MCP サーバーが既にある場合や、サードパーティの MCP サーバーをツールのソースとして実行したい場合に便利です。

Databricks アプリとしてホストされる MCP サーバーは、 ストリーム可能な HTTP トランスポートなどの HTTP 互換トランスポートを実装する必要があります。

ヒント

独自の MCP サーバーを作成し、それを Databricks アプリとしてデプロイする例については、 カスタム MCP サーバー リポジトリ を参照してください。

既存の Python MCP サーバーを Databricks アプリとしてホストするには、次の手順に従います。

  1. サーバーのルートディレクトリに requirements.txt を追加し、サーバーのPython依存関係を指定します。

    Python MCP サーバは多くの場合、パッケージ管理に uv を使用します。uvを使用する場合は、uvを追加すると、追加の依存関係のインストールが処理されます。

  2. サーバーを実行するための CLI コマンドを指定する app.yaml を追加します。

    デフォルトでは、 Databricks アプリはポート8000でリッスンします。 サーバーが別のポートでリッスンする場合は、app.yaml ファイルで環境変数のオーバーライドを使用して設定します。

    app.yaml:

    YAML
    command: [
    'uv',
    'run',
    'your-server-name',
    ..., # optionally include additional parameters here
    ]
  3. MCP サーバーをホストする Databricks アプリを作成します。

    Bash
    databricks apps create mcp-my-custom-server
  4. ソース コードを Databricks にアップロードし、 app.yaml ファイルを含むディレクトリから次のコマンドを実行してアプリをデプロイします。

    Bash
    DATABRICKS_USERNAME=$(databricks current-user me | jq -r .userName)
    databricks sync . "/Users/$DATABRICKS_USERNAME/mcp-my-custom-server"
    databricks apps deploy mcp-my-custom-server --source-code-path "/Workspace/Users/$DATABRICKS_USERNAME/mcp-my-custom-server"

MCP を使用したエージェントの構築

このセクションでは、Databricks 上の MCP サーバーに接続するカスタム コード エージェントを記述する方法について説明します。

備考

ベータ版

次のコード スニペットを使用するには、マネージド MCP サーバーの ベータ版 に登録する必要があります。

Databricks 上の MCP サーバーへの接続は、他のリモート MCP サーバーと似ています。サーバーには、MCP Python SDK などの標準 SDK を使用して接続できます。主な違いは、Databricks MCP サーバーはデフォルトでセキュリティで保護されており、クライアントに認証を指定する必要があることです。databricks-mcp Python ライブラリは、カスタム エージェント コードでの認証を簡略化するのに役立ちます。

エージェントコードを開発する最も簡単な方法は、ローカルで実行し、ワークスペースに対して認証することです。次の手順を使用して、Databricks MCP サーバーに接続する AI エージェントを構築します。

  1. OAuth を使用してワークスペースへの認証を行います。ローカルターミナルで次のコマンドを実行します。

    Bash
    databricks auth login --host https://<your-workspace-hostname>
  2. Python 3.12 以降のローカル環境があることを確認してから、依存関係をインストールします。

    Bash
    pip install -U databricks-mcp "mcp>=1.9" "databricks-sdk[openai]" "mlflow>=3.1.0" "databricks-agents>=1.0.0"
  3. 次のスニペットを実行して、MCP サーバーへの接続を検証します。このスニペットには、Unity Catalog ツールが一覧表示され、 組み込みの Python コード インタープリター ツールが実行されます。この snipet を実行するには、ワークスペースでサーバレス コンピュートを有効にする必要があります

    Python
    import asyncio

    from mcp.client.streamable_http import streamablehttp_client
    from mcp.client.session import ClientSession
    from databricks_mcp import DatabricksOAuthClientProvider
    from databricks.sdk import WorkspaceClient

    # TODO: Update to the Databricks CLI profile name you specified when
    # configuring authentication to the workspace.
    databricks_cli_profile = "YOUR_DATABRICKS_CLI_PROFILE"
    assert (
    databricks_cli_profile != "YOUR_DATABRICKS_CLI_PROFILE"
    ), "Set databricks_cli_profile to the Databricks CLI profile name you specified when configuring authentication to the workspace"
    workspace_client = WorkspaceClient(profile=databricks_cli_profile)
    workspace_hostname = workspace_client.config.host
    mcp_server_url = f"{workspace_hostname}/api/2.0/mcp/functions/system/ai"


    # This snippet below uses the Unity Catalog functions MCP server to expose built-in
    # AI tools under `system.ai`, like the `system.ai.python_exec` code interpreter tool
    async def test_connect_to_server():
    async with streamablehttp_client(
    f"{mcp_server_url}", auth=DatabricksOAuthClientProvider(workspace_client)
    ) as (read_stream, write_stream, _), ClientSession(
    read_stream, write_stream
    ) as session:
    # List and call tools from the MCP server
    await session.initialize()
    tools = await session.list_tools()
    print(
    f"Discovered tools {[t.name for t in tools.tools]} "
    f"from MCP server {mcp_server_url}"
    )
    result = await session.call_tool(
    "system__ai__python_exec", {"code": "print('Hello, world!')"}
    )
    print(
    f"Called system__ai__python_exec tool and got result " f"{result.content}"
    )


    if __name__ == "__main__":
    asyncio.run(test_connect_to_server())
  4. 上記のスニペットに基づいて、ツールを使用する基本的なシングルターンエージェントを定義できます。エージェントコードを mcp_agent.py という名前のファイルとしてローカルに保存し、後続のセクションでデプロイできるようにします。

    Python
    from contextlib import asynccontextmanager
    import json
    import uuid
    import asyncio
    from typing import Any, Callable, List
    from pydantic import BaseModel

    import mlflow
    from mlflow.pyfunc import ResponsesAgent
    from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse

    from databricks_mcp import DatabricksOAuthClientProvider
    from databricks.sdk import WorkspaceClient
    from mcp.client.session import ClientSession
    from mcp.client.streamable_http import streamablehttp_client

    # 1) CONFIGURE YOUR ENDPOINTS/PROFILE
    LLM_ENDPOINT_NAME = "databricks-claude-3-7-sonnet"
    SYSTEM_PROMPT = "You are a helpful assistant."
    DATABRICKS_CLI_PROFILE = "YOUR_DATABRICKS_CLI_PROFILE"
    assert (
    DATABRICKS_CLI_PROFILE != "YOUR_DATABRICKS_CLI_PROFILE"
    ), "Set DATABRICKS_CLI_PROFILE to the Databricks CLI profile name you specified when configuring authentication to the workspace"
    workspace_client = WorkspaceClient(profile=DATABRICKS_CLI_PROFILE)
    host = workspace_client.config.host
    # Add more MCP server URLs here if desired, e.g
    # f"{host}/api/2.0/mcp/vector-search/prod/billing"
    # to include vector search indexes under the prod.billing schema, or
    # f"{host}/api/2.0/mcp/genie/<genie_space_id>"
    # to include a Genie space
    MCP_SERVER_URLS = [
    f"{host}/api/2.0/mcp/functions/system/ai",
    ]


    # 2) HELPER: convert between ResponsesAgent “message dict” and ChatCompletions format
    def _to_chat_messages(msg: dict[str, Any]) -> List[dict]:
    """
    Take a single ResponsesAgent‐style dict and turn it into one or more
    ChatCompletions‐compatible dict entries.
    """
    msg_type = msg.get("type")
    if msg_type == "function_call":
    return [
    {
    "role": "assistant",
    "content": None,
    "tool_calls": [
    {
    "id": msg["call_id"],
    "type": "function",
    "function": {
    "name": msg["name"],
    "arguments": msg["arguments"],
    },
    }
    ],
    }
    ]
    elif msg_type == "message" and isinstance(msg["content"], list):
    return [
    {
    "role": "assistant" if msg["role"] == "assistant" else msg["role"],
    "content": content["text"],
    }
    for content in msg["content"]
    ]
    elif msg_type == "function_call_output":
    return [
    {
    "role": "tool",
    "content": msg["output"],
    "tool_call_id": msg["tool_call_id"],
    }
    ]
    else:
    # fallback for plain {"role": ..., "content": "..."} or similar
    return [
    {
    k: v
    for k, v in msg.items()
    if k in ("role", "content", "name", "tool_calls", "tool_call_id")
    }
    ]


    # 3) “MCP SESSION” + TOOL‐INVOCATION LOGIC
    @asynccontextmanager
    async def _mcp_session(server_url: str, ws: WorkspaceClient):
    async with streamablehttp_client(
    url=server_url, auth=DatabricksOAuthClientProvider(ws)
    ) as (reader, writer, _):
    async with ClientSession(reader, writer) as session:
    await session.initialize()
    yield session


    def _list_tools(server_url: str, ws: WorkspaceClient):
    async def inner():
    async with _mcp_session(server_url, ws) as sess:
    return await sess.list_tools()

    return asyncio.run(inner())


    def _make_exec_fn(
    server_url: str, tool_name: str, ws: WorkspaceClient
    ) -> Callable[..., str]:
    def exec_fn(**kwargs):
    async def call_it():
    async with _mcp_session(server_url, ws) as sess:
    resp = await sess.call_tool(name=tool_name, arguments=kwargs)
    return "".join([c.text for c in resp.content])

    return asyncio.run(call_it())

    return exec_fn


    class ToolInfo(BaseModel):
    name: str
    spec: dict
    exec_fn: Callable


    def _fetch_tool_infos(ws: WorkspaceClient, server_url: str) -> List[ToolInfo]:
    print(f"Listing tools from MCP server {server_url}")
    infos: List[ToolInfo] = []
    mcp_tools = _list_tools(server_url, ws).tools
    for t in mcp_tools:
    schema = t.inputSchema.copy()
    if "properties" not in schema:
    schema["properties"] = {}
    spec = {
    "type": "function",
    "function": {
    "name": t.name,
    "description": t.description,
    "parameters": schema,
    },
    }
    infos.append(
    ToolInfo(
    name=t.name, spec=spec, exec_fn=_make_exec_fn(server_url, t.name, ws)
    )
    )
    return infos


    # 4) “SINGLE‐TURN” AGENT CLASS
    class SingleTurnMCPAgent(ResponsesAgent):
    def _call_llm(self, history: List[dict], ws: WorkspaceClient, tool_infos):
    """
    Send current history → LLM, returning the raw response dict.
    """
    client = ws.serving_endpoints.get_open_ai_client()
    flat_msgs = []
    for msg in history:
    flat_msgs.extend(_to_chat_messages(msg))

    return client.chat.completions.create(
    model=LLM_ENDPOINT_NAME,
    messages=flat_msgs,
    tools=[ti.spec for ti in tool_infos],
    )

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
    ws = WorkspaceClient(profile=DATABRICKS_CLI_PROFILE)

    # 1) build initial history: system + user
    history: List[dict] = [{"role": "system", "content": SYSTEM_PROMPT}]
    for inp in request.input:
    history.append(inp.model_dump())

    # 2) call LLM once
    tool_infos = [
    tool_info
    for mcp_server_url in MCP_SERVER_URLS
    for tool_info in _fetch_tool_infos(ws, mcp_server_url)
    ]
    tools_dict = {tool_info.name: tool_info for tool_info in tool_infos}
    llm_resp = self._call_llm(history, ws, tool_infos)
    raw_choice = llm_resp.choices[0].message.to_dict()
    raw_choice["id"] = uuid.uuid4().hex
    history.append(raw_choice)

    tool_calls = raw_choice.get("tool_calls") or []
    if tool_calls:
    # (we only support a single tool in this “single‐turn” example)
    fc = tool_calls[0]
    name = fc["function"]["name"]
    args = json.loads(fc["function"]["arguments"])
    try:
    tool_info = tools_dict[name]
    result = tool_info.exec_fn(**args)
    except Exception as e:
    result = f"Error invoking {name}: {e}"

    # 4) append the “tool” output
    history.append(
    {
    "type": "function_call_output",
    "role": "tool",
    "id": uuid.uuid4().hex,
    "tool_call_id": fc["id"],
    "output": result,
    }
    )

    # 5) call LLM a second time and treat that reply as final
    followup = (
    self._call_llm(history, ws, tool_infos=[]).choices[0].message.to_dict()
    )
    followup["id"] = uuid.uuid4().hex

    assistant_text = followup.get("content", "")
    return ResponsesAgentResponse(
    output=[
    {
    "id": uuid.uuid4().hex,
    "type": "message",
    "role": "assistant",
    "content": [{"type": "output_text", "text": assistant_text}],
    }
    ],
    custom_outputs=request.custom_inputs,
    )

    # 6) if no tool_calls at all, return the assistant’s original reply
    assistant_text = raw_choice.get("content", "")
    return ResponsesAgentResponse(
    output=[
    {
    "id": uuid.uuid4().hex,
    "type": "message",
    "role": "assistant",
    "content": [{"type": "output_text", "text": assistant_text}],
    }
    ],
    custom_outputs=request.custom_inputs,
    )


    mlflow.models.set_model(SingleTurnMCPAgent())

    if __name__ == "__main__":
    req = ResponsesAgentRequest(
    input=[{"role": "user", "content": "What's the 100th Fibonacci number?"}]
    )
    resp = SingleTurnMCPAgent().predict(req)
    for item in resp.output:
    print(item)

MCP を使用したエージェントのデプロイ

MCP サーバーに接続するエージェントをデプロイする準備ができたら、 標準のエージェント デプロイ プロセスを使用します。

ロギング時にエージェントがアクセスする必要があるすべてのリソースを指定してください。たとえば、エージェントが次の MCP サーバー URL を使用しているとします。

  • https://<your-workspace-hostname>/api/2.0/mcp/vector-search/prod/customer_support
  • https://<your-workspace-hostname>/api/2.0/mcp/vector-search/prod/billing
  • https://<your-workspace-hostname>/api/2.0/mcp/functions/prod/billing

エージェントが必要とするすべてのベクトル検索インデックスを prod.customer_support スキーマと prod.billing スキーマとして指定し、すべての Unity Catalog 関数を prod.billingに指定する必要があります。

たとえば、上記で定義したエージェントをデプロイするには、次のスニペットを実行できます (保存した場合 mcp_agent.pyのエージェントコード定義:

Python
import os
from databricks.sdk import WorkspaceClient
from databricks import agents
import mlflow
from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint, DatabricksVectorSearchIndex
from mcp_agent import LLM_ENDPOINT_NAME

# TODO: Update this to your Databricks CLI profile name
databricks_cli_profile = "YOUR_DATABRICKS_CLI_PROFILE"
assert (
databricks_cli_profile != "YOUR_DATABRICKS_CLI_PROFILE"
), "Set databricks_cli_profile to the Databricks CLI profile name you specified when configuring authentication to the workspace"
workspace_client = WorkspaceClient(profile=databricks_cli_profile)


# Configure MLflow and the Databricks SDK to use your Databricks CLI profile
current_user = workspace_client.current_user.me().user_name
mlflow.set_tracking_uri(f"databricks://{databricks_cli_profile}")
mlflow.set_registry_uri(f"databricks-uc://{databricks_cli_profile}")
mlflow.set_experiment(f"/Users/{current_user}/databricks_docs_example_mcp_agent")
os.environ["DATABRICKS_CONFIG_PROFILE"] = databricks_cli_profile

# Log the agent defined in mcp_agent.py
here = os.path.dirname(os.path.abspath(__file__))
agent_script = os.path.join(here, "mcp_agent.py")
resources = [
DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME),
DatabricksFunction("system.ai.python_exec"),
# --- Uncomment and edit the following lines to specify vector search indices and additional UC functions ---
# --- if referenced via the MCP_SERVER_URLS in your agent code ---
# DatabricksVectorSearchIndex(index_name="prod.customer_support.my_index"),
# DatabricksVectorSearchIndex(index_name="prod.billing.another_index"),
# DatabricksFunction("prod.billing.my_custom_function"),
# DatabricksFunction("prod.billing.another_function"),
]

with mlflow.start_run():
logged_model_info = mlflow.pyfunc.log_model(
artifact_path="mcp_agent",
python_model=agent_script,
resources=resources,
)

# TODO Specify your UC model name here
UC_MODEL_NAME = "main.default.databricks_docs_mcp_agent"
registered_model = mlflow.register_model(logged_model_info.model_uri, UC_MODEL_NAME)

agents.deploy(
model_name=UC_MODEL_NAME,
model_version=registered_model.version,
)

コンピュート 価格

マネージド MCP サーバーのコンピュート価格は、MCP ワークロードによって異なります。

カスタム MCP サーバーには、 Databricks Apps の価格が適用されます。