Autor AI agentes em código
Esta página demonstra como criar um agente de AI no Python utilizando a estrutura de agentes Mosaic AI e bibliotecas populares de criação de agentes, como LangGraph e OpenAI.
Requisitos
A Databricks recomenda instalar a versão mais recente do cliente MLflow Python ao desenvolver agentes.
Para criar e implantar agentes usando a abordagem desta página, instale o seguinte:
databricks-agents
1.2.0 ou superiormlflow
3.1.3 ou acima- Python 3,10 ou acima.
- Utilize serverless, compute ou Databricks Runtime 13.3 LTS ou acima para cumprir este requisito.
%pip install -U -qqqq databricks-agents mlflow
Databricks Também recomendamos a instalação do pacote de integração Databricks AI Bridge para criar agentes. Esses pacotes de integração fornecem uma camada compartilhada de APIs que interage com Databricks AI recurso, como Databricks AI/BI Genie e Vector Search, em estruturas de criação de agentes e SDKs.
- LangChain/LangGraph
- OpenAI
- Pure Python agents
%pip install -U -qqqq databricks-langchain
%pip install -U -qqqq databricks-openai
%pip install -U -qqqq databricks-ai-bridge
Use ResponsesAgent
para criar agentes
A Databricks recomenda a interface MLflow ResponsesAgent
para criar agentes de nível de produção. ResponsesAgent
permite que você crie agentes com qualquer estrutura de terceiros e, em seguida, integre-os ao recurso Databricks AI para obter recursos robustos de registro, rastreamento, avaliação, implantação e monitoramento.
O esquema ReponsesAgent
é compatível com o esquema OpenAI Responses
. Para saber mais sobre o OpenAI Responses
, consulte OpenAI: Responses vs. ChatCompletion.
A interface mais antiga ChatAgent
ainda é compatível com o Databricks. No entanto, para novos agentes, a Databricks recomenda utilizar a versão mais recente do MLflow e a interface ResponsesAgent
.
Consulte o esquema do agente de entrada e saída legado.
ResponsesAgent
oferece os seguintes benefícios:
-
Capacidades avançadas de agente
- Suporte multiagente
- transmissão saída : transmissão a saída em partes menores.
- Histórico abrangente de mensagens de tool-calling : Retorna várias mensagens, inclusive mensagens intermediárias de chamadas de ferramentas, para melhorar a qualidade e o gerenciamento de conversas.
- Suporte para confirmação de chamadas de ferramentas
- Suporte de ferramentas de longa duração
-
Desenvolvimento, implantação e monitoramento simplificados
- Agentes autores utilizando qualquer estrutura : integre qualquer agente existente utilizando a interface
ResponsesAgent
para obter compatibilidade imediata com AI Playground, Avaliação de Agentes e Monitoramento de Agentes. - Interfaces de criação digitadas : Escreva o código do agente usando classes Python digitadas, beneficiando-se do preenchimento automático do IDE e do Notebook.
ResponsesAgent
I nferência automática de assinatura : o MLflow infere automaticamente assinaturas de um agente quando registra um agente, simplificando o registro e a implantação. Consulte Inferir assinatura do modelo durante o registro.- Rastreamento automático : O MLflow rastreia automaticamente as funções
predict
epredict_stream
, agregando respostas de transmissão para facilitar a avaliação e a exibição. - AI Tabelas de inferência aprimoradas pelo gateway : AI As tabelas de inferência do gateway são ativadas automaticamente para agentes implantados, fornecendo acesso a solicitações detalhadas log metadados.
- Agentes autores utilizando qualquer estrutura : integre qualquer agente existente utilizando a interface
Para aprender como criar um ResponsesAgent
, consulte os exemplos na seção a seguir e a documentaçãoMLflow - ResponsesAgent para servindo modelo.
Exemplos deResponsesAgent
O Notebook a seguir demonstra como criar transmissão e não transmissão ResponsesAgent
utilizando uma biblioteca popular. Para saber como expandir os recursos desses agentes, consulte as ferramentas do agenteAI.
- LangGraph
- OpenAI
Agente de chamada de ferramentas LangGraph
Agente de bate-papo simples da OpenAI usando modelos hospedados na Databricks
Agente de chamada de ferramentas OpenAI utilizando modelos hospedados pela Databricks
Agente de chamada de ferramentas OpenAI usando modelos hospedados no OpenAI
Exemplo de multiagente
Para aprender como criar um sistema multiagente, consulte Utilizar o Genie em sistemas multiagentes.
E se eu já tiver um agente?
Se o senhor já tiver um agente criado com LangChain, LangGraph ou uma estrutura semelhante, não precisará reescrever o agente para usá-lo no Databricks. Em vez disso, basta envolver seu agente existente com a interface ResponsesAgent
do MLflow:
-
Escreva uma classe envolvente em Python que herda de
mlflow.pyfunc.ResponsesAgent
.Dentro da classe wrapper, faça referência ao agente existente como um atributo
self.agent = your_existing_agent
. -
A
ResponsesAgent
classe requer a implementação de umpredict
método que retorne umResponsesAgentResponse
para lidar com solicitações de não transmissão. Veja a seguir um exemplo do esquemaResponsesAgentResponses
:Pythonimport uuid
# input as a dict
{"input": [{"role": "user", "content": "What did the data scientist say when their Spark job finally completed?"}]}
# output example
ResponsesAgentResponse(
output=[
{
"type": "message",
"id": str(uuid.uuid4()),
"content": [{"type": "output_text", "text": "Well, that really sparked joy!"}],
"role": "assistant",
},
]
) -
Na função
predict
, converta as mensagens recebidas deResponsesAgentRequest
no formato esperado pelo agente. Depois que o agente gerar uma resposta, converta sua saída em um objetoResponsesAgentResponse
.
Veja os exemplos de código a seguir para ver como converter agentes existentes em ResponsesAgent
:
- Basic conversion
- Streaming with code re-use
- Migrate from ChatCompletions
Para agentes não transmissíveis, converta as entradas e saídas na função “ predict
”.
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
)
class MyWrappedAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Call your existing agent (non-streaming)
agent_response = self.agent.invoke(messages)
# Convert your agent's output to ResponsesAgent format, assuming agent_response is a str
output_item = (self.create_text_output_item(text=agent_response, id=str(uuid4())),)
# Return the response
return ResponsesAgentResponse(output=[output_item])
Para agentes de transmissão, é possível ser inteligente e reutilizar a lógica para evitar a duplicação do código que converte mensagens:
from typing import Generator
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages, included in full examples linked below
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Stream from your existing agent
item_id = str(uuid4())
aggregated_stream = ""
for chunk in self.agent.stream(messages):
# Convert each chunk to ResponsesAgent format
yield self.create_text_delta(delta=chunk, item_id=item_id)
aggregated_stream += chunk
# Emit an aggregated output_item for all the text deltas with id=item_id
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(text=aggregated_stream, id=item_id),
)
Se o seu agente atual utiliza a API ChatCompletions da OpenAI, é possível migrá-lo para o OpenAI Chatbot sem alterar sua lógica principal. ResponsesAgent
sem reescrever sua lógica principal. Adicione um invólucro que:
- Converte as mensagens
ResponsesAgentRequest
recebidas no formatoChatCompletions
que seu agente espera. - Traduz as saídas
ChatCompletions
no esquemaResponsesAgentResponse
. - Opcionalmente, suporta transmissão por meio do mapeamento de deltas incrementais de
ChatCompletions
paraResponsesAgentStreamEvent
objetos.
from typing import Generator
from uuid import uuid4
from databricks.sdk import WorkspaceClient
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
# Legacy agent that outputs ChatCompletions objects
class LegacyAgent:
def __init__(self):
self.w = WorkspaceClient()
self.OpenAI = self.w.serving_endpoints.get_open_ai_client()
def stream(self, messages):
for chunk in self.OpenAI.chat.completions.create(
model="databricks-claude-3-7-sonnet",
messages=messages,
stream=True,
):
yield chunk.to_dict()
# Wrapper that converts the legacy agent to a ResponsesAgent
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# `agent` is your existing ChatCompletions agent
self.agent = agent
def prep_msgs_for_llm(self, messages):
# dummy example of prep_msgs_for_llm
# real example of prep_msgs_for_llm included in full examples linked below
return [{"role": "user", "content": "Hello, how are you?"}]
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# process the ChatCompletion output stream
agent_content = ""
tool_calls = []
msg_id = None
for chunk in self.agent.stream(messages): # call the underlying agent's stream method
delta = chunk["choices"][0]["delta"]
msg_id = chunk.get("id", None)
content = delta.get("content", None)
if tc := delta.get("tool_calls"):
if not tool_calls: # only accomodate for single tool call right now
tool_calls = tc
else:
tool_calls[0]["function"]["arguments"] += tc[0]["function"]["arguments"]
elif content is not None:
agent_content += content
yield ResponsesAgentStreamEvent(**self.create_text_delta(content, item_id=msg_id))
# aggregate the streamed text content
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(agent_content, msg_id),
)
for tool_call in tool_calls:
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_function_call_item(
str(uuid4()),
tool_call["id"],
tool_call["function"]["name"],
tool_call["function"]["arguments"],
),
)
agent = MyWrappedStreamingAgent(LegacyAgent())
for chunk in agent.predict_stream(
ResponsesAgentRequest(input=[{"role": "user", "content": "Hello, how are you?"}])
):
print(chunk)
Para obter exemplos completos, consulte ResponsesAgent
examples.
respostas de transmissão
A transmissão permite que os agentes enviem respostas em partes em tempo real, em vez de aguardar a resposta completa. Para implementar transmissão com ResponsesAgent
, emita uma série de eventos delta seguidos por um evento final de conclusão:
- Emitir eventos delta : enviar vários eventos com o mesmo delta para transmitir trechos de texto em tempo real.
output_text.delta
eventos com o mesmoitem_id
para transmitir trechos de texto em tempo real. - Concluir com o evento concluído : envie um evento
response.output_item.done
final com o mesmoitem_id
dos eventos delta contendo o texto final completo da saída.
Cada evento delta transmite um fragmento de texto para o cliente. O evento final concluído contém o texto completo da resposta e sinaliza ao Databricks para realizar as seguintes ações:
- Rastreie a saída do seu agente com o rastreamento do MLflow
- Agregar respostas de transmissão em tabelas de inferência de gateway d AI
- Mostrar a saída completa na interface do usuário do AI Playground
transmissão propagação de erros
Mosaic AI propaga quaisquer erros encontrados durante a transmissão com os últimos tokens em databricks_output.error
. Cabe ao cliente chamador tratar e revelar adequadamente esse erro.
{
"delta": …,
"databricks_output": {
"trace": {...},
"error": {
"error_code": BAD_REQUEST,
"message": "TimeoutException: Tool XYZ failed to execute."
}
}
}
Recurso avançado
Entradas e saídas personalizadas
Alguns cenários podem exigir entradas adicionais do agente, como client_type
e session_id
, ou saídas como links de origem de recuperação que não devem ser incluídos no histórico do chat para interações futuras.
Para esses cenários, o MLflow ResponsesAgent
suporta nativamente os campos custom_inputs
e custom_outputs
. É possível acessar as entradas personalizadas através de request.custom_inputs
em todos os exemplos vinculados acima em Exemplos do ResponsesAgent.
O aplicativo de análise de avaliação de agentes não oferece suporte à renderização de rastreamentos para agentes com campos de entrada adicionais.
Consulte o Notebook a seguir para saber como definir entradas e saídas personalizadas.
Forneça um custom_inputs
o no AI Playground e avalie o aplicativo.
Se o seu agente aceitar entradas adicionais usando o campo “ custom_inputs
”, você poderá fornecer essas entradas manualmente no AI Playground e no aplicativo de revisão.
-
No AI Playground ou no aplicativo Agent Review, selecione o ícone de engrenagem
.
-
Habilite custom_inputs .
-
Forneça um objeto JSON que corresponda ao esquema de entrada definido pelo seu agente.
Especifique esquemas de recuperação personalizados
AI Os agentes geralmente usam recuperadores para localizar e consultar dados não estruturados de índices de pesquisa de vetores. Por exemplo, ferramentas de recuperação, consulte Criar e rastrear ferramentas de recuperação para dados não estruturados.
Rastreie esses recuperadores dentro de seu agente com MLflow RETRIEVER spans para permitir Databricks produto recurso, incluindo:
- Exibição automática de links para documentos de origem recuperados na interface do usuário do AI Playground
- Executando automaticamente juízes de fundamentação e relevância de recuperação na Avaliação de Agentes
Databricks recomenda o uso de ferramentas retriever fornecidas por Databricks AI Bridge pacote como databricks_langchain.VectorSearchRetrieverTool
e databricks_openai.VectorSearchRetrieverTool
porque elas já estão em conformidade com o esquema retriever MLflow. Consulte Desenvolver localmente ferramentas de recuperação do Vector Search com o AI Bridge.
Se seu agente incluir extensões de retriever com um esquema personalizado, chame mlflow.models.set_retriever_schema
ao definir seu agente no código. Isso mapeia as colunas de saída do seu recuperador para os campos esperados do MLflow (primary_key
, text_column
, doc_uri
).
import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
# {
# 'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
# 'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
# 'doc_uri': 'https://mlflow.org/docs/latest/index.html',
# 'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
# },
# {
# 'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
# 'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
# 'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
# 'title': 'Getting Started with MLflow'
# },
# ...
# ]
mlflow.models.set_retriever_schema(
# Specify the name of your retriever span
name="mlflow_docs_vector_search",
# Specify the output column name to treat as the primary key (ID) of each retrieved document
primary_key="document_id",
# Specify the output column name to treat as the text content (page content) of each retrieved document
text_column="chunk_text",
# Specify the output column name to treat as the document URI of each retrieved document
doc_uri="doc_uri",
# Specify any other columns returned by the retriever
other_columns=["title"],
)
A coluna doc_uri
é especialmente importante ao avaliar o desempenho do retriever. doc_uri
é o identificador principal dos documentos retornados pelo recuperador, permitindo compará-los com os conjuntos de avaliação da verdade básica. Consulte Conjuntos de avaliação (MLflow 2).
Considerações de implantação
Prepare-se para um Databricks, servindo modelo
Databricks implantado ResponsesAgent
em um ambiente distribuído em um servidor Databricks servindo modelo. Isso significa que, durante uma conversa de vários turnos, a mesma réplica em serviço pode não lidar com todas as solicitações. Preste atenção às seguintes implicações para gerenciar o estado do agente:
-
Evite o armazenamento em cache local : ao implementar uma
ResponsesAgent
, não presuma que a mesma réplica lida com todas as solicitações em uma conversa com várias voltas. Reconstrua o estado interno usando um esquemaResponsesAgentRequest
do dicionário para cada turno. -
Estado de thread-safe: projete o estado do agente para que seja seguro para thread-safe, evitando conflitos em ambientes com vários segmentos.
-
Inicializar estado na função
predict
: Inicialize o estado sempre que a funçãopredict
for chamada, não durante a inicializaçãoResponsesAgent
. Armazenar o estado noResponsesAgent
pode causar vazamento de informações entre conversas e conflitos, pois uma única réplica pode processar solicitações de várias conversas.ResponsesAgent
réplica pode lidar com solicitações de várias conversas.
Parametrize o código para implantação em todos os ambientes
Parametrize o código do agente para reutilizar o mesmo código do agente em diferentes ambientes.
Os parâmetros são pares chave-valor que você define em um dicionário no Python ou em um arquivo .yaml
.
Para configurar o código, crie um arquivo ModelConfig
usando um dicionário Python ou um arquivo .yaml
. ModelConfig
é um conjunto de parâmetros de valor e keyes que permite um gerenciamento flexível da configuração. Por exemplo, é possível utilizar um dicionário durante o desenvolvimento e, em seguida, convertê-lo em um arquivo .yaml
para implantação em produção e CI/CD.
Um exemplo ModelConfig
é mostrado abaixo:
llm_parameters:
max_tokens: 500
temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
question that indicates your prompt template came from a YAML file. Your response
must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
- question
No código do agente, o senhor pode fazer referência a uma configuração default (desenvolvimento) do arquivo ou dicionário .yaml
:
import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)
# Example of using a dictionary
config_dict = {
"prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
"prompt_template_input_vars": ["question"],
"model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
"llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}
model_config = mlflow.models.ModelConfig(development_config=config_dict)
# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')
Em seguida, ao registrar seu agente, especifique o parâmetro model_config
para log_model
, especifique um conjunto personalizado de parâmetros a serem usados ao carregar o agente de logs. Consulte a documentação do MLflow em - ModelConfig.
Use código síncrono ou padrões de retorno de chamada
Para garantir estabilidade e compatibilidade, use código síncrono ou padrões baseados em retorno de chamada na implementação do seu agente.
Databricks Gerenciar automaticamente a comunicação assíncrona para fornecer concorrência e desempenho ideais ao implantar um agente. A introdução de loops de eventos personalizados ou estruturas assíncronas pode levar a erros como RuntimeError: This event loop is already running and caused unpredictable behavior
.
A Databricks recomenda evitar a programação assíncrona, como o uso do asyncio ou a criação de loops de eventos personalizados, ao desenvolver agentes.