%md # Mosaic AI Agent Framework: Author and deploy a tool-calling AutoGen agent

This notebook demonstrates how to author an AutoGen agent that's compatible with Mosaic AI Agent Framework features. In this notebook, you learn to:

- Author a tool-calling AutoGen agent wrapped with `ChatAgent` with custom inputs and outputs
- Manually test the agent's output
- Evaluate the agent using Mosaic AI Agent Evaluation
- Log and deploy the agent

To learn more about authoring an agent using Mosaic AI Agent Framework, see Databricks documentation ([AWS](https://docs.databricks.com/aws/generative-ai/agent-framework/author-agent) | [Azure](https://learn.microsoft.com/azure/databricks/generative-ai/agent-framework/create-chat-model)).

## Prerequisites

- A cluster that has access to both Unity Catalog and the ML Runtime.
- Permission to create models within a catalog and schema in Unity Catalog.
- Permission to create a serving endpoint.
!pip install autogen-agentchat==0.2.40 unitycatalog-autogen[databricks] databricks-ai-bridge uv databricks-agents
dbutils.library.restartPython()
%md ## Define the agent in code

Define the agent code in a single cell below. This lets you easily write the agent code to a local Python file, using the `%%writefile` magic command, for subsequent logging and deployment.

#### Agent tools

This agent code adds the built-in Unity Catalog function `system.ai.python_exec` to the agent. The agent code also includes commented-out sample code for adding a vector search index to perform unstructured data retrieval.

For more examples of tools to add to your agent, see Databricks documentation ([AWS](https://docs.databricks.com/aws/generative-ai/agent-framework/agent-tool) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-framework/agent-tool) | [GCP](https://docs.databricks.com/gcp/en/generative-ai/agent-framework/agent-tool)).

#### Wrap the AutoGen agent using the `ChatAgent` interface

For compatibility with Databricks AI features, the `AutogenAgent` class implements the `ChatAgent` interface to wrap the AutoGen agent. The `predict_stream` method is not implemented because streaming is not supported for custom models in AutoGen version 0.2.

Databricks recommends using `ChatAgent` because it simplifies authoring multi-turn conversational agents using an open source standard. See MLflow's [ChatAgent documentation](https://mlflow.org/docs/latest/python_api/mlflow.pyfunc.html#mlflow.pyfunc.ChatAgent).
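%md For orientation before the full implementation below, the following is a minimal sketch of the interface shape only, not the notebook's agent: a `ChatAgent` subclass overrides `predict`, which receives a list of `ChatAgentMessage` objects and returns a `ChatAgentResponse`. The `EchoAgent` name and echo behavior are hypothetical illustrations.
# Minimal sketch of the ChatAgent interface shape (illustration only; the real
# implementation is in autogen_agentchat.py below).
from typing import Any, Optional

from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import ChatAgentMessage, ChatAgentResponse, ChatContext


class EchoAgent(ChatAgent):
    def predict(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> ChatAgentResponse:
        # Echo the last user message back as an assistant message.
        last = messages[-1].content
        return ChatAgentResponse(
            messages=[ChatAgentMessage(role="assistant", content=last, id="0")]
        )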
%%writefile databricks_model_serving_client.py
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import ChatMessage, ChatMessageRole
from typing import List, Optional

# This client is designed to interact with Databricks serving endpoints for foundation models.
# With this client created, users no longer need to worry about authentication through environment variables.
# The authentication process is handled via the workspace client.
# The client was created following this guide: https://microsoft.github.io/autogen/0.2/blog/2024/01/26/Custom-Models.
class DatabricksModelServingClient:
    def __init__(self, config, **kwargs):
        self.workspace = WorkspaceClient()
        self.openai_client = self.workspace.serving_endpoints.get_open_ai_client()
        self.endpoint_name = config.get("endpoint_name")
        self.llm_config = config.get("llm_config")
        self.config = config

    def transform_messages(self, input_data, remove_keys=['id']):
        output_messages = input_data['messages'].copy()
        for message in output_messages:
            for key in remove_keys:
                if key in message:
                    message.pop(key)
        return output_messages

    def create(self, input_data):
        # Remove specific fields from the messages, as the serving endpoint does not expect them.
        input_messages = self.transform_messages(input_data)
        # Call the Databricks serving endpoint using the OpenAI client.
        response = self.openai_client.chat.completions.create(
            model=self.endpoint_name,
            messages=input_messages,
            tools=input_data["tools"],
            **self.llm_config
        )
        return response

    def message_retrieval(self, response):
        # Process and return messages from the response.
        return [choice.message for choice in response.choices]

    def cost(self, response):
        # Implement cost calculation if applicable.
        return 0

    def get_usage(self, response):
        # Implement usage statistics if available.
        usage = response.usage
        return {
            "prompt_tokens": usage.prompt_tokens,
            "total_tokens": usage.total_tokens,
            "completion_tokens": usage.completion_tokens,
        }
%%writefile autogen_agentchat.py
from random import randint
import json
import os
import uuid

from databricks_model_serving_client import DatabricksModelServingClient
from autogen import ConversableAgent
from autogen import register_function
from mlflow.types.agent import ChatAgentRequest, ChatAgentMessage, ChatContext, ChatAgentResponse
from typing import Optional, Any
from mlflow.pyfunc import ChatAgent
import mlflow
from unitycatalog.ai.core.databricks import DatabricksFunctionClient
from unitycatalog.ai.autogen.toolkit import UCFunctionToolkit, AutogenTool
from distutils.util import strtobool

def parse_bool_robust(value):
    return bool(strtobool(value))

# By default, enable traces;
# if 'MLFLOW_LOG_TRACES' is available as an environment variable, use it to set the flag.
log_traces_flag = True
if 'MLFLOW_LOG_TRACES' in os.environ:
    log_traces_flag = parse_bool_robust(os.environ['MLFLOW_LOG_TRACES'])

mlflow.autogen.autolog(log_traces=log_traces_flag)

###############################################################################
## Define tools for your agent, enabling it to retrieve data or take actions
## beyond text generation
## To create and see usage examples of more tools, see
## https://docs.databricks.com/en/generative-ai/agent-framework/agent-tool.html
###############################################################################
client = DatabricksFunctionClient()

# Create a tool that generates random numbers and provides customized output along with attachments.
def generate_random_ints(min: int, max: int, size: int) -> dict[str, Any]:
    """Generate size random ints in the range [min, max]."""
    attachments = {"min": str(min), "max": str(max)}
    custom_outputs = [randint(min, max) for _ in range(size)]
    content = f"Successfully generated array of {size} random ints in [{min}, {max}]."
    return {
        "content": content,
        "attachments": attachments,
        "custom_outputs": {"random_nums": custom_outputs},
    }

# Create a mock tool that provides the weather information for a city in California.
def weather_in_california_city(city: str) -> str:
    """Get the weather description of a city in California."""
    return f"The weather in {city} is sunny."

tools = [generate_random_ints, weather_in_california_city]

# Add a pre-built tool from Unity Catalog.
uc_tool_names = ["system.ai.python_exec"]
uc_toolkit = UCFunctionToolkit(function_names=uc_tool_names, client=client)
tools.extend(uc_toolkit.tools)

# Use Databricks vector search indexes as tools
# See https://docs.databricks.com/en/generative-ai/agent-framework/unstructured-retrieval-tools.html
# for details

# TODO: Add vector search indexes
# vector_search_tools = [
#     VectorSearchRetrieverTool(
#         index_name="",
#         # filters="..."
#     )
# ]
# tools.extend(vector_search_tools)

# Update this to reflect your specific endpoint name:
# the name of a serving endpoint available in Databricks must be specified.
LLM_ENDPOINT_NAME = "databricks-meta-llama-3-3-70b-instruct"

# The ChatAgent that encapsulates the AutoGen agent.
class AutogenAgent(ChatAgent):
    def __init__(self, tools=None, model_name: str = "autogen_agent"):
        self.model_name = model_name
        self.tools = tools if tools else []

    def _create_agents(self, chat_history):
        def _is_termination_message(message):
            content = message.get("content", "")
            return (content and "TERMINATE" in content.upper()) or (
                message['role'] == 'user' and 'tool_calls' not in message
            )

        # The user proxy agent is used for interacting with the assistant agent
        # and executes tool calls.
        user_proxy = ConversableAgent(
            name="User",
            llm_config=False,
            is_termination_msg=_is_termination_message,
            human_input_mode="NEVER",
        )

        # Define a custom LLM configuration that uses the Databricks Model Serving client.
        config_list = [{
            "model_client_cls": "DatabricksModelServingClient",
            "model": self.model_name,
            "endpoint_name": LLM_ENDPOINT_NAME,
            "llm_config": {"max_tokens": 1500, "temperature": 0.01},
        }]

        # The actual agent that interacts with the large language model (LLM).
        assistant = ConversableAgent(
            name="Assistant",
            system_message=(
                "You are a helpful assistant with various tools available. "
                "Use tools as the primary source of information to assist with the user's question, "
                "if not possible then use your generation capacity."
            ),
            llm_config={"config_list": config_list, "cache_seed": None, "stream": True},
            chat_messages={user_proxy: chat_history},
        )

        # Add tools to the assistant agent and make the user proxy the executor of those tools.
        for tool in self.tools:
            if isinstance(tool, AutogenTool):
                tool.register_function(callers=assistant, executors=user_proxy)
            else:
                register_function(
                    tool,
                    caller=assistant,     # The assistant agent can suggest calls to the tools.
                    executor=user_proxy,  # The user proxy agent can execute the tool calls.
                    description=tool.__doc__,  # Use the tool's docstring as the description.
                )

        # Register the custom Databricks Model Serving client in the assistant.
        assistant.register_model_client(model_client_cls=DatabricksModelServingClient)

        return assistant, user_proxy

    # Create a tool message and extract any available custom outputs.
    def _convert_tool_message(self, message):
        tool_response = message['tool_responses'][0]
        tool_message = {
            'role': 'tool',
            'name': message['name'],
            'tool_call_id': tool_response['tool_call_id'],
        }
        custom_outputs = None
        try:
            tool_response_content = json.loads(tool_response["content"])
            if "custom_outputs" in tool_response_content:
                custom_outputs = tool_response_content["custom_outputs"]
            tool_message['attachments'] = tool_response_content['attachments']
            content = tool_response_content['content']
        except Exception:
            content = tool_response['content']
        tool_message['content'] = content
        return tool_message, custom_outputs

    # Convert AutoGen messages to ChatAgent messages.
    def _convert_to_chat_messages(self, messages_generated):
        output_messages = []
        custom_outputs = None
        for message in messages_generated:
            if message['role'] == 'tool':
                message, custom_outputs = self._convert_tool_message(message)
            message['id'] = str(uuid.uuid4())
            output_messages.append(ChatAgentMessage(**message))
        return output_messages, custom_outputs

    def predict(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> ChatAgentResponse:
        request = {"messages": self._convert_messages_to_dict(messages)}

        # Extract the last message from the conversation and retain all
        # previous messages except for the last one in the history.
        last_message = request["messages"][-1]
        chat_history = request["messages"][:-1]

        # Create the agents and execute the chat completion.
        assistant, user_proxy = self._create_agents(chat_history)
        model_response = user_proxy.initiate_chat(
            assistant,
            message=last_message['content'],
            max_turns=5,
            clear_history=False,
        )

        # Use chat_messages from the assistant because model_response.chat_history
        # is written from the perspective of the assistant, where the assistant
        # acts as the user and the user takes on the role of the assistant.
        messages_generated = assistant.chat_messages[user_proxy][len(request["messages"]):]
        output_messages, custom_outputs = self._convert_to_chat_messages(messages_generated)

        return ChatAgentResponse(messages=output_messages, custom_outputs=custom_outputs)

autogent_model = AutogenAgent(tools)
mlflow.models.set_model(autogent_model)
%md ## Test the agent

Interact with the agent to test its output and tool-calling abilities. Since this notebook called `mlflow.autogen.autolog()`, you can view the trace for each step the agent takes.

Test both single-turn and multi-turn scenarios.
dbutils.library.restartPython()
# Import the model from the file that was just created.
from autogen_agentchat import autogent_model
# TODO: Replace these placeholder inputs with appropriate domain-specific examples for your agent.
single_turn_example = [{"role": "user", "content": "What is 10 + 10?"}]

multi_turn_example = [
    {"role": "user", "content": "What is 10 + 10?"},
    {"role": "assistant", "content": "10 + 10 = 20"},
    {"role": "user", "content": "Is that number greater or lower than 50?"},
]

single_turn_tc_example = [{"role": "user", "content": "Can you generate ten random numbers between 10 and 100?"}]
autogent_model.predict({"messages": single_turn_example})
autogent_model.predict({"messages": multi_turn_example})
autogent_model.predict({"messages": single_turn_tc_example})
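%md The tool-calling example routes through `generate_random_ints`, which returns `custom_outputs` and `attachments` in addition to text. A minimal sketch for inspecting those structured fields, assuming the `predict` call above succeeds:
# Inspect the structured fields on the ChatAgentResponse returned by the tool-calling example.
response = autogent_model.predict({"messages": single_turn_tc_example})
print(response.custom_outputs)  # e.g. {"random_nums": [...]}
for message in response.messages:
    print(message.role, "->", message.content)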
%md ## Log the agent as an MLflow model

Log the agent as code from the `autogen_agentchat.py` file. See [MLflow - Models from Code](https://mlflow.org/docs/latest/models.html#models-from-code).

### Enable automatic authentication for Databricks resources

For the most common Databricks resource types, Databricks supports and recommends declaring resource dependencies for the agent upfront during logging. This enables automatic authentication passthrough when you deploy the agent. With automatic authentication passthrough, Databricks automatically provisions, rotates, and manages short-lived credentials to securely access these resource dependencies from within the agent endpoint.

To enable automatic authentication, specify the dependent Databricks resources when calling `mlflow.pyfunc.log_model()`.

- **TODO**: If your Unity Catalog tool queries a vector search index or leverages external functions, you need to include the dependent vector search index and UC connection objects, respectively, as resources. See docs ([AWS](https://docs.databricks.com/generative-ai/agent-framework/log-agent.html#specify-resources-for-automatic-authentication-passthrough) | [Azure](https://learn.microsoft.com/azure/databricks/generative-ai/agent-framework/log-agent#resources)) and the sketch after the logging cell below.
import mlflow
import os
from autogen_agentchat import tools, LLM_ENDPOINT_NAME
from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint
from unitycatalog.ai.autogen.toolkit import AutogenTool

resources = [DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME)]
for tool in tools:
    if isinstance(tool, AutogenTool):
        resources.append(DatabricksFunction(function_name=tool.name.replace("__", ".")))

# Disable trace logging while logging the model, then re-enable it afterward.
os.environ['MLFLOW_LOG_TRACES'] = "False"

with mlflow.start_run() as run:
    logged_agent_info = mlflow.pyfunc.log_model(
        python_model=os.path.join(os.getcwd(), 'autogen_agentchat.py'),
        code_paths=["databricks_model_serving_client.py"],
        artifact_path="agent",
        pip_requirements=[
            "autogen-agentchat==0.2.40",
            "unitycatalog-autogen[databricks]",
            "databricks-ai-bridge",
        ],
        resources=resources,
    )

os.environ['MLFLOW_LOG_TRACES'] = "True"
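%md If you enable the commented-out vector search retriever tool in `autogen_agentchat.py`, also declare the index as a resource before logging so automatic authentication passthrough covers it. A minimal sketch with a hypothetical index name; `DatabricksVectorSearchIndex` is part of `mlflow.models.resources`:
# Hypothetical example: extend `resources` with a vector search index dependency
# before calling mlflow.pyfunc.log_model(). Replace the index name with your own.
from mlflow.models.resources import DatabricksVectorSearchIndex

resources.append(
    DatabricksVectorSearchIndex(index_name="main.default.my_index")  # hypothetical index
)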
%md ## Evaluate the agent with Agent Evaluation

Use Mosaic AI Agent Evaluation to evaluate the agent's responses based on expected responses and other evaluation criteria. Use the evaluation criteria you specify to guide iterations, using MLflow to track the computed quality metrics. See Databricks documentation ([AWS](https://docs.databricks.com/aws/generative-ai/agent-evaluation) | [Azure](https://learn.microsoft.com/azure/databricks/generative-ai/agent-evaluation/)).

To evaluate your tool calls, add custom metrics, as shown in the sketch after the evaluation run below. See Databricks documentation ([AWS](https://docs.databricks.com/en/generative-ai/agent-evaluation/custom-metrics.html#evaluating-tool-calls) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-evaluation/custom-metrics#evaluating-tool-calls)).
import pandas as pd

# Include additional examples, either manually or automatically, if necessary.
eval_examples = [
    {
        "request": {"messages": [{"role": "user", "content": "What is an LLM agent?"}]},
        "expected_response": None,
    }
]

eval_dataset = pd.DataFrame(eval_examples)
display(eval_dataset)
import mlflow

with mlflow.start_run(run_id=logged_agent_info.run_id):
    eval_results = mlflow.evaluate(
        f"runs:/{logged_agent_info.run_id}/agent",
        data=eval_dataset,  # Your evaluation dataset
        model_type="databricks-agent",  # Enable Mosaic AI Agent Evaluation
    )

# Review the evaluation results in the MLflow UI (see console output), or access them in place:
display(eval_results.tables['eval_results'])
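%md To grade tool-calling behavior, you can pass custom metrics to the evaluation run. A minimal sketch under these assumptions: your `databricks-agents` version provides the `@metric` decorator in `databricks.agents.evals`, and `mlflow.evaluate` accepts `extra_metrics`. The `response_is_nonempty` metric below is a hypothetical placeholder; replace it with logic that inspects tool calls for your use case.
from databricks.agents.evals import metric

# Hypothetical custom metric: passes when the agent returned any response at all.
# Replace with checks against the agent's tool calls for real tool evaluation.
@metric
def response_is_nonempty(request, response):
    return response is not None and len(str(response)) > 0

with mlflow.start_run(run_id=logged_agent_info.run_id):
    custom_eval_results = mlflow.evaluate(
        f"runs:/{logged_agent_info.run_id}/agent",
        data=eval_dataset,
        model_type="databricks-agent",
        extra_metrics=[response_is_nonempty],
    )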
%md ## Pre-deployment agent validation

Before registering and deploying the agent, perform pre-deployment checks using the [mlflow.models.predict()](https://mlflow.org/docs/latest/python_api/mlflow.models.html#mlflow.models.predict) API. See Databricks documentation ([AWS](https://docs.databricks.com/en/machine-learning/model-serving/model-serving-debug.html#validate-inputs) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/machine-learning/model-serving/model-serving-debug#before-model-deployment-validation-checks)).
mlflow.models.predict(
    model_uri=f"runs:/{logged_agent_info.run_id}/agent",
    input_data={"messages": [{"role": "user", "content": "Hello!"}]},
    env_manager="uv",
)
%md ## Register the model to Unity Catalog

Before you deploy the agent, you must register the agent to Unity Catalog.

- **TODO** Update the `catalog`, `schema`, and `model_name` below to register the MLflow model to Unity Catalog.
mlflow.set_registry_uri("databricks-uc")

# TODO: define the catalog, schema, and model name for your UC model
catalog = ""
schema = ""
model_name = ""
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# Register the model to Unity Catalog.
uc_registered_model_info = mlflow.register_model(
    model_uri=logged_agent_info.model_uri,
    name=UC_MODEL_NAME,
)
%md ## Deploy the agent
from databricks import agents

agents.deploy(UC_MODEL_NAME, uc_registered_model_info.version, tags={"endpointSource": "docs"})
%md ## Next steps

After your agent is deployed, you can chat with it in AI playground to perform additional checks, share it with SMEs in your organization for feedback, and embed it in a production application. See Databricks documentation ([AWS](https://docs.databricks.com/en/generative-ai/deploy-agent.html) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/deploy-agent)).
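%md Once the endpoint is ready, a quick smoke test queries it with the MLflow deployments client. This is a sketch: the `agents_{catalog}-{schema}-{model_name}` endpoint-name pattern is an assumption about the default naming used by `agents.deploy`; confirm the actual endpoint name in the Serving UI before running.
from mlflow.deployments import get_deploy_client

# Assumed default endpoint name produced by agents.deploy; verify in the Serving UI.
endpoint_name = f"agents_{catalog}-{schema}-{model_name}"

deploy_client = get_deploy_client("databricks")
response = deploy_client.predict(
    endpoint=endpoint_name,
    inputs={"messages": [{"role": "user", "content": "What is 10 + 10?"}]},
)
print(response)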