autogen-tool-calling-agent

Mosaic AI Agent Framework: Author and deploy a tool-calling AutoGen agent

This notebook demonstrates how to author an AutoGen agent that's compatible with Mosaic AI Agent Framework features. In this notebook, you learn to:

  • Author a tool-calling AutoGen agent, wrapped with ChatAgent, that supports custom inputs and outputs
  • Manually test the agent's output
  • Evaluate the agent using Mosaic AI Agent Evaluation
  • Log and deploy the agent

To learn more about authoring an agent using Mosaic AI Agent Framework, see Databricks documentation (AWS | Azure).

Prerequisites

  • A cluster that has access to both Unity Catalog and the ML Runtime.
  • Permission to create models within a catalog and schema in Unity Catalog.
  • Permission to create a serving endpoint.
Install Python libraries
!pip install autogen-agentchat==0.2.40 unitycatalog-autogen[databricks] databricks-ai-bridge uv databricks-agents
dbutils.library.restartPython()

Define the agent in code

Define the agent code in a single cell below. This lets you easily write the agent code to a local Python file, using the %%writefile magic command, for subsequent logging and deployment.

Agent tools

This agent code adds the built-in Unity Catalog function system.ai.python_exec to the agent. The agent code also includes commented-out sample code for adding a vector search index to perform unstructured data retrieval.

For more examples of tools to add to your agent, see Databricks documentation (AWS | Azure | GCP).
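When a plain Python function is registered as a tool, the agent code in this notebook passes the function's docstring as the tool description. The following sketch uses only the standard library to show what information a function exposes for that purpose. The `describe_tool` helper is hypothetical and is not part of AutoGen or Unity Catalog; it only illustrates why a clear docstring and type hints matter.

```python
import inspect


def weather_in_california_city(city: str) -> str:
    """Get the weather description of a city in California."""
    return f"The weather in {city} is sunny."


def describe_tool(func) -> dict:
    """Hypothetical helper: summarize the metadata a tool registry could read."""
    signature = inspect.signature(func)
    return {
        "name": func.__name__,
        # The docstring doubles as the description shown to the LLM.
        "description": inspect.getdoc(func) or "",
        # Map each parameter name to the name of its annotated type.
        "parameters": {
            name: getattr(param.annotation, "__name__", str(param.annotation))
            for name, param in signature.parameters.items()
        },
    }


tool_summary = describe_tool(weather_in_california_city)
print(tool_summary)
```

A function without a docstring would yield an empty description here, which is a good reason to document every tool you add.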

Wrap the AutoGen agent using the ChatAgent interface

For compatibility with Databricks AI features, the AutogenAgent class implements the ChatAgent interface to wrap the AutoGen agent. AutogenAgent does not implement the predict_stream method because AutoGen version 0.2 does not support streaming for custom models.

Databricks recommends using ChatAgent as it simplifies authoring multi-turn conversational agents using an open source standard. See MLflow's ChatAgent documentation.
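Part of the wrapping work is turning tool results into ChatAgent messages. The agent code in this notebook parses a tool's string result as JSON and, when it succeeds, pulls out the content, attachments, and custom outputs; otherwise it keeps the raw string as the content. The sketch below reproduces that idea in isolation. The `split_tool_result` function is a hypothetical, simplified stand-in, and it assumes that a tool returning a dict reaches the wrapper as a JSON string.

```python
import json


def split_tool_result(raw_content: str):
    """Return (content, attachments, custom_outputs) for a tool's string result."""
    try:
        parsed = json.loads(raw_content)
    except json.JSONDecodeError:
        # Plain-text tool output: keep it unchanged.
        return raw_content, None, None
    if not isinstance(parsed, dict) or "content" not in parsed:
        # Valid JSON, but not the structured shape this sketch expects.
        return raw_content, None, None
    return parsed["content"], parsed.get("attachments"), parsed.get("custom_outputs")


structured_result = json.dumps(
    {
        "content": "Generated 2 random ints.",
        "attachments": {"min": "1", "max": "9"},
        "custom_outputs": {"random_nums": [3, 7]},
    }
)
content, attachments, custom_outputs = split_tool_result(structured_result)
plain_content, plain_attachments, plain_outputs = split_tool_result(
    "The weather in Fresno is sunny."
)
print(content, attachments, custom_outputs)
print(plain_content, plain_attachments, plain_outputs)
```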

%%writefile databricks_model_serving_client.py
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import ChatMessage, ChatMessageRole
from typing import List, Optional

# This client is designed to interact with Databricks Serving Endpoints for Foundational Models.
# With this client created, users no longer need to worry about authentication through environment variables. 
# The authentication process is handled via the workspace client.
# The client was created following this guide: https://microsoft.github.io/autogen/0.2/blog/2024/01/26/Custom-Models.

class DatabricksModelServingClient:
    def __init__(self, config, **kwargs):
        self.workspace = WorkspaceClient()
        self.openai_client = self.workspace.serving_endpoints.get_open_ai_client()
        self.endpoint_name = config.get("endpoint_name")
        self.llm_config = config.get("llm_config")
        self.config = config

    def transform_messages(self, input_data, remove_keys=("id",)):
      # Build new dicts instead of popping keys in place, so the caller's
      # message history is not mutated.
      return [
          {key: value for key, value in message.items() if key not in remove_keys}
          for message in input_data["messages"]
      ]

    def create(self, input_data):
      
      # Remove specific fields from the messages, as the serving endpoint does not expect them.
      input_messages = self.transform_messages(input_data)

      # Call the Databricks serving endpoint using the OpenAI client.
      response = self.openai_client.chat.completions.create(
          model=self.endpoint_name,
          messages=input_messages,
          tools=input_data["tools"],
          **self.llm_config
      )
      
      return response

    def message_retrieval(self, response):
      # Process and return messages from the response
      return [choice.message for choice in response.choices]

    def cost(self, response):
      # Implement cost calculation if applicable
      return 0

    def get_usage(self, response):
      usage = response.usage
      # Implement usage statistics if available
      return {"prompt_tokens": usage.prompt_tokens, "total_tokens": usage.total_tokens, "completion_tokens": usage.completion_tokens}
%%writefile autogen_agentchat.py

from random import randint
import json
import os
import uuid

from databricks_model_serving_client import DatabricksModelServingClient

from autogen import ConversableAgent
from autogen import register_function
from mlflow.types.agent import ChatAgentRequest, ChatAgentMessage, ChatContext, ChatAgentResponse
from typing import Optional, Any
from mlflow.pyfunc import ChatAgent
import mlflow

from unitycatalog.ai.core.databricks import DatabricksFunctionClient
from unitycatalog.ai.autogen.toolkit import UCFunctionToolkit, AutogenTool

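def parse_bool_robust(value):
    # distutils (and its strtobool helper) was removed in Python 3.12,
    # so parse the flag directly. Unrecognized values are treated as False.
    return str(value).strip().lower() in ("1", "true", "t", "yes", "y", "on")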

# Enable traces by default. If the 'MLFLOW_LOG_TRACES' environment variable
# is set, use its value to set the flag.
log_traces_flag = True

if 'MLFLOW_LOG_TRACES' in os.environ:
  log_traces_flag = parse_bool_robust(os.environ['MLFLOW_LOG_TRACES'])

mlflow.autogen.autolog(log_traces=log_traces_flag)

###############################################################################
## Define tools for your agent, enabling it to retrieve data or take actions
## beyond text generation
## To create and see usage examples of more tools, see
## https://docs.databricks.com/en/generative-ai/agent-framework/agent-tool.html
###############################################################################

client = DatabricksFunctionClient()

# Create a tool that generates random numbers and provides customized output along with attachments.
def generate_random_ints(min: int, max: int, size: int) -> dict[str, Any]:
    """Generate size random ints in the range [min, max]."""
    attachments = {"min": str(min), "max": str(max)}
    custom_outputs = [randint(min, max) for _ in range(size)]
    content = f"Successfully generated array of {size} random ints in [{min}, {max}]."
    return {
        "content": content,
        "attachments": attachments,
        "custom_outputs": {"random_nums": custom_outputs},
    }

# Create a mock tool that provides the weather information for a city in California.
def weather_in_california_city(city: str) -> str:
  """Get the weather description of a city in California."""
  return f"The weather in {city} is sunny."

tools = [generate_random_ints, weather_in_california_city]

# Adding a pre-built tool from the Unity Catalog.

uc_tool_names = ["system.ai.python_exec"]
uc_toolkit = UCFunctionToolkit(function_names=uc_tool_names, client=client)
tools.extend(uc_toolkit.tools)

# Use Databricks vector search indexes as tools
# See https://docs.databricks.com/en/generative-ai/agent-framework/unstructured-retrieval-tools.html
# for details

# TODO: Add vector search indexes
# vector_search_tools = [
#         VectorSearchRetrieverTool(
#         index_name="",
#         # filters="..."
#     )
# ]
# tools.extend(vector_search_tools)

# Please update this to reflect your specific endpoint name.
# The name of the serving endpoint available in Databricks must be specified.
LLM_ENDPOINT_NAME = "databricks-meta-llama-3-3-70b-instruct"

# Developing the ChatAgent that encapsulates the Autogen Agent.
class AutogenAgent(ChatAgent):
  
  def __init__(self, tools = None, model_name: str = "autogen_agent"):
    self.model_name = model_name
    self.tools = tools if tools else []
    
  def _create_agents(self, chat_history):
    
    def _is_termination_message(message):
      content = message.get("content", "")
      return (content and "TERMINATE" in content.upper()) or (message['role'] == 'user' and 'tool_calls' not in message)
    
    # The user proxy agent is used for interacting with the assistant agent
    # and executes tool calls.
    user_proxy = ConversableAgent(
        name="User",
        llm_config=False,
        is_termination_msg=_is_termination_message,
        human_input_mode="NEVER",
    )

    # Defining a custom LLM configuration that utilizes the Databricks Model Serving Client.

    config_list = [{
                "model_client_cls": "DatabricksModelServingClient",
                "model": self.model_name,
                "endpoint_name": LLM_ENDPOINT_NAME,
                "llm_config": {"max_tokens": 1500, "temperature": 0.01}}]
    
    # The actual agent that interacts with the large language model (LLM).
    assistant = ConversableAgent(
            name="Assistant",
            system_message="You are a helpful assistant with various tools available. Use tools as the primary source of information to assist with the user's question, if not possible then use your generation capacity.",
            llm_config={"config_list": config_list, "cache_seed": None, "stream": True},
            chat_messages={user_proxy: chat_history}
        )
    
    # Adding tools to the assistant agent and making the user proxy the executor of those tools.
    for tool in self.tools:
      if isinstance(tool, AutogenTool):
        tool.register_function(callers = assistant,
                        executors = user_proxy )
      else:
        register_function(
          tool,
          caller=assistant,  # The assistant agent can suggest calls to the tools.
          executor=user_proxy,  # The user proxy agent can execute the tool calls.
          description=tool.__doc__, # Extracting the tool's docstring as the description.
        ) 
    
    # Register the custom Databricks Model Serving Client in the assistant.
    assistant.register_model_client(model_client_cls=DatabricksModelServingClient)

    return assistant, user_proxy
  
  # Create a tool message and extract any available custom outputs.
  def _convert_tool_message(self, message):
      tool_response = message['tool_responses'][0]
      tool_message = {'role': 'tool', 'name': message['name'], 'tool_call_id': tool_response['tool_call_id']}
      custom_outputs = None
      try:
        tool_response_content = json.loads(tool_response["content"])
        if "custom_outputs" in tool_response_content:
          custom_outputs = tool_response_content["custom_outputs"]
        tool_message['attachments'] = tool_response_content['attachments']
        content = tool_response_content['content']
      except Exception:
        content = tool_response['content']
      tool_message['content'] = content
      return tool_message, custom_outputs
  
  # Convert AutoGen messages to ChatAgent messages.
  def _convert_to_chat_messages(self, messages_generated):
    output_messages = []
    custom_outputs = None
    for message in messages_generated:
      if message['role'] == 'tool':
        message, custom_outputs = self._convert_tool_message(message)
      message['id'] = str(uuid.uuid4())
      output_messages.append(ChatAgentMessage(**message))

    return output_messages, custom_outputs
  
  def predict(
      self,
      messages: list[ChatAgentMessage],
      context: Optional[ChatContext] = None,
      custom_inputs: Optional[dict[str, Any]] = None,
  ) -> ChatAgentResponse:

      request = {"messages": self._convert_messages_to_dict(messages)}

      # Extract the last message from the conversation and retain all 
      # previous messages except for the last one in the history.
      last_message = request["messages"][-1]

      chat_history = request["messages"][:-1]

      # Create the agents and execute the chat completion.
      
      assistant, user_proxy = self._create_agents(chat_history)

      model_response = user_proxy.initiate_chat(assistant, 
                                          message=last_message['content'],
                                          max_turns=5,
                                          clear_history=False)
      
      # Use the assistant's chat_messages rather than model_response.chat_history.
      # The latter is recorded from the user proxy's perspective, so the roles are
      # inverted: the user's messages appear as 'assistant' and vice versa.
      
      messages_generated = assistant.chat_messages[user_proxy][len(request["messages"]):]

      output_messages, custom_outputs = self._convert_to_chat_messages(messages_generated)

      return ChatAgentResponse(messages=output_messages, custom_outputs=custom_outputs)

autogent_model = AutogenAgent(tools)

mlflow.models.set_model(autogent_model)

Test the agent

Interact with the agent to test its output and tool-calling abilities. Since this notebook called mlflow.autogen.autolog(), you can view the trace for each step the agent takes.

Testing one-turn and multi-turn scenarios.
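In both scenarios, the agent's predict method treats the final message as the new user input and everything before it as chat history. After the chat runs, it keeps only the messages that were added beyond the request. The sketch below illustrates that slicing with plain lists; `split_conversation` is a hypothetical helper, and the transcript is simulated rather than produced by a model.

```python
def split_conversation(messages):
    """Split a request into (chat_history, last_message)."""
    if not messages:
        raise ValueError("messages must contain at least one entry")
    return messages[:-1], messages[-1]


multi_turn_request = [
    {"role": "user", "content": "What is 10 + 10?"},
    {"role": "assistant", "content": "10 + 10 = 20"},
    {"role": "user", "content": "Is that number greater or lower than 50?"},
]

chat_history, last_message = split_conversation(multi_turn_request)

# Simulated transcript after the chat: the request plus one generated reply.
transcript = multi_turn_request + [
    {"role": "assistant", "content": "20 is lower than 50."}
]

# Only messages beyond the original request are returned to the caller.
generated_messages = transcript[len(multi_turn_request):]
print(len(chat_history), last_message["content"], generated_messages)
```

For a single-turn request, the chat history is simply an empty list.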

dbutils.library.restartPython()
# Import the model from the file that was just created.
from autogen_agentchat import autogent_model
# TODO: Replace these placeholder inputs with appropriate domain-specific examples for your agent.
single_turn_example = [{"role": "user", "content": "What is 10 + 10?"}]
multi_turn_example = [{"role": "user", "content": "What is 10 + 10?"},
                                     {"role": "assistant", "content": "10 + 10 = 20"},
                                     {"role": "user", "content": "Is that number greater or lower than 50?"}]
single_turn_tc_example = [{"role": "user", "content": "Can you generate ten random numbers between 10 and 100?"}]
autogent_model.predict({"messages": single_turn_example})
autogent_model.predict({"messages": multi_turn_example})
autogent_model.predict({"messages": single_turn_tc_example})

Log the agent as an MLflow model

Log the agent as code from the autogen_agentchat.py file. See MLflow - Models from Code.

Enable automatic authentication for Databricks resources

For the most common Databricks resource types, Databricks supports and recommends declaring resource dependencies for the agent upfront during logging. This enables automatic authentication passthrough when you deploy the agent. With automatic authentication passthrough, Databricks automatically provisions, rotates, and manages short-lived credentials to securely access these resource dependencies from within the agent endpoint.

To enable automatic authentication, specify the dependent Databricks resources when calling mlflow.pyfunc.log_model().

  • TODO: If your Unity Catalog tool queries a vector search index or leverages external functions, you need to include the dependent vector search index and UC connection objects, respectively, as resources. See docs (AWS | Azure).
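The logging cell in this notebook declares each Unity Catalog tool as a resource by converting the tool's name back to a three-level function name with `replace("__", ".")`. The following sketch isolates that conversion. `FakeTool` is a hypothetical stand-in for the toolkit's tool objects, and the double-underscore naming is inferred from that cell rather than from the library's documentation.

```python
from dataclasses import dataclass


@dataclass
class FakeTool:
    """Hypothetical stand-in for a Unity Catalog tool object that exposes a name."""

    name: str


def uc_function_name(tool_name: str) -> str:
    """Map a double-underscore tool name to a catalog.schema.function name."""
    return tool_name.replace("__", ".")


example_tools = [FakeTool(name="system__ai__python_exec")]
function_names = [uc_function_name(tool.name) for tool in example_tools]
print(function_names)
```

Note that this mapping would also rewrite a double underscore that is genuinely part of a function name, so it is worth checking the resulting names before logging.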
import mlflow
import os

from autogen_agentchat import tools, LLM_ENDPOINT_NAME

from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint

from unitycatalog.ai.autogen.toolkit import AutogenTool

resources = [DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME)]

for tool in tools:
    if isinstance(tool, AutogenTool):
        resources.append(DatabricksFunction(function_name=tool.name.replace("__", ".")))

os.environ['MLFLOW_LOG_TRACES'] = "False"

with mlflow.start_run() as run:
    logged_agent_info = mlflow.pyfunc.log_model(
        python_model=os.path.join(os.getcwd(), 'autogen_agentchat.py'),
        code_paths=["databricks_model_serving_client.py"],
        artifact_path="agent",
        pip_requirements = ["autogen-agentchat==0.2.40", "unitycatalog-autogen[databricks]", "databricks-ai-bridge"],
        resources=resources
    )

os.environ['MLFLOW_LOG_TRACES'] = "True"

Evaluate the agent with Agent Evaluation

Use Mosaic AI Agent Evaluation to evaluate the agent's responses based on expected responses and other evaluation criteria. Use the evaluation criteria you specify to guide iterations, using MLflow to track the computed quality metrics. See Databricks documentation (AWS | Azure).

To evaluate your tool calls, add custom metrics. See Databricks documentation (AWS | Azure).
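Before wiring a custom metric into Agent Evaluation, it can help to prototype the underlying check as a plain function. The sketch below is not the Agent Evaluation API: `called_tool_names` and `used_expected_tool` are hypothetical helpers, and the message shape (an OpenAI-style `tool_calls` list with `function.name`) is an assumption about the agent's output.

```python
def called_tool_names(messages):
    """Collect the names of all tools requested across a list of message dicts."""
    names = []
    for message in messages:
        # Messages without tool calls may omit the key or set it to None.
        for call in message.get("tool_calls") or []:
            names.append(call["function"]["name"])
    return names


def used_expected_tool(messages, expected_tool):
    """Return True if the expected tool was called at least once."""
    return expected_tool in called_tool_names(messages)


sample_response = [
    {
        "role": "assistant",
        "content": "",
        "tool_calls": [
            {
                "id": "call_1",
                "type": "function",
                "function": {"name": "generate_random_ints", "arguments": "{}"},
            }
        ],
    },
    {"role": "tool", "content": "Successfully generated array."},
    {"role": "assistant", "content": "Here are your numbers."},
]

tool_names = called_tool_names(sample_response)
print(tool_names, used_expected_tool(sample_response, "generate_random_ints"))
```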

import pandas as pd

# Please include additional examples, either manually or automatically, if necessary.

eval_examples = [
    {
        "request": {"messages": [{"role": "user", "content": "What is an LLM agent?"}]},
        "expected_response": None,
    }
]

eval_dataset = pd.DataFrame(eval_examples)
display(eval_dataset)
import mlflow

with mlflow.start_run(run_id=logged_agent_info.run_id):
    eval_results = mlflow.evaluate(
        f"runs:/{logged_agent_info.run_id}/agent",
        data=eval_dataset,  # Your evaluation dataset
        model_type="databricks-agent",  # Enable Mosaic AI Agent Evaluation
    )

# Review the evaluation results in the MLflow UI (see console output), or access them in place:
display(eval_results.tables['eval_results'])

Pre-deployment agent validation

Before registering and deploying the agent, perform pre-deployment checks using the mlflow.models.predict() API. See Databricks documentation (AWS | Azure).

mlflow.models.predict(
    model_uri=f"runs:/{logged_agent_info.run_id}/agent",
    input_data={"messages": [{"role": "user", "content": "Hello!"}]},
    env_manager="uv"
)

Register the model to Unity Catalog

Before you deploy the agent, you must register the agent to Unity Catalog.

  • TODO: Update the catalog, schema, and model_name below to register the MLflow model to Unity Catalog.
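Unity Catalog model names have three levels (catalog, schema, model), so leaving any placeholder empty produces a name such as `..` that cannot be registered. A small guard like the following can catch that before the registration call. `build_uc_model_name` is a hypothetical helper, and the example values are placeholders, not names from this notebook.

```python
def build_uc_model_name(catalog: str, schema: str, model_name: str) -> str:
    """Join the three parts into catalog.schema.model, rejecting empty parts."""
    parts = {"catalog": catalog, "schema": schema, "model_name": model_name}
    missing = [key for key, value in parts.items() if not value.strip()]
    if missing:
        raise ValueError(f"Set a value for: {', '.join(missing)}")
    return ".".join(value.strip() for value in parts.values())


# Placeholder values for illustration only.
example_name = build_uc_model_name("main", "default", "autogen_agent")
print(example_name)
```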
mlflow.set_registry_uri("databricks-uc")

# TODO: define the catalog, schema, and model name for your UC model
catalog = ""
schema = ""
model_name = ""
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# register the model to UC
uc_registered_model_info = mlflow.register_model(
    model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME
)

Deploy the agent

from databricks import agents
agents.deploy(UC_MODEL_NAME, uc_registered_model_info.version, tags = {"endpointSource": "docs"})

Next steps

After your agent is deployed, you can chat with it in AI Playground to perform additional checks, share it with SMEs in your organization for feedback, and embed it in a production application. See Databricks documentation (AWS | Azure).
