
    Mosaic AI Agent Framework: Author and deploy a multi-agent system with Genie

    This notebook demonstrates how to build a multi-agent system using Mosaic AI Agent Framework and LangGraph, where Genie is one of the agents. In this notebook, you:

    1. Author a multi-agent system using LangGraph.
    2. Wrap the LangGraph agent with MLflow ChatAgent to ensure compatibility with Databricks features.
    3. Manually test the multi-agent system's output.
    4. Log and deploy the multi-agent system.

    This example is based on the LangGraph documentation's multi-agent supervisor example.

    Why use a Genie agent?

    Multi-agent systems consist of multiple AI agents working together, each with specialized capabilities. As one of those agents, Genie allows users to interact with their structured data using natural language.

    Unlike SQL functions, which can only run pre-defined queries, Genie has the flexibility to generate novel queries to answer user questions.

    Prerequisites

    • Address all TODOs in this notebook.
    • Create a Genie Space. See Databricks documentation (AWS | Azure).
    %pip install -U -qqq mlflow-skinny[databricks] langgraph==0.3.4 databricks-langchain databricks-agents uv
    dbutils.library.restartPython()

    Define the multi-agent system

    Create a multi-agent system in LangGraph using a supervisor agent node directing the following agent nodes:

    • GenieAgent: The Genie agent that queries and reasons over structured data.
    • Tool-calling agent: An agent that calls Unity Catalog function tools.

    In this example, the tool-calling agent uses the built-in Unity Catalog function system.ai.python_exec to execute Python code. For examples of other tools you can add to your agents, see Databricks documentation (AWS | Azure).
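    If you want the tool-calling agent to use your own Unity Catalog functions in addition to system.ai.python_exec, pass their fully qualified names to UCFunctionToolkit. A minimal sketch, assuming a hypothetical function my_catalog.my_schema.lookup_customer already exists in Unity Catalog:

    from databricks_langchain import UCFunctionToolkit

    # system.ai.python_exec is the built-in code-execution function used in this notebook;
    # my_catalog.my_schema.lookup_customer is a hypothetical placeholder for your own function.
    uc_toolkit = UCFunctionToolkit(
        function_names=[
            "system.ai.python_exec",
            "my_catalog.my_schema.lookup_customer",
        ]
    )
    tools = uc_toolkit.tools  # LangChain-compatible tools to pass to create_react_agent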

    Wrap the LangGraph agent using the ChatAgent interface

    Databricks recommends using ChatAgent to ensure compatibility with Databricks AI features and to simplify authoring multi-turn conversational agents using an open source standard.

    The LangGraphChatAgent class implements the ChatAgent interface to wrap the LangGraph agent.

    See MLflow's ChatAgent documentation.

    Write agent code to file

    Define the agent code in the single cell below, then write it to a local Python file with the %%writefile magic command for subsequent logging and deployment.

    %%writefile agent.py
    import functools
    import os
    from typing import Any, Generator, Literal, Optional
    
    import mlflow
    from databricks.sdk import WorkspaceClient
    from databricks_langchain import (
        ChatDatabricks,
        UCFunctionToolkit,
    )
    from databricks_langchain.genie import GenieAgent
    from langchain_core.runnables import RunnableLambda
    from langgraph.graph import END, StateGraph
    from langgraph.graph.state import CompiledStateGraph
    from langgraph.prebuilt import create_react_agent
    from mlflow.langchain.chat_agent_langgraph import ChatAgentState
    from mlflow.pyfunc import ChatAgent
    from mlflow.types.agent import (
        ChatAgentChunk,
        ChatAgentMessage,
        ChatAgentResponse,
        ChatContext,
    )
    from pydantic import BaseModel
    
    ###################################################
    ## Create a GenieAgent with access to a Genie Space
    ###################################################
    
    # TODO add GENIE_SPACE_ID and a description for this space
    # You can find the ID in the URL of the genie room /genie/rooms/<GENIE_SPACE_ID>
    # Example description: This Genie agent can answer questions based on a database containing tables related to enterprise software sales, including accounts, opportunities, opportunity history, fiscal periods, quotas, targets, teams, and users. Use Genie to fetch and analyze data from these tables by specifying the relevant columns and filters. Genie can execute SQL queries to provide precise data insights based on your questions.
    GENIE_SPACE_ID = ""
    genie_agent_description = "This genie agent can answer ..."
    
    genie_agent = GenieAgent(
        genie_space_id=GENIE_SPACE_ID,
        genie_agent_name="Genie",
        description=genie_agent_description,
        client=WorkspaceClient(
            host=os.getenv("DB_MODEL_SERVING_HOST_URL"),
            token=os.getenv("DATABRICKS_GENIE_PAT"),
        ),
    )
    
    
    ############################################
    # Define your LLM endpoint and system prompt
    ############################################
    
    # TODO: Replace with your model serving endpoint
    # Multi-agent Genie works best with Claude 3.7 Sonnet or GPT-4o models.
    LLM_ENDPOINT_NAME = "databricks-claude-3-7-sonnet"
    llm = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME)
    
    
    ############################################################
    # Create a code agent
    # You can also create agents with access to additional tools
    ############################################################
    tools = []
    
    # TODO if desired, add additional tools and update the description of this agent
    uc_tool_names = ["system.ai.*"]
    uc_toolkit = UCFunctionToolkit(function_names=uc_tool_names)
    tools.extend(uc_toolkit.tools)
    code_agent_description = (
        "The Coder agent specializes in solving programming challenges, generating code snippets, debugging issues, and explaining complex coding concepts."
    )
    code_agent = create_react_agent(llm, tools=tools)
    
    #############################
    # Define the supervisor agent
    #############################
    
    # TODO update the max number of iterations between supervisor and worker nodes
    # before returning to the user
    MAX_ITERATIONS = 3
    
    worker_descriptions = {
        "Genie": genie_agent_description,
        "Coder": code_agent_description,
    }
    
    formatted_descriptions = "\n".join(
        f"- {name}: {desc}" for name, desc in worker_descriptions.items()
    )
    
    system_prompt = f"Decide between routing between the following workers or ending the conversation if an answer is provided. \n{formatted_descriptions}"
    options = ["FINISH"] + list(worker_descriptions.keys())
    FINISH = {"next_node": "FINISH"}
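    # The supervisor picks the next node via structured output constrained to the
    # worker names plus "FINISH"; iteration_count caps supervisor/worker round trips.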
    
    def supervisor_agent(state):
        count = state.get("iteration_count", 0) + 1
        if count > MAX_ITERATIONS:
            return FINISH
        
        class NextNode(BaseModel):
            next_node: Literal[tuple(options)]
    
        preprocessor = RunnableLambda(
            lambda state: [{"role": "system", "content": system_prompt}] + state["messages"]
        )
        supervisor_chain = preprocessor | llm.with_structured_output(NextNode)
        next_node = supervisor_chain.invoke(state).next_node
        
        # if routed back to the same node, exit the loop
        if state.get("next_node") == next_node:
            return FINISH
        return {
            "iteration_count": count,
            "next_node": next_node
        }
    
    #######################################
    # Define our multiagent graph structure
    #######################################
    
    
    def agent_node(state, agent, name):
        result = agent.invoke(state)
        return {
            "messages": [
                {
                    "role": "assistant",
                    "content": result["messages"][-1].content,
                    "name": name,
                }
            ]
        }
    
    
    def final_answer(state):
        prompt = "Using only the content in the messages, respond to the previous user question using the answer given by the other assistant messages."
        preprocessor = RunnableLambda(
            lambda state: state["messages"] + [{"role": "user", "content": prompt}]
        )
        final_answer_chain = preprocessor | llm
        return {"messages": [final_answer_chain.invoke(state)]}
    
    
    # Extend ChatAgentState with the fields the supervisor uses for routing.
    class AgentState(ChatAgentState):
        next_node: str
        iteration_count: int
    
    
    code_node = functools.partial(agent_node, agent=code_agent, name="Coder")
    genie_node = functools.partial(agent_node, agent=genie_agent, name="Genie")
    
    workflow = StateGraph(AgentState)
    workflow.add_node("Genie", genie_node)
    workflow.add_node("Coder", code_node)
    workflow.add_node("supervisor", supervisor_agent)
    workflow.add_node("final_answer", final_answer)
    
    workflow.set_entry_point("supervisor")
    # We want our workers to ALWAYS "report back" to the supervisor when done
    for worker in worker_descriptions.keys():
        workflow.add_edge(worker, "supervisor")
    
    # Let the supervisor decide which node to visit next
    workflow.add_conditional_edges(
        "supervisor",
        lambda x: x["next_node"],
        {**{k: k for k in worker_descriptions.keys()}, "FINISH": "final_answer"},
    )
    workflow.add_edge("final_answer", END)
    multi_agent = workflow.compile()
    
    ###################################
    # Wrap our multi-agent in ChatAgent
    ###################################
    
    
    class LangGraphChatAgent(ChatAgent):
        def __init__(self, agent: CompiledStateGraph):
            self.agent = agent
    
        def predict(
            self,
            messages: list[ChatAgentMessage],
            context: Optional[ChatContext] = None,
            custom_inputs: Optional[dict[str, Any]] = None,
        ) -> ChatAgentResponse:
            request = {
                "messages": [m.model_dump_compat(exclude_none=True) for m in messages]
            }
    
            messages = []
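            # stream_mode="updates" emits one event per executed node, mapping the
            # node name to the state delta (including new messages) it produced.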
            for event in self.agent.stream(request, stream_mode="updates"):
                for node_data in event.values():
                    messages.extend(
                        ChatAgentMessage(**msg) for msg in node_data.get("messages", [])
                    )
            return ChatAgentResponse(messages=messages)
    
        def predict_stream(
            self,
            messages: list[ChatAgentMessage],
            context: Optional[ChatContext] = None,
            custom_inputs: Optional[dict[str, Any]] = None,
        ) -> Generator[ChatAgentChunk, None, None]:
            request = {
                "messages": [m.model_dump_compat(exclude_none=True) for m in messages]
            }
            for event in self.agent.stream(request, stream_mode="updates"):
                for node_data in event.values():
                    yield from (
                        ChatAgentChunk(**{"delta": msg})
                        for msg in node_data.get("messages", [])
                    )
    
    
    # Create the agent object, and specify it as the agent object to use when
    # loading the agent back for inference via mlflow.models.set_model()
    mlflow.langchain.autolog()
    AGENT = LangGraphChatAgent(multi_agent)
    mlflow.models.set_model(AGENT)

    Test the agent

    Interact with the agent to test its output. Since this notebook calls mlflow.langchain.autolog(), you can view the trace for each step the agent takes.

    TODO: Replace this placeholder input_example with a domain-specific prompt for your agent.

    dbutils.library.restartPython()
    from agent import AGENT
    
    AGENT.predict({"messages": [{"role": "user", "content": "Hello, what kind of questions can I ask you?"}]})

    Create a Personal Access Token (PAT) as a Databricks secret

    To access the Genie Space and its underlying resources, you need to create a PAT.

    • This can either be your own PAT or that of a Service Principal (AWS | Azure). You will have to rotate this token yourself upon expiry.
    • Add secrets-based environment variables to a model serving endpoint (AWS | Azure); see the sketch after this list for storing the PAT as a secret.
    • Refer to the table in the deploy docs for the right permission level for each resource (AWS | Azure):
      • Provision with CAN RUN on the Genie Space
      • Provision with CAN USE on the SQL Warehouse powering the Genie Space
      • Provision with SELECT on underlying Unity Catalog tables
      • Provision with EXECUTE on underlying Unity Catalog functions
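    The snippet below is a minimal sketch for storing the PAT, assuming the databricks-sdk package is available in the notebook; the scope and key names are hypothetical and must match the secret_scope_name and secret_key_name you set in the next cell.

    from databricks.sdk import WorkspaceClient

    w = WorkspaceClient()

    # Hypothetical names; align these with secret_scope_name and secret_key_name below.
    scope_name = "my_secret_scope"
    key_name = "genie_pat"

    # Create the scope once, then store the PAT in it.
    w.secrets.create_scope(scope=scope_name)
    w.secrets.put_secret(scope=scope_name, key=key_name, string_value="<your-pat>")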
    import os
    from dbruntime.databricks_repl_context import get_context
    
    # TODO: set WORKSPACE_URL manually if it cannot be inferred from the current notebook
    WORKSPACE_URL = None
    if WORKSPACE_URL is None:
      workspace_url_hostname = get_context().workspaceUrl
      assert workspace_url_hostname is not None, "Unable to look up the current workspace URL. This can happen when running against serverless compute. Set WORKSPACE_URL manually above, or run this notebook against classic compute."
      WORKSPACE_URL = f"https://{workspace_url_hostname}"
    # TODO: set secret_scope_name and secret_key_name to access your PAT
    secret_scope_name = ""
    secret_key_name = ""
    
    os.environ["DB_MODEL_SERVING_HOST_URL"] = WORKSPACE_URL
    assert os.environ["DB_MODEL_SERVING_HOST_URL"] is not None
    os.environ["DATABRICKS_GENIE_PAT"] = dbutils.secrets.get(
        scope=secret_scope_name, key=secret_key_name
    )
    assert os.environ["DATABRICKS_GENIE_PAT"] is not None, (
        "The DATABRICKS_GENIE_PAT was not properly set to the PAT secret"
    )
    from agent import AGENT, genie_agent_description
    
    assert genie_agent_description != "This genie agent can answer ...", (
        "Remember to update the genie agent description for higher quality answers."
    )
    input_example = {
        "messages": [
            {
                "role": "user",
                "content": "Explain the datasets and capabilities that the Genie agent has access to.",
            }
        ]
    }
    AGENT.predict(input_example)
    for event in AGENT.predict_stream(input_example):
      print(event, "-----------\n")

    Log the agent as an MLflow model

    Log the agent as code from the agent.py file. See MLflow - Models from Code.

    Enable automatic authentication for Databricks resources

    For the most common Databricks resource types, Databricks supports and recommends declaring resource dependencies for the agent upfront during logging. This enables automatic authentication passthrough when you deploy the agent. With automatic authentication passthrough, Databricks automatically provisions, rotates, and manages short-lived credentials to securely access these resource dependencies from within the agent endpoint.

    To enable automatic authentication, specify the dependent Databricks resources when calling mlflow.pyfunc.log_model().

    • TODO: If your Unity Catalog tool queries a vector search index or leverages external functions, you need to include the dependent vector search index and UC connection objects, respectively, as resources. See docs (AWS | Azure).

    • TODO: If the SQL Warehouse powering your Genie space has secured permissions, include the warehouse ID and table name in your resources to enable passthrough authentication (AWS | Azure); a sketch of these optional resource types follows below.
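    For example, if your tools depend on a vector search index or a Unity Catalog connection, the corresponding resource types from mlflow.models.resources can be appended to the resources list. A minimal sketch with hypothetical index and connection names:

    from mlflow.models.resources import (
        DatabricksUCConnection,
        DatabricksVectorSearchIndex,
    )

    # Hypothetical names; replace with the resources your tools actually use.
    extra_resources = [
        DatabricksVectorSearchIndex(index_name="my_catalog.my_schema.my_index"),
        DatabricksUCConnection(connection_name="my_connection"),
    ]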

    # Determine Databricks resources to specify for automatic auth passthrough at deployment time
    import mlflow
    from agent import GENIE_SPACE_ID, LLM_ENDPOINT_NAME, tools
    from databricks_langchain import UnityCatalogTool, VectorSearchRetrieverTool
    from mlflow.models.resources import (
        DatabricksFunction,
        DatabricksGenieSpace,
        DatabricksServingEndpoint,
    #   DatabricksSQLWarehouse,
    #   DatabricksTable,
    )
    from pkg_resources import get_distribution
    
    # TODO: Manually include underlying resources if needed. See the TODO in the markdown above for more information.
    resources = [
        DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME),
        DatabricksGenieSpace(genie_space_id=GENIE_SPACE_ID),
    #   DatabricksSQLWarehouse(warehouse_id="your_warehouse_id"),
    #   DatabricksTable(table_name="your_catalog.schema.table_name"),
    ]
    for tool in tools:
        if isinstance(tool, VectorSearchRetrieverTool):
            resources.extend(tool.resources)
        elif isinstance(tool, UnityCatalogTool):
            resources.append(DatabricksFunction(function_name=tool.uc_function_name))
    
    with mlflow.start_run():
        logged_agent_info = mlflow.pyfunc.log_model(
            name="agent",
            python_model="agent.py",
            input_example=input_example,
            resources=resources,
            pip_requirements=[
                f"databricks-connect=={get_distribution('databricks-connect').version}",
                f"mlflow=={get_distribution('mlflow').version}",
                f"databricks-langchain=={get_distribution('databricks-langchain').version}",
                f"langgraph=={get_distribution('langgraph').version}",
            ],
        )

    Pre-deployment agent validation

    Before registering and deploying the agent, perform pre-deployment checks using the mlflow.models.predict() API. See Databricks documentation (AWS | Azure).

    mlflow.models.predict(
        model_uri=f"runs:/{logged_agent_info.run_id}/agent",
        input_data=input_example,
        env_manager="uv",
    )

    Register the model to Unity Catalog

    Update the catalog, schema, and model_name below to register the MLflow model to Unity Catalog.

    mlflow.set_registry_uri("databricks-uc")
    
    # TODO: define the catalog, schema, and model name for your UC model
    catalog = ""
    schema = ""
    model_name = ""
    UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"
    
    # register the model to UC
    uc_registered_model_info = mlflow.register_model(
        model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME
    )

    Deploy the agent

    from databricks import agents
    
    agents.deploy(
        UC_MODEL_NAME,
        uc_registered_model_info.version,
        tags={"endpointSource": "docs"},
        environment_vars={
            "DATABRICKS_GENIE_PAT": f"{{{{secrets/{secret_scope_name}/{secret_key_name}}}}}"
        },
    )

    Next steps

    After your agent is deployed, you can chat with it in AI playground to perform additional checks, share it with SMEs in your organization for feedback, or embed it in a production application. See Databricks documentation (AWS | Azure).
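    Once deployment finishes, you can also query the agent endpoint programmatically. Below is a minimal sketch using the MLflow deployments client; the endpoint name is a hypothetical placeholder, so use the endpoint name printed by agents.deploy above.

    from mlflow.deployments import get_deploy_client

    client = get_deploy_client("databricks")

    # Hypothetical endpoint name; use the one created by agents.deploy.
    response = client.predict(
        endpoint="my-agent-endpoint",
        inputs={"messages": [{"role": "user", "content": "What data can you analyze?"}]},
    )
    print(response)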
