Autor AI agentes em código
Esta página demonstra como criar um agente de AI no Python utilizando a estrutura de agentes Mosaic AI e bibliotecas populares de criação de agentes, como LangGraph e OpenAI.
Requisitos
A Databricks recomenda instalar a versão mais recente do cliente MLflow Python ao desenvolver agentes.
Para criar e implantar agentes usando a abordagem desta página, instale o seguinte:
- databricks-agents1.2.0 ou superior
- mlflow3.1.3 ou acima
- Python 3,10 ou acima.
- Utilize serverless, compute ou Databricks Runtime 13.3 LTS ou acima para cumprir este requisito.
 
%pip install -U -qqqq databricks-agents mlflow
Databricks Também recomendamos a instalação do pacote de integração Databricks AI Bridge para criar agentes. Esses pacotes de integração fornecem uma camada compartilhada de APIs que interage com Databricks AI recurso, como Databricks AI/BI Genie e Vector Search, em estruturas de criação de agentes e SDKs.
- OpenAI
- LangChain/LangGraph
- DSPy
- Pure Python agents
%pip install -U -qqqq databricks-openai
%pip install -U -qqqq databricks-langchain
%pip install -U -qqqq databricks-dspy
%pip install -U -qqqq databricks-ai-bridge
 Use ResponsesAgent para criar agentes
A Databricks recomenda a interface MLflow ResponsesAgent para criar agentes de nível de produção. ResponsesAgent permite que você crie agentes com qualquer estrutura de terceiros e, em seguida, integre-os ao recurso Databricks AI para obter recursos robustos de registro, rastreamento, avaliação, implantação e monitoramento.
O esquema ResponsesAgent é compatível com o esquema OpenAI Responses. Para saber mais sobre o OpenAI Responses, consulte OpenAI: Responses vs. ChatCompletion.
A interface mais antiga ChatAgent ainda é compatível com o Databricks. No entanto, para novos agentes, a Databricks recomenda utilizar a versão mais recente do MLflow e a interface ResponsesAgent.
Consulte o esquema do agente de entrada e saída legado.
ResponsesAgent oferece os seguintes benefícios:
- 
Capacidades avançadas de agente - Suporte multiagente
- transmissão saída : transmissão a saída em partes menores.
- Histórico abrangente de mensagens de tool-calling : Retorna várias mensagens, inclusive mensagens intermediárias de chamadas de ferramentas, para melhorar a qualidade e o gerenciamento de conversas.
- Suporte para confirmação de chamadas de ferramentas
- Suporte de ferramentas de longa duração
 
- 
Desenvolvimento, implantação e monitoramento simplificados - Agentes autores utilizando qualquer estrutura : integre qualquer agente existente utilizando a interface ResponsesAgentpara obter compatibilidade imediata com AI Playground, Avaliação de Agentes e Monitoramento de Agentes.
- Interfaces de criação digitadas : Escreva o código do agente usando classes Python digitadas, beneficiando-se do preenchimento automático do IDE e do Notebook.
- ResponsesAgentI nferência automática de assinatura : o MLflow infere automaticamente assinaturas de um agente quando registra um agente, simplificando o registro e a implantação. Consulte Inferir assinatura do modelo durante o registro.
- Rastreamento automático : O MLflow rastreia automaticamente as funções predictepredict_stream, agregando respostas de transmissão para facilitar a avaliação e a exibição.
- AI Tabelas de inferência aprimoradas pelo gateway : AI As tabelas de inferência do gateway são ativadas automaticamente para agentes implantados, fornecendo acesso a solicitações detalhadas log metadados.
 
- Agentes autores utilizando qualquer estrutura : integre qualquer agente existente utilizando a interface 
Para aprender como criar um ResponsesAgent, consulte os exemplos na seção a seguir e a documentaçãoMLflow - ResponsesAgent para servindo modelo.
Exemplos deResponsesAgent
O Notebook a seguir demonstra como criar transmissão e não transmissão ResponsesAgent utilizando uma biblioteca popular. Para saber como expandir os recursos desses agentes, consulte as ferramentas do agenteAI.
- OpenAI
- LangGraph
- DSPy
Agente de bate-papo simples da OpenAI usando modelos hospedados na Databricks
Agente de chamada de ferramentas OpenAI utilizando modelos hospedados pela Databricks
Agente de chamada de ferramentas OpenAI usando modelos hospedados no OpenAI
Agente de chamada de ferramentas LangGraph
Agente de chamada de ferramenta de volta única DSPy
Exemplo de multiagente
Para aprender como criar um sistema multiagente, consulte Utilizar o Genie em sistemas multiagentes.
E se eu já tiver um agente?
Se o senhor já tiver um agente criado com LangChain, LangGraph ou uma estrutura semelhante, não precisará reescrever o agente para usá-lo no Databricks. Em vez disso, basta envolver seu agente existente com a interface ResponsesAgent do MLflow:
- 
Escreva uma classe envolvente em Python que herda de mlflow.pyfunc.ResponsesAgent.Dentro da classe wrapper, faça referência ao agente existente como um atributo self.agent = your_existing_agent.
- 
A ResponsesAgentclasse requer a implementação de umpredictmétodo que retorne umResponsesAgentResponsepara lidar com solicitações de não transmissão. Veja a seguir um exemplo do esquemaResponsesAgentResponses:Pythonimport uuid
 # input as a dict
 {"input": [{"role": "user", "content": "What did the data scientist say when their Spark job finally completed?"}]}
 # output example
 ResponsesAgentResponse(
 output=[
 {
 "type": "message",
 "id": str(uuid.uuid4()),
 "content": [{"type": "output_text", "text": "Well, that really sparked joy!"}],
 "role": "assistant",
 },
 ]
 )
- 
Na função predict, converta as mensagens recebidas deResponsesAgentRequestno formato esperado pelo agente. Depois que o agente gerar uma resposta, converta sua saída em um objetoResponsesAgentResponse.
Veja os exemplos de código a seguir para ver como converter agentes existentes em ResponsesAgent:
- Basic conversion
- Streaming with code re-use
- Migrate from ChatCompletions
Para agentes não transmissíveis, converta as entradas e saídas na função “ predict ”.
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
)
class MyWrappedAgent(ResponsesAgent):
    def __init__(self, agent):
        # Reference your existing agent
        self.agent = agent
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        # Convert incoming messages to your agent's format
        # prep_msgs_for_llm is a function you write to convert the incoming messages
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
        # Call your existing agent (non-streaming)
        agent_response = self.agent.invoke(messages)
        # Convert your agent's output to ResponsesAgent format, assuming agent_response is a str
        output_item = (self.create_text_output_item(text=agent_response, id=str(uuid4())),)
        # Return the response
        return ResponsesAgentResponse(output=[output_item])
Para agentes de transmissão, é possível ser inteligente e reutilizar a lógica para evitar a duplicação do código que converte mensagens:
from typing import Generator
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
class MyWrappedStreamingAgent(ResponsesAgent):
    def __init__(self, agent):
        # Reference your existing agent
        self.agent = agent
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming predict: collects all streaming chunks into a single response."""
        # Reuse the streaming logic and collect all output items
        output_items = []
        for stream_event in self.predict_stream(request):
            if stream_event.type == "response.output_item.done":
                output_items.append(stream_event.item)
        # Return all collected items as a single response
        return ResponsesAgentResponse(output=output_items)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming predict: the core logic that both methods use."""
        # Convert incoming messages to your agent's format
        # prep_msgs_for_llm is a function you write to convert the incoming messages, included in full examples linked below
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
        # Stream from your existing agent
        item_id = str(uuid4())
        aggregated_stream = ""
        for chunk in self.agent.stream(messages):
            # Convert each chunk to ResponsesAgent format
            yield self.create_text_delta(delta=chunk, item_id=item_id)
            aggregated_stream += chunk
        # Emit an aggregated output_item for all the text deltas with id=item_id
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(text=aggregated_stream, id=item_id),
        )
Se o seu agente atual utiliza a API ChatCompletions da OpenAI, é possível migrá-lo para o OpenAI Chatbot sem alterar sua lógica principal. ResponsesAgent sem reescrever sua lógica principal. Adicione um invólucro que:
- Converte as mensagens ResponsesAgentRequestrecebidas no formatoChatCompletionsque seu agente espera.
- Traduz as saídas ChatCompletionsno esquemaResponsesAgentResponse.
- Opcionalmente, suporta transmissão por meio do mapeamento de deltas incrementais de ChatCompletionsparaResponsesAgentStreamEventobjetos.
from typing import Generator
from uuid import uuid4
from databricks.sdk import WorkspaceClient
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
# Legacy agent that outputs ChatCompletions objects
class LegacyAgent:
    def __init__(self):
        self.w = WorkspaceClient()
        self.OpenAI = self.w.serving_endpoints.get_open_ai_client()
    def stream(self, messages):
        for chunk in self.OpenAI.chat.completions.create(
            model="databricks-claude-3-7-sonnet",
            messages=messages,
            stream=True,
        ):
            yield chunk.to_dict()
# Wrapper that converts the legacy agent to a ResponsesAgent
class MyWrappedStreamingAgent(ResponsesAgent):
    def __init__(self, agent):
        # `agent` is your existing ChatCompletions agent
        self.agent = agent
    def prep_msgs_for_llm(self, messages):
        # dummy example of prep_msgs_for_llm
        # real example of prep_msgs_for_llm included in full examples linked below
        return [{"role": "user", "content": "Hello, how are you?"}]
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming predict: collects all streaming chunks into a single response."""
        # Reuse the streaming logic and collect all output items
        output_items = []
        for stream_event in self.predict_stream(request):
            if stream_event.type == "response.output_item.done":
                output_items.append(stream_event.item)
        # Return all collected items as a single response
        return ResponsesAgentResponse(output=output_items)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming predict: the core logic that both methods use."""
        # Convert incoming messages to your agent's format
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
        # process the ChatCompletion output stream
        agent_content = ""
        tool_calls = []
        msg_id = None
        for chunk in self.agent.stream(messages):  # call the underlying agent's stream method
            delta = chunk["choices"][0]["delta"]
            msg_id = chunk.get("id", None)
            content = delta.get("content", None)
            if tc := delta.get("tool_calls"):
                if not tool_calls:  # only accommodate for single tool call right now
                    tool_calls = tc
                else:
                    tool_calls[0]["function"]["arguments"] += tc[0]["function"]["arguments"]
            elif content is not None:
                agent_content += content
                yield ResponsesAgentStreamEvent(**self.create_text_delta(content, item_id=msg_id))
        # aggregate the streamed text content
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(agent_content, msg_id),
        )
        for tool_call in tool_calls:
            yield ResponsesAgentStreamEvent(
                type="response.output_item.done",
                item=self.create_function_call_item(
                    str(uuid4()),
                    tool_call["id"],
                    tool_call["function"]["name"],
                    tool_call["function"]["arguments"],
                ),
            )
agent = MyWrappedStreamingAgent(LegacyAgent())
for chunk in agent.predict_stream(
    ResponsesAgentRequest(input=[{"role": "user", "content": "Hello, how are you?"}])
):
    print(chunk)
Para obter exemplos completos, consulte ResponsesAgent examples.
respostas de transmissão
A transmissão permite que os agentes enviem respostas em partes em tempo real, em vez de aguardar a resposta completa. Para implementar transmissão com ResponsesAgent, emita uma série de eventos delta seguidos por um evento final de conclusão:
- Emitir eventos delta : enviar vários eventos com o mesmo delta para transmitir trechos de texto em tempo real. output_text.deltaeventos com o mesmoitem_idpara transmitir trechos de texto em tempo real.
- Concluir com o evento concluído : envie um evento response.output_item.donefinal com o mesmoitem_iddos eventos delta contendo o texto final completo da saída.
Cada evento delta transmite um fragmento de texto para o cliente. O evento final concluído contém o texto completo da resposta e sinaliza ao Databricks para realizar as seguintes ações:
- Rastreie a saída do seu agente com o rastreamento do MLflow
- Agregar respostas de transmissão em tabelas de inferência de gateway d AI
- Mostrar a saída completa na interface do usuário do AI Playground
transmissão propagação de erros
Mosaic AI propaga quaisquer erros encontrados durante a transmissão com os últimos tokens em databricks_output.error. Cabe ao cliente chamador tratar e revelar adequadamente esse erro.
{
  "delta": …,
  "databricks_output": {
    "trace": {...},
    "error": {
      "error_code": BAD_REQUEST,
      "message": "TimeoutException: Tool XYZ failed to execute."
    }
  }
}
Recurso avançado
Entradas e saídas personalizadas
Alguns cenários podem exigir entradas adicionais do agente, como client_type e session_id, ou saídas como links de origem de recuperação que não devem ser incluídos no histórico do chat para interações futuras.
Para esses cenários, o MLflow ResponsesAgent suporta nativamente os campos custom_inputs e custom_outputs. É possível acessar as entradas personalizadas através de request.custom_inputs em todos os exemplos vinculados acima em Exemplos do ResponsesAgent.
O aplicativo de análise de avaliação de agentes não oferece suporte à renderização de rastreamentos para agentes com campos de entrada adicionais.
Consulte o Notebook a seguir para saber como definir entradas e saídas personalizadas.
Forneça um custom_inputs o no AI Playground e avalie o aplicativo.
Se o seu agente aceitar entradas adicionais usando o campo “ custom_inputs ”, você poderá fornecer essas entradas manualmente no AI Playground e no aplicativo de revisão.
- 
No AI Playground ou no aplicativo Agent Review, selecione o ícone de engrenagem . 
- 
Habilite custom_inputs . 
- 
Forneça um objeto JSON que corresponda ao esquema de entrada definido pelo seu agente.  
Especifique esquemas de recuperação personalizados
AI Os agentes geralmente usam recuperadores para localizar e consultar dados não estruturados de índices de pesquisa de vetores. Por exemplo, ferramentas de recuperação, consulte Criar e rastrear ferramentas de recuperação para dados não estruturados.
Rastreie esses recuperadores dentro de seu agente com MLflow RETRIEVER spans para permitir Databricks produto recurso, incluindo:
- Exibição automática de links para documentos de origem recuperados na interface do usuário do AI Playground
- Executando automaticamente juízes de fundamentação e relevância de recuperação na Avaliação de Agentes
Databricks recomenda o uso de ferramentas retriever fornecidas por Databricks AI Bridge pacote como databricks_langchain.VectorSearchRetrieverTool e databricks_openai.VectorSearchRetrieverTool porque elas já estão em conformidade com o esquema retriever MLflow. Consulte Desenvolver localmente ferramentas de recuperação do Vector Search com o AI Bridge.
Se seu agente incluir extensões de retriever com um esquema personalizado, chame mlflow.models.set_retriever_schema ao definir seu agente no código. Isso mapeia as colunas de saída do seu recuperador para os campos esperados do MLflow (primary_key, text_column, doc_uri).
import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
#     {
#         'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
#         'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
#         'doc_uri': 'https://mlflow.org/docs/latest/index.html',
#         'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
#     },
#     {
#         'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
#         'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
#         'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
#         'title': 'Getting Started with MLflow'
#     },
# ...
# ]
mlflow.models.set_retriever_schema(
    # Specify the name of your retriever span
    name="mlflow_docs_vector_search",
    # Specify the output column name to treat as the primary key (ID) of each retrieved document
    primary_key="document_id",
    # Specify the output column name to treat as the text content (page content) of each retrieved document
    text_column="chunk_text",
    # Specify the output column name to treat as the document URI of each retrieved document
    doc_uri="doc_uri",
    # Specify any other columns returned by the retriever
    other_columns=["title"],
)
A coluna doc_uri é especialmente importante ao avaliar o desempenho do retriever. doc_uri é o identificador principal dos documentos retornados pelo recuperador, permitindo compará-los com os conjuntos de avaliação da verdade básica. Consulte Conjuntos de avaliação (MLflow 2).
Considerações de implantação
Prepare-se para um Databricks, servindo modelo
Databricks implantado ResponsesAgentem um ambiente distribuído em um servidor Databricks servindo modelo. Isso significa que, durante uma conversa de vários turnos, a mesma réplica em serviço pode não lidar com todas as solicitações. Preste atenção às seguintes implicações para gerenciar o estado do agente:
- 
Evite o armazenamento em cache local : ao implementar uma ResponsesAgent, não presuma que a mesma réplica lida com todas as solicitações em uma conversa com várias voltas. Reconstrua o estado interno usando um esquemaResponsesAgentRequestdo dicionário para cada turno.
- 
Estado de thread-safe: projete o estado do agente para que seja seguro para thread-safe, evitando conflitos em ambientes com vários segmentos. 
- 
Inicializar estado na função predict: Inicialize o estado sempre que a funçãopredictfor chamada, não durante a inicializaçãoResponsesAgent. Armazenar o estado noResponsesAgentpode causar vazamento de informações entre conversas e conflitos, pois uma única réplica pode processar solicitações de várias conversas.ResponsesAgentréplica pode lidar com solicitações de várias conversas.
Parametrize o código para implantação em todos os ambientes
Parametrize o código do agente para reutilizar o mesmo código do agente em diferentes ambientes.
Os parâmetros são pares chave-valor que você define em um dicionário no Python ou em um arquivo .yaml.
Para configurar o código, crie um arquivo ModelConfig usando um dicionário Python ou um arquivo .yaml. ModelConfig é um conjunto de parâmetros de valor e keyes que permite um gerenciamento flexível da configuração. Por exemplo, é possível utilizar um dicionário durante o desenvolvimento e, em seguida, convertê-lo em um arquivo .yaml para implantação em produção e CI/CD.
Um exemplo ModelConfig é mostrado abaixo:
llm_parameters:
  max_tokens: 500
  temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
  question that indicates your prompt template came from a YAML file. Your response
  must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
  - question
No código do agente, o senhor pode fazer referência a uma configuração default (desenvolvimento) do arquivo ou dicionário .yaml:
import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)
# Example of using a dictionary
config_dict = {
    "prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
    "prompt_template_input_vars": ["question"],
    "model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
    "llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}
model_config = mlflow.models.ModelConfig(development_config=config_dict)
# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')
Em seguida, ao registrar seu agente, especifique o parâmetro model_config para log_model
, especifique um conjunto personalizado de parâmetros a serem usados ao carregar o agente de logs. Consulte a documentação do MLflow em - ModelConfig.
Use código síncrono ou padrões de retorno de chamada
Para garantir estabilidade e compatibilidade, use código síncrono ou padrões baseados em retorno de chamada na implementação do seu agente.
Databricks Gerenciar automaticamente a comunicação assíncrona para fornecer concorrência e desempenho ideais ao implantar um agente. A introdução de loops de eventos personalizados ou estruturas assíncronas pode levar a erros como RuntimeError: This event loop is already running and caused unpredictable behavior.
A Databricks recomenda evitar a programação assíncrona, como o uso do asyncio ou a criação de loops de eventos personalizados, ao desenvolver agentes.