Databricks マネージド MCP サーバーを使用する
ベータ版
この機能は ベータ版です。
モデル コンテキスト プロトコル (MCP) サーバーは、AI エージェントが外部データやツールにアクセスできるようにするブリッジとして機能します。これらの接続を最初から構築する代わりに、Databricks マネージド MCP サーバーを使用して、エージェントを Unity Catalog、ベクトル検索インデックス、カスタム関数に格納されているデータに即座に接続できます。
使用可能な管理対象サーバー
Databricks には、すぐに使用できる 3 種類のマネージド MCP サーバーが用意されています。
MCP サーバ | 説明 | URL パターン |
---|---|---|
ベクトル検索 | 関連するドキュメントとデータを検索するためにベクトル検索インデックスをクエリーする |
|
Unity Catalog の関数 | カスタム Python ツールや SQL ツールなどの Unity Catalog 関数 を実行する |
|
Genieスペース | 構造化データテーブルから知見を取得するためのGenieスペースにクエリーする |
|
Genie の管理対象 MCP サーバーは Genie を MCP ツールとして呼び出すため、Genie APIsを呼び出すときに履歴が渡されません。別の方法として、 マルチエージェントシステムでGenieを使用することもできます。
例: 顧客サポートエージェント
顧客サポートを支援するエージェントを構築したいと想像してください。 複数の管理対象 MCP サーバーに接続できます。
-
ベクトル検索 :
https://<workspace-hostname>/api/2.0/mcp/vector-search/prod/customer_support
- サポートチケットとドキュメントの検索
-
Genieスペース :
https://<workspace-hostname>/api/2.0/mcp/genie/{billing_space_id}
- 請求データと顧客情報を照会する
-
UCの関数 :
https://<workspace-hostname>/api/2.0/mcp/functions/prod/billing
- アカウントの検索と更新のためのカスタム関数を実行します
これにより、エージェントは非構造化データ(サポートチケット)と構造化データ(請求テーブル)の両方、およびカスタムビジネスロジックにアクセスできるようになります。
ノートブック: マネージド MCP サーバーを使用してエージェントを構築する
次のノートブックは、MCP ツールを呼び出す LangGraph エージェントと OpenAI エージェントを作成する方法を示しています。
LangGraph MCPツール呼び出しエージェント
OpenAI MCP ツール呼び出しエージェント
ローカル IDE: 管理対象 MCP サーバーを使用してエージェントを構築する
Databricks 上の MCP サーバーへの接続は、他のリモート MCP サーバーと似ています。MCP Python SDKなどの標準SDKを使用してサーバーに接続できます。主な違いは、 Databricks MCP サーバーは デフォルト クライアントに認証を指定する必要があります。
databricks-mcp Python ライブラリは、カスタム エージェント コードでの認証を簡略化するのに役立ちます。
エージェントコードを開発する最も簡単な方法は、エージェントコードをローカルで実行し、ワークスペースに対して認証することです。Databricks MCP サーバーに接続する AI エージェントを構築するには、次の手順を使用します。
環境を設定する
-
OAuth を使用してワークスペースに対して認証します。ローカルターミナルで以下を実行します。
Bashdatabricks auth login --host https://<your-workspace-hostname>
-
プロンプトが表示されたらプロファイル名を作成し、この名前を覚えておいてください。
-
Python 3.12 以降のローカル環境があることを確認し、依存関係をインストールします。
Bashpip install -U "mcp>=1.9" "databricks-sdk[openai]" "mlflow>=3.1.0" "databricks-agents>=1.0.0" "databricks-mcp"
ローカル環境接続をテストする
Unity Catalog ツールを一覧表示し、 組み込みの Python コード インタープリター ツールを実行して、MCP サーバーへの接続を検証します。
サーバレス コンピュート このスニペットを実行するには、ワークスペースで有効にする必要があります。
- 次のスニペットを実行して、MCP サーバーへの接続を検証します。
from databricks_mcp import DatabricksMCPClient
from databricks.sdk import WorkspaceClient
# TODO: Update to the Databricks CLI profile name you specified when
# configuring authentication to the workspace.
databricks_cli_profile = "YOUR_DATABRICKS_CLI_PROFILE"
assert (
databricks_cli_profile != "YOUR_DATABRICKS_CLI_PROFILE"
), "Set databricks_cli_profile to the Databricks CLI profile name you specified when configuring authentication to the workspace"
workspace_client = WorkspaceClient(profile=databricks_cli_profile)
workspace_hostname = workspace_client.config.host
mcp_server_url = f"{workspace_hostname}/api/2.0/mcp/functions/system/ai"
# This snippet below uses the Unity Catalog functions MCP server to expose built-in
# AI tools under `system.ai`, like the `system.ai.python_exec` code interpreter tool
def test_connect_to_server():
mcp_client = DatabricksMCPClient(server_url=mcp_server_url, workspace_client=workspace_client)
tools = mcp_client.list_tools()
print(
f"Discovered tools {[t.name for t in tools]} "
f"from MCP server {mcp_server_url}"
)
result = mcp_client.call_tool(
"system__ai__python_exec", {"code": "print('Hello, world!')"}
)
print(
f"Called system__ai__python_exec tool and got result "
f"{result.content}"
)
if __name__ == "__main__":
test_connect_to_server()
エージェントを作成する
-
上記のスニペットに基づいて、ツールを使用する基本的なシングルターンエージェントを定義します。エージェントコードを
mcp_agent.py
という名前のファイルとしてローカルに保存します。Pythonimport json
import uuid
import asyncio
from typing import Any, Callable, List
from pydantic import BaseModel
import mlflow
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse
from databricks_mcp import DatabricksMCPClient
from databricks.sdk import WorkspaceClient
# 1) CONFIGURE YOUR ENDPOINTS/PROFILE
LLM_ENDPOINT_NAME = "databricks-claude-3-7-sonnet"
SYSTEM_PROMPT = "You are a helpful assistant."
DATABRICKS_CLI_PROFILE = "YOUR_DATABRICKS_CLI_PROFILE"
assert (
DATABRICKS_CLI_PROFILE != "YOUR_DATABRICKS_CLI_PROFILE"
), "Set DATABRICKS_CLI_PROFILE to the Databricks CLI profile name you specified when configuring authentication to the workspace"
workspace_client = WorkspaceClient(profile=DATABRICKS_CLI_PROFILE)
host = workspace_client.config.host
# Add more MCP server URLs here if desired, e.g
# f"{host}/api/2.0/mcp/vector-search/prod/billing"
# to include vector search indexes under the prod.billing schema, or
# f"{host}/api/2.0/mcp/genie/<genie_space_id>"
# to include a Genie space
MANAGED_MCP_SERVER_URLS = [
f"{host}/api/2.0/mcp/functions/system/ai",
]
# Add Custom MCP Servers hosted on Databricks Apps
CUSTOM_MCP_SERVER_URLS = []
# 2) HELPER: convert between ResponsesAgent “message dict” and ChatCompletions format
def _to_chat_messages(msg: dict[str, Any]) -> List[dict]:
"""
Take a single ResponsesAgent‐style dict and turn it into one or more
ChatCompletions‐compatible dict entries.
"""
msg_type = msg.get("type")
if msg_type == "function_call":
return [
{
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": msg["call_id"],
"type": "function",
"function": {
"name": msg["name"],
"arguments": msg["arguments"],
},
}
],
}
]
elif msg_type == "message" and isinstance(msg["content"], list):
return [
{
"role": "assistant" if msg["role"] == "assistant" else msg["role"],
"content": content["text"],
}
for content in msg["content"]
]
elif msg_type == "function_call_output":
return [
{
"role": "tool",
"content": msg["output"],
"tool_call_id": msg["tool_call_id"],
}
]
else:
# fallback for plain {"role": ..., "content": "..."} or similar
return [
{
k: v
for k, v in msg.items()
if k in ("role", "content", "name", "tool_calls", "tool_call_id")
}
]
# 3) “MCP SESSION” + TOOL‐INVOCATION LOGIC
def _make_exec_fn(
server_url: str, tool_name: str, ws: WorkspaceClient
) -> Callable[..., str]:
def exec_fn(**kwargs):
mcp_client = DatabricksMCPClient(server_url=server_url, workspace_client=ws)
response = mcp_client.call_tool(tool_name, kwargs)
return "".join([c.text for c in response.content])
return exec_fn
class ToolInfo(BaseModel):
name: str
spec: dict
exec_fn: Callable
def _fetch_tool_infos(ws: WorkspaceClient, server_url: str) -> List[ToolInfo]:
print(f"Listing tools from MCP server {server_url}")
infos: List[ToolInfo] = []
mcp_client = DatabricksMCPClient(server_url=server_url, workspace_client=ws)
mcp_tools = mcp_client.list_tools()
for t in mcp_tools:
schema = t.inputSchema.copy()
if "properties" not in schema:
schema["properties"] = {}
spec = {
"type": "function",
"function": {
"name": t.name,
"description": t.description,
"parameters": schema,
},
}
infos.append(
ToolInfo(
name=t.name, spec=spec, exec_fn=_make_exec_fn(server_url, t.name, ws)
)
)
return infos
# 4) “SINGLE‐TURN” AGENT CLASS
class SingleTurnMCPAgent(ResponsesAgent):
def _call_llm(self, history: List[dict], ws: WorkspaceClient, tool_infos):
"""
Send current history → LLM, returning the raw response dict.
"""
client = ws.serving_endpoints.get_open_ai_client()
flat_msgs = []
for msg in history:
flat_msgs.extend(_to_chat_messages(msg))
return client.chat.completions.create(
model=LLM_ENDPOINT_NAME,
messages=flat_msgs,
tools=[ti.spec for ti in tool_infos],
)
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
ws = WorkspaceClient(profile=DATABRICKS_CLI_PROFILE)
# 1) build initial history: system + user
history: List[dict] = [{"role": "system", "content": SYSTEM_PROMPT}]
for inp in request.input:
history.append(inp.model_dump())
# 2) call LLM once
tool_infos = [
tool_info
for mcp_server_url in (MANAGED_MCP_SERVER_URLS + CUSTOM_MCP_SERVER_URLS)
for tool_info in _fetch_tool_infos(ws, mcp_server_url)
]
tools_dict = {tool_info.name: tool_info for tool_info in tool_infos}
llm_resp = self._call_llm(history, ws, tool_infos)
raw_choice = llm_resp.choices[0].message.to_dict()
raw_choice["id"] = uuid.uuid4().hex
history.append(raw_choice)
tool_calls = raw_choice.get("tool_calls") or []
if tool_calls:
# (we only support a single tool in this “single‐turn” example)
fc = tool_calls[0]
name = fc["function"]["name"]
args = json.loads(fc["function"]["arguments"])
try:
tool_info = tools_dict[name]
result = tool_info.exec_fn(**args)
except Exception as e:
result = f"Error invoking {name}: {e}"
# 4) append the “tool” output
history.append(
{
"type": "function_call_output",
"role": "tool",
"id": uuid.uuid4().hex,
"tool_call_id": fc["id"],
"output": result,
}
)
# 5) call LLM a second time and treat that reply as final
followup = (
self._call_llm(history, ws, tool_infos=[]).choices[0].message.to_dict()
)
followup["id"] = uuid.uuid4().hex
assistant_text = followup.get("content", "")
return ResponsesAgentResponse(
output=[
{
"id": uuid.uuid4().hex,
"type": "message",
"role": "assistant",
"content": [{"type": "output_text", "text": assistant_text}],
}
],
custom_outputs=request.custom_inputs,
)
# 6) if no tool_calls at all, return the assistant’s original reply
assistant_text = raw_choice.get("content", "")
return ResponsesAgentResponse(
output=[
{
"id": uuid.uuid4().hex,
"type": "message",
"role": "assistant",
"content": [{"type": "output_text", "text": assistant_text}],
}
],
custom_outputs=request.custom_inputs,
)
mlflow.models.set_model(SingleTurnMCPAgent())
if __name__ == "__main__":
req = ResponsesAgentRequest(
input=[{"role": "user", "content": "What's the 100th Fibonacci number?"}]
)
resp = SingleTurnMCPAgent().predict(req)
for item in resp.output:
print(item)
エージェントをデプロイする
管理対象 MCP サーバーに接続するエージェントをデプロイする準備ができたら、 標準のエージェントデプロイメントプロセスを使用します。
ロギング時にエージェントがアクセスする必要があるすべてのリソースを必ず指定してください。たとえば、エージェントが次の MCP サーバー URL を使用しているとします。
https://<your-workspace-hostname>/api/2.0/mcp/vector-search/prod/customer_support
https://<your-workspace-hostname>/api/2.0/mcp/vector-search/prod/billing
https://<your-workspace-hostname>/api/2.0/mcp/functions/prod/billing
エージェントが prod.customer_support
で必要とするすべてのベクトル検索インデックスと、 prod.billing
スキーマをリソースとして指定し、 prod.billing
のすべての Unity Catalog 関数を指定する必要があります。
エージェントが Databricks 上の MCP サーバーに接続してツールを検出して実行する場合は、これらの MCP サーバーに必要なリソースをエージェントに記録してください。Databricks では、このプロセスを簡略化するために、 databricks-mcp
PyPI パッケージをインストールすることをお勧めします。
特に、 マネージド MCP サーバーを使用している場合は、 databricks_mcp.DatabricksMCPClient().get_databricks_resources(<server_url>)
を使用してマネージド MCP サーバーに必要なリソースを取得できます。エージェントが Databricks アプリでホストされているカスタム MCP サーバーに対してクエリを実行する場合は、モデルのログ記録時にサーバーをリソースとして明示的に含めることで、承認を構成できます。
たとえば、上記で定義したエージェントをデプロイするには、次のスニペットを実行します。
mcp_agent.py
のエージェントコード定義:
import os
from databricks.sdk import WorkspaceClient
from databricks import agents
import mlflow
from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint, DatabricksVectorSearchIndex
from mcp_agent import LLM_ENDPOINT_NAME
from databricks_mcp import DatabricksMCPClient
# TODO: Update this to your Databricks CLI profile name
databricks_cli_profile = "YOUR_DATABRICKS_CLI_PROFILE"
assert (
databricks_cli_profile != "YOUR_DATABRICKS_CLI_PROFILE"
), "Set databricks_cli_profile to the Databricks CLI profile name you specified when configuring authentication to the workspace"
workspace_client = WorkspaceClient(profile=databricks_cli_profile)
# Configure MLflow and the Databricks SDK to use your Databricks CLI profile
current_user = workspace_client.current_user.me().user_name
mlflow.set_tracking_uri(f"databricks://{databricks_cli_profile}")
mlflow.set_registry_uri(f"databricks-uc://{databricks_cli_profile}")
mlflow.set_experiment(f"/Users/{current_user}/databricks_docs_example_mcp_agent")
os.environ["DATABRICKS_CONFIG_PROFILE"] = databricks_cli_profile
MANAGED_MCP_SERVER_URLS = [
f"{host}/api/2.0/mcp/functions/system/ai",
]
# Log the agent defined in mcp_agent.py
here = os.path.dirname(os.path.abspath(__file__))
agent_script = os.path.join(here, "mcp_agent.py")
resources = [
DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME),
DatabricksFunction("system.ai.python_exec"),
# --- Uncomment and edit the following lines to include custom mcp servers hosted on Databricks Apps ---
# DatabricksApp(app_name="app-name")
]
for mcp_server_url in MANAGED_MCP_SERVER_URLS:
mcp_client = DatabricksMCPClient(server_url=mcp_server_url, workspace_client=workspace_client)
resources.extend(mcp_client.get_databricks_resources())
with mlflow.start_run():
logged_model_info = mlflow.pyfunc.log_model(
artifact_path="mcp_agent",
python_model=agent_script,
resources=resources,
)
# TODO Specify your UC model name here
UC_MODEL_NAME = "main.default.databricks_docs_mcp_agent"
registered_model = mlflow.register_model(logged_model_info.model_uri, UC_MODEL_NAME)
agents.deploy(
model_name=UC_MODEL_NAME,
model_version=registered_model.version,
)