openai-pyfunc-responses-tool-calling-agent


Mosaic AI Agent Framework: Author and deploy a tool-calling OpenAI Responses API agent

This notebook demonstrates how to author an OpenAI agent that's compatible with Mosaic AI Agent Framework features. In this notebook you learn to:

  • Author a tool-calling ChatAgent that uses the OpenAI Responses API
  • Manually test the agent's output
  • Evaluate the agent using Mosaic AI Agent Evaluation
  • Log and deploy the agent

Note: This notebook queries the OpenAI REST API directly. For governance, payload logging, and other Databricks AI Gateway functionality, use Databricks external models (AWS | Azure). For an example that uses Databricks external models to query OpenAI models, see the OpenAI tool-calling agent notebook.

To learn more about authoring an agent using Mosaic AI Agent Framework, see Databricks documentation (AWS | Azure).

Prerequisites

  • Create a Databricks secret with your OpenAI API key (AWS | Azure)
  • Address all TODOs in this notebook.

%pip install -U -qqqq mlflow backoff databricks-openai openai databricks-agents uv
dbutils.library.restartPython()

Define the agent in code

Define the agent code in a single cell below. This lets you easily write the agent code to a local Python file, using the %%writefile magic command, for subsequent logging and deployment.

Agent tools

This agent code adds the built-in Unity Catalog function system.ai.python_exec to the agent. The agent code also includes commented-out sample code for adding a vector search index to perform unstructured data retrieval.

For more examples of tools to add to your agent, see Databricks documentation (AWS | Azure).
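
The agent code in the next cell receives each Unity Catalog tool spec in Chat Completions form, where the name, description, and parameters sit under a nested `function` key, and flattens it so those fields sit at the top level next to `type`. The following is a minimal sketch of that conversion on a hand-written spec. `flatten_tool_spec`, `tool_name_to_udf_name`, and the example spec are illustrative names that do not appear in the agent code, and unlike the notebook's helper this sketch copies the spec instead of updating it in place.

```python
import copy


def flatten_tool_spec(chat_completions_spec: dict) -> dict:
    """Return a flattened copy of a Chat Completions tool spec (hypothetical helper)."""
    spec = copy.deepcopy(chat_completions_spec)
    # Lift the nested "function" fields (name, description, parameters)
    # to the top level, next to "type".
    spec |= spec.pop("function")
    return spec


def tool_name_to_udf_name(tool_name: str) -> str:
    """Map a tool name with double underscores back to a dotted UC function name."""
    return tool_name.replace("__", ".")


# Hypothetical spec, shaped like a Chat Completions tool definition.
example_spec = {
    "type": "function",
    "function": {
        "name": "system__ai__python_exec",
        "description": "Runs Python code.",
        "parameters": {
            "type": "object",
            "properties": {"code": {"type": "string"}},
            "required": ["code"],
        },
    },
}

flat_spec = flatten_tool_spec(example_spec)
print(sorted(flat_spec))  # ['description', 'name', 'parameters', 'type']
print(tool_name_to_udf_name(flat_spec["name"]))  # system.ai.python_exec
```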

%%writefile agent.py
import json
from typing import Any, Callable, Generator, Optional, Union
from uuid import uuid4

import backoff
import mlflow
import openai
from databricks_openai import UCFunctionToolkit, VectorSearchRetrieverTool
from mlflow.entities import SpanType
from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import (
    ChatAgentChunk,
    ChatAgentMessage,
    ChatAgentResponse,
    ChatContext,
)
from openai import OpenAI
from openai.types.chat import ChatCompletionToolParam
from openai.types.responses import (
    ResponseFunctionToolCall,
    ResponseOutputItem,
    ResponseOutputMessage,
)
from pydantic import BaseModel
from unitycatalog.ai.core.base import get_uc_function_client

############################################
# Define your LLM endpoint and system prompt
############################################
# TODO: Replace with your desired OpenAI model
# Databricks does not yet support the Responses API
LLM_ENDPOINT_NAME = "gpt-4o"

# TODO: Update with your system prompt
SYSTEM_PROMPT = """
You are a helpful assistant.
"""

###############################################################################
## Define tools for your agent, enabling it to retrieve data or take actions
## beyond text generation
## To create and see usage examples of more tools, see
## https://docs.databricks.com/en/generative-ai/agent-framework/agent-tool.html
###############################################################################
class ToolInfo(BaseModel):
    """
    Class representing a tool for the agent.
    - "name" (str): The name of the tool.
    - "spec" (dict): JSON description of the tool (matches OpenAI Responses format)
    - "exec_fn" (Callable): Function that implements the tool logic
    """

    name: str
    spec: dict
    exec_fn: Callable


def convert_chat_completion_tool_to_tool_info(
    spec: ChatCompletionToolParam, exec_fn: Optional[Callable] = None
):
    """Converts a ChatCompletionToolParam to a ToolInfo object."""
    spec |= spec.pop("function")
    if exec_fn is None:

        def exec_fn(**kwargs):
            udf_name = spec["name"].replace("__", ".")
            function_result = uc_function_client.execute_function(udf_name, kwargs)
            if function_result.error is not None:
                return function_result.error
            else:
                return function_result.value

    return ToolInfo(name=spec["name"], spec=spec, exec_fn=exec_fn)


TOOL_INFOS = []

# You can use UDFs in Unity Catalog as agent tools
# Below, we add the `system.ai.python_exec` UDF, which provides
# a python code interpreter tool to our agent

# TODO: Add additional tools
UC_TOOL_NAMES = ["system.ai.python_exec"]

uc_toolkit = UCFunctionToolkit(function_names=UC_TOOL_NAMES)
uc_function_client = get_uc_function_client()
for tool_spec in uc_toolkit.tools:
    TOOL_INFOS.append(convert_chat_completion_tool_to_tool_info(tool_spec))


# Use Databricks vector search indexes as tools
# See https://docs.databricks.com/en/generative-ai/agent-framework/unstructured-retrieval-tools.html#locally-develop-vector-search-retriever-tools-with-ai-bridge
# for details
VECTOR_SEARCH_TOOLS = []

# TODO: Add vector search indexes
# VECTOR_SEARCH_TOOLS.append(
#     VectorSearchRetrieverTool(
#         index_name="",
#         # filters="..."
#     )
# )
for vs_tool in VECTOR_SEARCH_TOOLS:
    TOOL_INFOS.append(
        convert_chat_completion_tool_to_tool_info(vs_tool.tool, vs_tool.execute)
    )


class ToolCallingAgent(ChatAgent):
    """
    Class representing a tool-calling Agent
    """

    def __init__(self, llm_endpoint: str, tools: list[ToolInfo]):
        """
        Initializes the ToolCallingAgent with tools.
        """
        super().__init__()
        self.llm_endpoint = llm_endpoint
        self.client: OpenAI = OpenAI()
        self._tools_dict = {tool.name: tool for tool in tools}

    def get_tool_specs(self) -> list[dict]:
        """
        Returns tool specifications in the format OpenAI expects.
        """
        return [tool_info.spec for tool_info in self._tools_dict.values()]

    @mlflow.trace(span_type=SpanType.TOOL)
    def execute_tool(self, tool_name: str, args: dict) -> Any:
        """
        Executes the specified tool with the given arguments.
        """
        if tool_name not in self._tools_dict:
            raise ValueError(f"Unknown tool: {tool_name}")
        return self._tools_dict[tool_name].exec_fn(**args)

    def prepare_messages_for_llm(
        self,
        messages: list[Union[ChatAgentMessage, ResponseOutputItem, dict[str, Any]]],
    ) -> list[Union[dict[str, Any], ResponseOutputItem]]:
        """Filter out ChatAgentMessage fields that are not compatible with LLM message formats"""
        compatible_keys = ["role", "content", "name", "tool_calls", "tool_call_id"]
        return [
            {
                k: v
                for k, v in m.model_dump_compat(exclude_none=True).items()
                if k in compatible_keys
            }
            if isinstance(m, ChatAgentMessage)
            else m
            for m in messages
        ]

    def convert_openai_response_output_to_chat_agent_msg(
        self, output: ResponseOutputItem
    ) -> ChatAgentMessage:
        """Converts an OpenAI ResponseOutputItem to a ChatAgentMessage."""
        if isinstance(output, ResponseFunctionToolCall):
            return ChatAgentMessage(
                **{
                    "role": "assistant",
                    "id": output.id,
                    "content": "",
                    "tool_calls": [
                        {
                            "id": output.call_id,
                            "type": "function",
                            "function": {
                                "name": output.name,
                                "arguments": output.arguments,
                            },
                        }
                    ],
                }
            )
        elif isinstance(output, ResponseOutputMessage):
            return ChatAgentMessage(
                role=output.role, content=output.content[0].text, id=output.id
            )
        else:
            raise NotImplementedError("Add more parsing for other output types")

    @backoff.on_exception(backoff.expo, openai.RateLimitError)
    @mlflow.trace(span_type=SpanType.LLM)
    def chat_completion(
        self, messages: list[Union[ChatAgentMessage, ResponseOutputItem, dict]]
    ) -> ResponseOutputItem:
        return self.client.responses.create(
            model=self.llm_endpoint,
            input=self.prepare_messages_for_llm(messages),
            tools=self.get_tool_specs(),
        ).output[0]

    def handle_tool_call(
        self,
        llm_output: ResponseFunctionToolCall,
        current_msg_history: list[
            Union[ChatAgentMessage, ResponseOutputItem, dict[str, Any]]
        ],
    ) -> ChatAgentMessage:
        """
        Execute tool calls, add them to the running message history, and return a tool ChatAgentMessage
        """
        args = json.loads(llm_output.arguments)
        result = str(self.execute_tool(tool_name=llm_output.name, args=args))

        # format from step 4 https://platform.openai.com/docs/guides/function-calling#function-calling-steps
        openai_response_tool_msg = {
            "type": "function_call_output",
            "call_id": llm_output.call_id,
            "output": result,
        }
        current_msg_history.append(openai_response_tool_msg)
        return ChatAgentMessage(
            role="tool",
            name=llm_output.name,
            tool_call_id=llm_output.call_id,
            content=result,
            id=str(uuid4()),
        )

    def call_and_run_tools(
        self,
        messages: list[Union[ChatAgentMessage, ResponseOutputItem, dict[str, Any]]],
        max_iter: int = 10,
    ) -> Generator[ChatAgentMessage, None, None]:
        for i in range(max_iter):
            llm_output = self.chat_completion(messages=messages)
            messages.append(llm_output)
            yield self.convert_openai_response_output_to_chat_agent_msg(llm_output)

            if not isinstance(llm_output, ResponseFunctionToolCall):
                return  # Stop streaming if no tool calls are needed

            yield self.handle_tool_call(llm_output, messages)

        yield ChatAgentMessage(
            content=f"I'm sorry, I couldn't determine the answer after trying {max_iter} times.",
            role="assistant",
            id=str(uuid4()),
        )

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> ChatAgentResponse:
        """
        Primary function that takes a user's request and generates a response.
        """
        # NOTE: this assumes that each chunk streamed by self.call_and_run_tools contains
        # a full message (i.e. chunk.delta is a complete message).
        # This is simple to implement, but you can also stream partial response messages from predict_stream,
        # and aggregate them in predict by message ID
        response_messages = [
            chunk.delta
            for chunk in self.predict_stream(messages, context, custom_inputs)
        ]
        return ChatAgentResponse(messages=response_messages)

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> Generator[ChatAgentChunk, None, None]:
        if len(messages) == 0:
            raise ValueError("`messages` must contain at least one message")
        all_messages = [
            ChatAgentMessage(role="system", content=SYSTEM_PROMPT)
        ] + messages

        for message in self.call_and_run_tools(messages=all_messages):
            yield ChatAgentChunk(delta=message)

# Log the model using MLflow
mlflow.openai.autolog()
AGENT = ToolCallingAgent(llm_endpoint=LLM_ENDPOINT_NAME, tools=TOOL_INFOS)
mlflow.models.set_model(AGENT)

Test the agent

Interact with the agent to test its output. Because the agent's methods are decorated with @mlflow.trace, you can view a trace for each step the agent takes. LLM calls made through the OpenAI SDK are traced automatically by MLflow autologging.
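
Before calling the live API, it can help to see the agent's control flow in isolation. The sketch below mirrors the loop in `call_and_run_tools`: ask the model, run the requested tool, append a `function_call_output` item, and repeat until the model returns a plain message. It is an assumption-laden stand-in rather than the agent itself: `fake_llm`, `TOOLS`, and `run_tool_loop` are hypothetical names, the model is a stub that makes no network calls, and messages are plain dicts instead of OpenAI or MLflow types.

```python
import json
from typing import Any, Callable


def fake_llm(history: list[dict[str, Any]]) -> dict[str, Any]:
    """Hypothetical stand-in for the model: request one tool call, then answer."""
    tool_outputs = [m for m in history if m.get("type") == "function_call_output"]
    if not tool_outputs:
        return {
            "type": "function_call",
            "call_id": "call_1",
            "name": "multiply",
            "arguments": json.dumps({"a": 4, "b": 3}),
        }
    return {
        "type": "message",
        "role": "assistant",
        "content": f"The result is {tool_outputs[-1]['output']}.",
    }


# Hypothetical tool registry: tool name -> Python callable.
TOOLS: dict[str, Callable[..., Any]] = {"multiply": lambda a, b: a * b}


def run_tool_loop(
    history: list[dict[str, Any]],
    llm: Callable[[list[dict[str, Any]]], dict[str, Any]],
    max_iter: int = 10,
) -> list[dict[str, Any]]:
    """Call the model until it stops requesting tools or max_iter is reached."""
    for _ in range(max_iter):
        output = llm(history)
        history.append(output)
        if output["type"] != "function_call":
            return history  # The model produced a final message.
        # Run the requested tool and feed its result back to the model.
        args = json.loads(output["arguments"])
        result = str(TOOLS[output["name"]](**args))
        history.append(
            {
                "type": "function_call_output",
                "call_id": output["call_id"],
                "output": result,
            }
        )
    return history  # Iteration budget exhausted without a final message.


final_history = run_tool_loop([{"role": "user", "content": "What is 4*3?"}], fake_llm)
print(final_history[-1]["content"])  # The result is 12.
```

The tool result is linked to the model's request through `call_id`, which is the same pairing the agent's `handle_tool_call` method relies on.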

Replace this placeholder input with an appropriate domain-specific example for your agent.

dbutils.library.restartPython()

import os

# TODO: set secret_scope_name and secret_key_name to access your OpenAI API key
secret_scope_name = ""
secret_key_name = ""
os.environ["OPENAI_API_KEY"] = dbutils.secrets.get(
    scope=secret_scope_name, key=secret_key_name
)
# Check for a non-empty value; an `is not None` test on os.environ can never fail
assert os.environ.get("OPENAI_API_KEY"), "The OPENAI_API_KEY env var was not properly set"

from agent import AGENT

AGENT.predict({"messages": [{"role": "user", "content": "What is 4*3 in Python?"}]})

for chunk in AGENT.predict_stream(
    {"messages": [{"role": "user", "content": "What is 4*3 in Python?"}]}
):
    print(chunk, "-----------\n")

Log the agent as an MLflow model

Log the agent as code from the agent.py file. See MLflow - Models from Code.

Enable automatic authentication for Databricks resources

For the most common Databricks resource types, Databricks supports and recommends declaring resource dependencies for the agent upfront during logging. This enables automatic authentication passthrough when you deploy the agent. With automatic authentication passthrough, Databricks automatically provisions, rotates, and manages short-lived credentials to securely access these resource dependencies from within the agent endpoint.

To enable automatic authentication, specify the dependent Databricks resources when calling mlflow.pyfunc.log_model().

  • TODO: If your Unity Catalog tool queries a vector search index or leverages external functions, you need to include the dependent vector search index and UC connection objects, respectively, as resources. See docs (AWS | Azure).

# Determine Databricks resources to specify for automatic auth passthrough at deployment time
from agent import UC_TOOL_NAMES, VECTOR_SEARCH_TOOLS
import mlflow
from mlflow.models.resources import DatabricksFunction

resources = []
for tool in VECTOR_SEARCH_TOOLS:
    resources.extend(tool.resources)
for tool_name in UC_TOOL_NAMES:
    resources.append(DatabricksFunction(function_name=tool_name))

with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        artifact_path="agent",
        python_model="agent.py",
        pip_requirements=[
            "mlflow",
            "backoff",
            "databricks-openai",
        ],
        resources=resources,
    )

Evaluate the agent with Agent Evaluation

Use Mosaic AI Agent Evaluation to evaluate the agent's responses against expected responses and other evaluation criteria. Use the criteria you specify to guide iterations, and use MLflow to track the computed quality metrics. See Databricks documentation (AWS | Azure).

To evaluate your tool calls, add custom metrics. See Databricks documentation (AWS | Azure).
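
As a starting point for such a metric, the sketch below shows a plain-Python check that inspects a response for a call to an expected tool. It assumes the response has been converted to a dict with a `messages` list whose assistant entries carry `tool_calls` in the same shape the agent emits. `called_tools`, `used_expected_tool`, and the sample response are hypothetical, and wiring the check into Agent Evaluation as a custom metric is left to the linked documentation.

```python
from typing import Any


def called_tools(response: dict[str, Any]) -> list[str]:
    """List the tool names requested in a response, in order (hypothetical helper)."""
    names = []
    for message in response.get("messages", []):
        # Messages without tool calls may omit the key or set it to None.
        for tool_call in message.get("tool_calls") or []:
            names.append(tool_call["function"]["name"])
    return names


def used_expected_tool(response: dict[str, Any], expected_tool: str) -> bool:
    """Return True if the response contains at least one call to expected_tool."""
    return expected_tool in called_tools(response)


# Hypothetical response, shaped like the messages this agent returns.
sample_response = {
    "messages": [
        {
            "role": "assistant",
            "content": "",
            "tool_calls": [
                {
                    "id": "call_1",
                    "type": "function",
                    "function": {
                        "name": "system__ai__python_exec",
                        "arguments": '{"code": "print(4*3)"}',
                    },
                }
            ],
        },
        {"role": "tool", "name": "system__ai__python_exec", "content": "12"},
        {"role": "assistant", "content": "4*3 is 12."},
    ]
}

print(called_tools(sample_response))  # ['system__ai__python_exec']
print(used_expected_tool(sample_response, "system__ai__python_exec"))  # True
```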

import pandas as pd

eval_examples = [
    {
        "request": {"messages": [{"role": "user", "content": "What is an LLM agent?"}]},
        "expected_response": None,
    }
]

eval_dataset = pd.DataFrame(eval_examples)
display(eval_dataset)

import mlflow

with mlflow.start_run(run_id=logged_agent_info.run_id):
    eval_results = mlflow.evaluate(
        f"runs:/{logged_agent_info.run_id}/agent",
        data=eval_dataset,  # Your evaluation dataset
        model_type="databricks-agent",  # Enable Mosaic AI Agent Evaluation
    )

# Review the evaluation results in the MLflow UI (see console output), or access them in place:
display(eval_results.tables["eval_results"])

Pre-deployment agent validation

Before registering and deploying the agent, perform pre-deployment checks using the mlflow.models.predict() API. See Databricks documentation (AWS | Azure).

mlflow.models.predict(
    model_uri=f"runs:/{logged_agent_info.run_id}/agent",
    input_data={"messages": [{"role": "user", "content": "Hello!"}]},
    env_manager="uv",
)

Register the model to Unity Catalog

Before you deploy the agent, you must register the agent to Unity Catalog.

  • TODO: Update the catalog, schema, and model_name below to register the MLflow model to Unity Catalog.

mlflow.set_registry_uri("databricks-uc")

# TODO: define the catalog, schema, and model name for your UC model
catalog = ""
schema = ""
model_name = ""
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# register the model to UC
uc_registered_model_info = mlflow.register_model(model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME)

Deploy the agent

from databricks import agents

agents.deploy(
    UC_MODEL_NAME,
    uc_registered_model_info.version,
    tags={"endpointSource": "docs"},
    environment_vars={
        "OPENAI_API_KEY": f"{{{{secrets/{secret_scope_name}/{secret_key_name}}}}}"
    },
)
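
The `environment_vars` value in the deployment call is a secret reference string, not the API key itself. The quadruple braces in the f-string are there because each literal brace must be doubled inside an f-string. A small sketch of what the expression evaluates to, using a hypothetical helper name and placeholder scope and key names:

```python
def secret_reference(scope: str, key: str) -> str:
    """Build the secret reference string used in the deployment call (hypothetical helper)."""
    # "{{{{" renders as "{{" and "}}}}" renders as "}}";
    # {scope} and {key} are ordinary f-string substitutions.
    return f"{{{{secrets/{scope}/{key}}}}}"


# Placeholder names for illustration only.
print(secret_reference("my_scope", "openai_api_key"))
# {{secrets/my_scope/openai_api_key}}
```

If `secret_scope_name` or `secret_key_name` is still an empty string from the TODO cell, the reference will be malformed, so it is worth printing the value once before deploying.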

Next steps

After your agent is deployed, you can chat with it in AI Playground to perform additional checks, share it with SMEs in your organization for feedback, or embed it in a production application. See docs (AWS | Azure) for details.
