databricks-logo

langgraph-multiagent-genie-pat

(Python)
Loading...

Mosaic AI Agent Framework: Author and deploy a multi-agent system with Genie

This notebook demonstrates how to build a multi-agent system using Mosaic AI Agent Framework and LangGraph, where Genie is one of the agents. In this notebook, you:

  1. Author a multi-agent system using LangGraph.
  2. Wrap the LangGraph agent with MLflow ChatAgent to ensure compatibility with Databricks features.
  3. Manually test the multi-agent system's output.
  4. Log and deploy the multi-agent system.

This example is based on LangGraph documentation - Multi-agent supervisor example

Why use a Genie agent?

Multi-agent systems consist of multiple AI agents working together, each with specialized capabilities. As one of those agents, Genie allows users to interact with their structured data using natural language.

Unlike SQL functions which can only run pre-defined queries, Genie has the flexibility to create novel queries to answer user questions.

Prerequisites

  • Address all TODOs in this notebook.
  • Create a Genie Space, see Databricks documentation (AWS | Azure).
%pip install -U -qqq mlflow langgraph==0.3.4 databricks-langchain databricks-agents uv
dbutils.library.restartPython()

Define the multi-agent system

Create a multi-agent system in LangGraph using a supervisor agent node directing the following agent nodes:

  • GenieAgent: The Genie agent that queries and reasons over structured data.
  • Tool-calling agent: An agent that calls Unity Catalog function tools.

In this example, the tool-calling agent uses the built-in Unity Catalog function system.ai.python_exec to execute Python code. For examples of other tools you can add to your agents, see Databricks documentation (AWS | Azure).

Wrap the LangGraph agent using the ChatAgent interface

Databricks recommends using ChatAgent to ensure compatibility with Databricks AI features and to simplify authoring multi-turn conversational agents using an open source standard.

The LangGraphChatAgent class implements the ChatAgent interface to wrap the LangGraph agent.

See MLflow's ChatAgent documentation.

Write agent code to file

Define the agent code in a single cell below. This lets you write the agent code to a local Python file, using the %%writefile magic command, for subsequent logging and deployment.

%%writefile agent.py
import functools
import os
from typing import Any, Generator, Literal, Optional

import mlflow
from databricks.sdk import WorkspaceClient
from databricks_langchain import (
    ChatDatabricks,
    UCFunctionToolkit,
)
from databricks_langchain.genie import GenieAgent
from langchain_core.runnables import RunnableLambda
from langgraph.graph import END, StateGraph
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt import create_react_agent
from mlflow.langchain.chat_agent_langgraph import ChatAgentState
from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import (
    ChatAgentChunk,
    ChatAgentMessage,
    ChatAgentResponse,
    ChatContext,
)
from pydantic import BaseModel

###################################################
## Create a GenieAgent with access to a Genie Space
###################################################

# TODO add GENIE_SPACE_ID and a description for this space
# You can find the ID in the URL of the genie room /genie/rooms/<GENIE_SPACE_ID>
GENIE_SPACE_ID = ""
genie_agent_description = "This genie agent can answer ..."

genie_agent = GenieAgent(
    genie_space_id=GENIE_SPACE_ID,
    genie_agent_name="Genie",
    description=genie_agent_description,
    client=WorkspaceClient(
        host=os.getenv("DB_MODEL_SERVING_HOST_URL"),
        token=os.getenv("DATABRICKS_GENIE_PAT"),
    ),
)


############################################
# Define your LLM endpoint and system prompt
############################################

# TODO: Replace with your model serving endpoint
# multi-agent Genie works best with claude 3.7 or gpt 4o models.
LLM_ENDPOINT_NAME = "databricks-claude-3-7-sonnet"
llm = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME)


############################################################
# Create a code agent
# You can also create agents with access to additional tools
############################################################
tools = []

# TODO if desired, add additional tools and update the description of this agent
uc_tool_names = ["system.ai.*"]
uc_toolkit = UCFunctionToolkit(function_names=uc_tool_names)
tools.extend(uc_toolkit.tools)
code_agent_description = (
    "The Coder agent specializes in solving programming challenges, generating code snippets, debugging issues, and explaining complex coding concepts.",
)
code_agent = create_react_agent(llm, tools=tools)

#############################
# Define the supervisor agent
#############################

# TODO update the max number of iterations between supervisor and worker nodes
# before returning to the user
MAX_ITERATIONS = 3

worker_descriptions = {
    "Genie": genie_agent_description,
    "Coder": code_agent_description,
}

formatted_descriptions = "\n".join(
    f"- {name}: {desc}" for name, desc in worker_descriptions.items()
)

system_prompt = f"Decide between routing between the following workers or ending the conversation if an answer is provided. \n{formatted_descriptions}"
options = ["FINISH"] + list(worker_descriptions.keys())
FINISH = {"next_node": "FINISH"}

def supervisor_agent(state):
    count = state.get("iteration_count", 0) + 1
    if count > MAX_ITERATIONS:
        return FINISH
    
    class nextNode(BaseModel):
        next_node: Literal[tuple(options)]

    preprocessor = RunnableLambda(
        lambda state: [{"role": "system", "content": system_prompt}] + state["messages"]
    )
    supervisor_chain = preprocessor | llm.with_structured_output(nextNode)
    next_node = supervisor_chain.invoke(state).next_node
    
    # if routed back to the same node, exit the loop
    if state.get("next_node") == next_node:
        return FINISH
    return {
        "iteration_count": count,
        "next_node": next_node
    }

#######################################
# Define our multiagent graph structure
#######################################


def agent_node(state, agent, name):
    result = agent.invoke(state)
    return {
        "messages": [
            {
                "role": "assistant",
                "content": result["messages"][-1].content,
                "name": name,
            }
        ]
    }


def final_answer(state):
    prompt = "Using only the content in the messages, respond to the previous user question using the answer given by the other assistant messages."
    preprocessor = RunnableLambda(
        lambda state: state["messages"] + [{"role": "user", "content": prompt}]
    )
    final_answer_chain = preprocessor | llm
    return {"messages": [final_answer_chain.invoke(state)]}


class AgentState(ChatAgentState):
    next_node: str
    iteration_count: int


code_node = functools.partial(agent_node, agent=code_agent, name="Coder")
genie_node = functools.partial(agent_node, agent=genie_agent, name="Genie")

workflow = StateGraph(AgentState)
workflow.add_node("Genie", genie_node)
workflow.add_node("Coder", code_node)
workflow.add_node("supervisor", supervisor_agent)
workflow.add_node("final_answer", final_answer)

workflow.set_entry_point("supervisor")
# We want our workers to ALWAYS "report back" to the supervisor when done
for worker in worker_descriptions.keys():
    workflow.add_edge(worker, "supervisor")

# Let the supervisor decide which next node to go
workflow.add_conditional_edges(
    "supervisor",
    lambda x: x["next_node"],
    {**{k: k for k in worker_descriptions.keys()}, "FINISH": "final_answer"},
)
workflow.add_edge("final_answer", END)
multi_agent = workflow.compile()

###################################
# Wrap our multi-agent in ChatAgent
###################################


class LangGraphChatAgent(ChatAgent):
    def __init__(self, agent: CompiledStateGraph):
        self.agent = agent

    def predict(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> ChatAgentResponse:
        request = {
            "messages": [m.model_dump_compat(exclude_none=True) for m in messages]
        }

        messages = []
        for event in self.agent.stream(request, stream_mode="updates"):
            for node_data in event.values():
                messages.extend(
                    ChatAgentMessage(**msg) for msg in node_data.get("messages", [])
                )
        return ChatAgentResponse(messages=messages)

    def predict_stream(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> Generator[ChatAgentChunk, None, None]:
        request = {
            "messages": [m.model_dump_compat(exclude_none=True) for m in messages]
        }
        for event in self.agent.stream(request, stream_mode="updates"):
            for node_data in event.values():
                yield from (
                    ChatAgentChunk(**{"delta": msg})
                    for msg in node_data.get("messages", [])
                )


# Create the agent object, and specify it as the agent object to use when
# loading the agent back for inference via mlflow.models.set_model()
mlflow.langchain.autolog()
AGENT = LangGraphChatAgent(multi_agent)
mlflow.models.set_model(AGENT)

Test the agent

Interact with the agent to test its output. Since this notebook called mlflow.langchain.autolog() you can view the trace for each step the agent takes.

TODO: Replace this placeholder input_example with a domain-specific prompt for your agent.

dbutils.library.restartPython()

Create a Personal Access Token (PAT) as a Databricks secret

In order to access the Genie Space and its underlying resources, we need to create a PAT

  • This can either be your own PAT or that of a System Principal (AWS | Azure). You will have to rotate this token yourself upon expiry.
  • Add secrets-based environment variables to a model serving endpoint (AWS | Azure).
  • You can reference the table in the deploy docs for the right permissions level for each resource: (AWS | Azure).
    • Provision with CAN RUN on the Genie Space
    • Provision with CAN USE on the SQL Warehouse powering the Genie Space
    • Provision with SELECT on underlying Unity Catalog Tables
    • Provision with EXECUTE on underyling Unity Catalog Functions
import os
from dbruntime.databricks_repl_context import get_context

# TODO: set secret_scope_name and secret_key_name to access your PAT
secret_scope_name = ""
secret_key_name = ""

os.environ["DB_MODEL_SERVING_HOST_URL"] = "https://" + get_context().workspaceUrl
assert os.environ["DB_MODEL_SERVING_HOST_URL"] is not None
os.environ["DATABRICKS_GENIE_PAT"] = dbutils.secrets.get(
    scope=secret_scope_name, key=secret_key_name
)
assert os.environ["DATABRICKS_GENIE_PAT"] is not None, (
    "The DATABRICKS_GENIE_PAT was not properly set to the PAT secret"
)
from agent import AGENT, genie_agent_description

assert genie_agent_description != "This genie agent can answer ...", (
    "Remember to update the genie agent description for higher quality answers."
)
input_example = {
    "messages": [
        {
            "role": "user",
            "content": "Explain the datasets and capabilities that the Genie agent has access to.",
        }
    ]
}
AGENT.predict(input_example)
for event in AGENT.predict_stream(input_example):
  print(event, "-----------\n")

Log the agent as an MLflow model

Log the agent as code from the agent.py file. See MLflow - Models from Code.

Enable automatic authentication for Databricks resources

For the most common Databricks resource types, Databricks supports and recommends declaring resource dependencies for the agent upfront during logging. This enables automatic authentication passthrough when you deploy the agent. With automatic authentication passthrough, Databricks automatically provisions, rotates, and manages short-lived credentials to securely access these resource dependencies from within the agent endpoint.

To enable automatic authentication, specify the dependent Databricks resources when calling mlflow.pyfunc.log_model().

  • TODO: If your Unity Catalog tool queries a vector search index or leverages external functions, you need to include the dependent vector search index and UC connection objects, respectively, as resources. See docs (AWS | Azure).
# Determine Databricks resources to specify for automatic auth passthrough at deployment time
import mlflow
from agent import GENIE_SPACE_ID, LLM_ENDPOINT_NAME, tools
from databricks_langchain import UnityCatalogTool, VectorSearchRetrieverTool
from mlflow.models.resources import (
    DatabricksFunction,
    DatabricksGenieSpace,
    DatabricksServingEndpoint,
)
from pkg_resources import get_distribution

# TODO: Manually include underlying resources if needed. See the TODO in the markdown above for more information.
resources = [
    DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME),
    DatabricksGenieSpace(genie_space_id=GENIE_SPACE_ID),
]
for tool in tools:
    if isinstance(tool, VectorSearchRetrieverTool):
        resources.extend(tool.resources)
    elif isinstance(tool, UnityCatalogTool):
        resources.append(DatabricksFunction(function_name=tool.uc_function_name))

with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        artifact_path="agent",
        python_model="agent.py",
        input_example=input_example,
        extra_pip_requirements=[f"databricks-connect=={get_distribution('databricks-connect').version}"],
        resources=resources,
    )

Pre-deployment agent validation

Before registering and deploying the agent, perform pre-deployment checks using the mlflow.models.predict() API. See Databricks documentation (AWS | Azure)."

mlflow.models.predict(
    model_uri=f"runs:/{logged_agent_info.run_id}/agent",
    input_data=input_example,
    env_manager="uv",
)

Register the model to Unity Catalog

Update the catalog, schema, and model_name below to register the MLflow model to Unity Catalog.

mlflow.set_registry_uri("databricks-uc")

# TODO: define the catalog, schema, and model name for your UC model
catalog = ""
schema = ""
model_name = ""
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# register the model to UC
uc_registered_model_info = mlflow.register_model(
    model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME
)

Deploy the agent

from databricks import agents

agents.deploy(
    UC_MODEL_NAME,
    uc_registered_model_info.version,
    tags={"endpointSource": "docs"},
    environment_vars={
        "DATABRICKS_GENIE_PAT": f"{{{{secrets/{secret_scope_name}/{secret_key_name}}}}}"
    },
)

Next steps

After your agent is deployed, you can chat with it in AI playground to perform additional checks, share it with SMEs in your organization for feedback, or embed it in a production application. See Databricks documentation (AWS | Azure).

;