Stateful AI agents
Beta
This feature is in Beta.
Stateful AI agents maintain context across interactions by using thread IDs to track conversation threads. Checkpointing lets you save an agent at a specific state, and time travel lets you replay conversations from those states. This helps you understand the decision-making process of non-deterministic LLM agents and do the following:
- Observe agents: Analyze exactly what the agent knew and did at each step.
- Debug errors: Identify where and why errors occurred in the conversation flow.
- Explore alternatives: Replay and test different conversation paths from checkpoints.
This page shows how to build stateful agents using the Mosaic AI Agent Framework and LangGraph, with Lakebase as the memory store.
Requirements
To build stateful agents, you need the following:
- A configured Lakebase instance. See Create and manage a database instance.
Example notebook
The following notebook uses the concepts on this page to implement a stateful agent backed by Lakebase:
Stateful agent with thread-scoped memory
Implement LangGraph time travel
Use LangGraph time travel to resume execution from checkpoints. You can replay a conversation as-is or modify it to explore alternative paths. Each time you resume from a checkpoint, LangGraph creates a new fork in the conversation history, preserving the original and enabling experimentation.
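Before wiring this into an agent, the forking behavior itself can be seen in a minimal, self-contained sketch. This example uses an in-memory checkpointer and a stub node purely for illustration; the graph, node, and thread names here are assumptions, not part of the agent built below:

```python
# Illustrative sketch of checkpoint forking; an in-memory checkpointer and a
# stub node stand in for the Lakebase-backed agent built in the steps below.
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver

def respond(state: MessagesState):
    # Stand-in for an LLM call: acknowledge the latest user message
    return {"messages": [("assistant", f"Noted: {state['messages'][-1].content}")]}

builder = StateGraph(MessagesState)
builder.add_node("respond", respond)
builder.add_edge(START, "respond")
builder.add_edge("respond", END)
graph = builder.compile(checkpointer=MemorySaver())

config = {"configurable": {"thread_id": "demo-thread"}}
graph.invoke({"messages": [("user", "I'm headed to SF!")]}, config)
graph.invoke({"messages": [("user", "Where am I going?")]}, config)

# Find the checkpoint at the end of the first exchange (user turn + reply).
# get_state_history yields snapshots newest-first.
fork_point = next(
    s for s in graph.get_state_history(config)
    if s.next == () and len(s.values.get("messages", [])) == 2
)

# Invoking with that checkpoint's config forks the thread: the original
# second exchange is preserved, and a new branch starts from the fork point.
result = graph.invoke({"messages": [("user", "Actually, New York!")]}, fork_point.config)
print(result["messages"][-1].content)
```

The steps below implement the same pattern inside the agent class, with checkpoints persisted to Lakebase.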
1. In your agent code, create functions that retrieve checkpoint history and update checkpoint state in the `LangGraphResponsesAgent` class:

    ```python
    from typing import Any, Dict, List, Optional

    def get_checkpoint_history(self, thread_id: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Retrieve checkpoint history for a thread.

        Args:
            thread_id: The thread identifier
            limit: Maximum number of checkpoints to return

        Returns:
            List of checkpoint information including checkpoint_id, timestamp, and next nodes
        """
        config = {"configurable": {"thread_id": thread_id}}
        with self.get_connection() as conn:
            checkpointer = PostgresSaver(conn)
            graph = self._create_graph(checkpointer)
            history = []
            for state in graph.get_state_history(config):
                if len(history) >= limit:
                    break
                history.append({
                    "checkpoint_id": state.config["configurable"]["checkpoint_id"],
                    "thread_id": thread_id,
                    "timestamp": state.created_at,
                    "next_nodes": state.next,
                    "message_count": len(state.values.get("messages", [])),
                    # Include a summary of the last message for context
                    "last_message": self._get_last_message_summary(state.values.get("messages", []))
                })
            return history

    def _get_last_message_summary(self, messages: List[Any]) -> Optional[str]:
        """Get a snippet of the last message for checkpoint identification"""
        return getattr(messages[-1], "content", "")[:100] if messages else None

    def update_checkpoint_state(self, thread_id: str, checkpoint_id: str,
                                new_messages: Optional[List[Dict]] = None) -> Dict[str, Any]:
        """Update state at a specific checkpoint (used for modifying conversation history).

        Args:
            thread_id: The thread identifier
            checkpoint_id: The checkpoint to update
            new_messages: Optional new messages to set at this checkpoint

        Returns:
            New checkpoint configuration including the new checkpoint_id
        """
        config = {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_id": checkpoint_id
            }
        }
        with self.get_connection() as conn:
            checkpointer = PostgresSaver(conn)
            graph = self._create_graph(checkpointer)

            # Prepare the values to update
            values = {}
            if new_messages:
                cc_msgs = self.prep_msgs_for_cc_llm(new_messages)
                values["messages"] = cc_msgs

            # Update the state (creates a new checkpoint)
            new_config = graph.update_state(config, values=values)

            return {
                "thread_id": thread_id,
                "checkpoint_id": new_config["configurable"]["checkpoint_id"],
                "parent_checkpoint_id": checkpoint_id
            }
    ```
2. Update the `predict` and `predict_stream` functions to support passing checkpoints:

    ```python
    # Module-level imports used by the methods below
    import logging
    import uuid
    from typing import Generator

    logger = logging.getLogger(__name__)

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming prediction"""
        # The same thread_id is used by BOTH predict() and predict_stream()
        ci = dict(request.custom_inputs or {})
        if "thread_id" not in ci:
            ci["thread_id"] = str(uuid.uuid4())
        request.custom_inputs = ci

        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]

        # Include thread_id and checkpoint_id in custom outputs
        custom_outputs = {
            "thread_id": ci["thread_id"]
        }
        if "checkpoint_id" in ci:
            custom_outputs["parent_checkpoint_id"] = ci["checkpoint_id"]
        try:
            history = self.get_checkpoint_history(ci["thread_id"], limit=1)
            if history:
                custom_outputs["checkpoint_id"] = history[0]["checkpoint_id"]
        except Exception as e:
            logger.warning(f"Could not retrieve new checkpoint_id: {e}")

        return ResponsesAgentResponse(output=outputs, custom_outputs=custom_outputs)

    def predict_stream(
        self,
        request: ResponsesAgentRequest,
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming prediction with PostgreSQL checkpoint branching support.

        Accepts in custom_inputs:
        - thread_id: Conversation thread identifier for the session
        - checkpoint_id (optional): Checkpoint to resume from (for branching)
        """
        # Get thread ID and checkpoint ID from custom inputs
        custom_inputs = request.custom_inputs or {}
        thread_id = custom_inputs.get("thread_id", str(uuid.uuid4()))  # generate a new thread ID if one is not passed in
        checkpoint_id = custom_inputs.get("checkpoint_id")  # optional, for branching

        # Convert incoming Responses messages to LangChain format
        langchain_msgs = self.prep_msgs_for_cc_llm([i.model_dump() for i in request.input])

        # Build checkpoint configuration
        checkpoint_config = {"configurable": {"thread_id": thread_id}}

        # If checkpoint_id is provided, we're branching from that checkpoint
        if checkpoint_id:
            checkpoint_config["configurable"]["checkpoint_id"] = checkpoint_id
            logger.info(f"Branching from checkpoint: {checkpoint_id} in thread: {thread_id}")

        # Database connection pooling logic follows (elided here);
        # see the sketch after these steps for one way to implement it.
    ```
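The connection pooling logic is elided above. The following is a minimal sketch of one way `get_connection` might be implemented, assuming the `psycopg_pool` package and an environment variable (hypothetically named `DB_CONNECTION_STRING` here) that holds the Postgres connection string for your Lakebase instance; neither name is part of the agent API:

```python
# Illustrative sketch only: backing get_connection() with a psycopg pool.
# DB_CONNECTION_STRING is an assumed environment variable, not part of the
# agent API. PostgresSaver expects autocommit connections with dict_row rows.
import os

from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool

class LangGraphResponsesAgent:
    def __init__(self):
        self._pool = ConnectionPool(
            conninfo=os.environ["DB_CONNECTION_STRING"],
            min_size=1,
            max_size=5,
            kwargs={"autocommit": True, "row_factory": dict_row},
        )

    def get_connection(self):
        # Context manager that checks a connection out of the pool and
        # returns it to the pool when the block exits
        return self._pool.connection()
```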
Next, test branching from your checkpoint:
1. Start a conversation thread and add a few messages:

    ```python
    from agent import AGENT

    # Initial conversation - starts a new thread
    response1 = AGENT.predict({
        "input": [{"role": "user", "content": "I'm planning for an upcoming trip!"}],
    })
    print(response1.model_dump(exclude_none=True))
    thread_id = response1.custom_outputs["thread_id"]

    # Within the same thread, ask a follow-up question - thread-scoped memory
    # remembers previous messages in the same thread/conversation session
    response2 = AGENT.predict({
        "input": [{"role": "user", "content": "I'm headed to SF!"}],
        "custom_inputs": {"thread_id": thread_id}
    })
    print(response2.model_dump(exclude_none=True))

    # Ask another follow-up in the same thread
    response3 = AGENT.predict({
        "input": [{"role": "user", "content": "Where did I say I'm going?"}],
        "custom_inputs": {"thread_id": thread_id}
    })
    print(response3.model_dump(exclude_none=True))
    ```
2. Retrieve the checkpoint history and fork the conversation with a different message:

    ```python
    # Get checkpoint history to find a branching point
    history = AGENT.get_checkpoint_history(thread_id, 20)

    # Pick a checkpoint by index - entries are ordered from most recent to oldest
    index = max(1, len(history) - 4)
    branch_checkpoint = history[index]["checkpoint_id"]

    # Branch from a checkpoint with next_nodes == ('__start__',) to re-input a
    # message at that point in the conversation. Here we update which city we
    # are going to: within the same thread, branch from the checkpoint and
    # override it with different context to continue in a new fork.
    response4 = AGENT.predict({
        "input": [{"role": "user", "content": "I'm headed to New York!"}],
        "custom_inputs": {
            "thread_id": thread_id,
            "checkpoint_id": branch_checkpoint  # branch from this checkpoint
        }
    })
    print(response4.model_dump(exclude_none=True))

    # The thread ID stays the same even though the conversation branched:
    branched_thread_id = response4.custom_outputs["thread_id"]
    print(f"original thread id was {thread_id}")
    print(f"thread id after branching is the same as the original: {branched_thread_id}")

    # Continue the conversation in the same thread; it picks up from the
    # information you provided in the branch
    response5 = AGENT.predict({
        "input": [{"role": "user", "content": "Where am I going?"}],
        "custom_inputs": {
            "thread_id": thread_id,
        }
    })
    print(response5.model_dump(exclude_none=True))
    ```
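As a final check, you can list the thread's checkpoints again to confirm that branching preserved the original path, and you can also fork by rewriting a checkpoint directly with the `update_checkpoint_state` method defined earlier. The following sketch continues from the variables above; the edited message content is illustrative:

```python
# List checkpoints again: the thread now contains both the original path
# ("SF") and the new fork ("New York"), since branching never deletes history
for entry in AGENT.get_checkpoint_history(thread_id, 30):
    print(entry["checkpoint_id"], entry["next_nodes"], entry["last_message"])

# Alternatively, rewrite the state at a checkpoint instead of re-sending input.
# update_checkpoint_state returns the identifiers of a NEW checkpoint that
# forks from the one you edited; pass its checkpoint_id to continue from it.
forked = AGENT.update_checkpoint_state(
    thread_id=thread_id,
    checkpoint_id=branch_checkpoint,
    new_messages=[{"role": "user", "content": "Make that Tokyo!"}],  # illustrative
)
response6 = AGENT.predict({
    "input": [{"role": "user", "content": "Where am I going now?"}],
    "custom_inputs": {
        "thread_id": thread_id,
        "checkpoint_id": forked["checkpoint_id"],
    }
})
print(response6.model_dump(exclude_none=True))
```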