
Analyze traces

This page shows patterns for analyzing traces in real-world scenarios.

Error monitoring

Monitor and analyze errors in your production environment:

Python
import mlflow
import time
import pandas as pd

def monitor_errors(experiment_name: str, hours: int = 1):
    """Monitor errors in the last N hours."""

    # Calculate time window
    current_time_ms = int(time.time() * 1000)
    cutoff_time_ms = current_time_ms - (hours * 60 * 60 * 1000)

    # Scope the search to the target experiment
    experiment = mlflow.get_experiment_by_name(experiment_name)

    # Find all errors
    failed_traces = mlflow.search_traces(
        experiment_ids=[experiment.experiment_id],
        filter_string=f"attributes.status = 'ERROR' AND "
        f"attributes.timestamp_ms > {cutoff_time_ms}",
        order_by=["attributes.timestamp_ms DESC"]
    )

    if len(failed_traces) == 0:
        print(f"No errors found in the last {hours} hour(s)")
        return

    # Analyze error patterns
    print(f"Found {len(failed_traces)} errors in the last {hours} hour(s)\n")

    # Group by function name
    error_by_function = failed_traces.groupby('tags.mlflow.traceName').size()
    print("Errors by function:")
    print(error_by_function.to_string())

    # Show recent error samples
    print("\nRecent error samples:")
    for _, trace in failed_traces.head(5).iterrows():
        print(f"- {trace['request_preview'][:60]}...")
        print(f"  Function: {trace.get('tags.mlflow.traceName', 'unknown')}")
        print(f"  Time: {pd.to_datetime(trace['timestamp_ms'], unit='ms')}")
        print()

    return failed_traces
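
For example, this helper can run from a scheduled job. The snippet below is a minimal usage sketch; the experiment name `production-genai-app` is a placeholder for your own experiment:

Python
# Hypothetical usage: check the last 24 hours of errors in a production experiment.
failed = monitor_errors("production-genai-app", hours=24)

if failed is not None:
    # The returned DataFrame can be sliced further, e.g. by traced function name.
    rag_failures = failed[failed["tags.mlflow.traceName"] == "rag_pipeline"]
    print(f"rag_pipeline failures: {len(rag_failures)}")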

Performance

Analyze performance characteristics and identify bottlenecks:

Python
def profile_performance(function_name: str = None, percentiles: list = [50, 95, 99]):
    """Profile performance metrics for traces."""

    # Build filter
    filter_parts = []
    if function_name:
        filter_parts.append(f"tags.`mlflow.traceName` = '{function_name}'")

    filter_string = " AND ".join(filter_parts) if filter_parts else None

    # Get traces
    traces = mlflow.search_traces(filter_string=filter_string)

    if len(traces) == 0:
        print("No traces found")
        return

    # Calculate percentiles
    perf_stats = traces['execution_time_ms'].describe(percentiles=[p / 100 for p in percentiles])

    print(f"Performance Analysis ({len(traces)} traces)")
    print("=" * 40)
    for p in percentiles:
        print(f"P{p}: {perf_stats[f'{p}%']:.1f}ms")
    print(f"Mean: {perf_stats['mean']:.1f}ms")
    print(f"Max: {perf_stats['max']:.1f}ms")

    # Find outliers (>P99)
    if 99 in percentiles:
        p99_threshold = perf_stats['99%']
        outliers = traces[traces['execution_time_ms'] > p99_threshold]

        if len(outliers) > 0:
            print(f"\nOutliers (>{p99_threshold:.0f}ms): {len(outliers)} traces")
            for _, trace in outliers.head(3).iterrows():
                print(f"- {trace['execution_time_ms']:.0f}ms: {trace['request_preview'][:50]}...")

    return traces
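
For instance, a single entry point can be compared against the full workload. The function name `rag_pipeline` below is a placeholder for one of your traced functions:

Python
# Hypothetical usage: profile one traced function, then the whole workload.
rag_traces = profile_performance(function_name="rag_pipeline", percentiles=[50, 90, 99])
all_traces = profile_performance()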

User activity analysis

Track and analyze user behavior patterns:

Python
def analyze_user_activity(user_id: str, days: int = 7):
    """Analyze activity patterns for a specific user."""

    cutoff_ms = int((time.time() - days * 86400) * 1000)

    traces = mlflow.search_traces(
        filter_string=f"metadata.`mlflow.user` = '{user_id}' AND "
        f"attributes.timestamp_ms > {cutoff_ms}",
        order_by=["attributes.timestamp_ms DESC"]
    )

    if len(traces) == 0:
        print(f"No activity found for user {user_id}")
        return

    print(f"User {user_id} Activity Report ({days} days)")
    print("=" * 50)
    print(f"Total requests: {len(traces)}")

    # Daily activity
    traces['date'] = pd.to_datetime(traces['timestamp_ms'], unit='ms').dt.date
    daily_activity = traces.groupby('date').size()
    print(f"\nDaily activity:")
    print(daily_activity.to_string())

    # Query categories
    if 'tags.query_category' in traces.columns:
        categories = traces['tags.query_category'].value_counts()
        print(f"\nQuery categories:")
        print(categories.to_string())

    # Performance stats
    print(f"\nPerformance:")
    print(f"Average response time: {traces['execution_time_ms'].mean():.1f}ms")
    print(f"Error rate: {(traces['status'] == 'ERROR').mean() * 100:.1f}%")

    return traces
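
A minimal usage sketch; the user ID `U12345` is a placeholder, and the function filters on the `mlflow.user` metadata field recorded on your traces:

Python
# Hypothetical usage: report on one user's activity over the last 30 days.
user_traces = analyze_user_activity("U12345", days=30)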

Complex RAG pipeline trace

Create a comprehensive trace that demonstrates all the features:

Python
import mlflow
import time
from mlflow.entities import SpanType

# Create a complex RAG application trace
@mlflow.trace(span_type=SpanType.CHAIN)
def rag_pipeline(question: str):
    """Main RAG pipeline that orchestrates retrieval and generation."""
    # Add custom tags and metadata
    mlflow.update_current_trace(
        tags={
            "environment": "production",
            "version": "2.1.0",
            "user_id": "U12345",
            "session_id": "S98765",
            "mlflow.traceName": "rag_pipeline"
        }
    )

    # Retrieve relevant documents
    documents = retrieve_documents(question)

    # Generate response with context
    response = generate_answer(question, documents)

    # Simulate tool usage
    fact_check_result = fact_check_tool(response)

    return {
        "answer": response,
        "fact_check": fact_check_result,
        "sources": [doc["metadata"]["doc_uri"] for doc in documents]
    }

@mlflow.trace(span_type=SpanType.RETRIEVER)
def retrieve_documents(query: str):
    """Retrieve relevant documents from vector store."""
    time.sleep(0.1)  # Simulate retrieval time

    # Get current span to set outputs properly
    span = mlflow.get_current_active_span()

    # Create document objects following MLflow schema
    from mlflow.entities import Document
    documents = [
        Document(
            page_content="MLflow Tracing provides observability for GenAI apps...",
            metadata={
                "doc_uri": "docs/mlflow/tracing_guide.md",
                "chunk_id": "chunk_001",
                "relevance_score": 0.95
            }
        ),
        Document(
            page_content="Traces consist of spans that capture execution steps...",
            metadata={
                "doc_uri": "docs/mlflow/trace_concepts.md",
                "chunk_id": "chunk_042",
                "relevance_score": 0.87
            }
        )
    ]

    # Set span outputs properly for RETRIEVER type
    span.set_outputs(documents)

    return [doc.to_dict() for doc in documents]

@mlflow.trace(span_type=SpanType.CHAT_MODEL)
def generate_answer(question: str, documents: list):
    """Generate answer using LLM with retrieved context."""
    time.sleep(0.2)  # Simulate LLM processing

    # Set chat-specific attributes
    from mlflow.tracing import set_span_chat_messages, set_span_chat_tools

    messages = [
        {"role": "system", "content": "You are a helpful assistant. Use the provided context to answer questions."},
        {"role": "user", "content": f"Context: {documents}\n\nQuestion: {question}"}
    ]

    # Define available tools
    tools = [
        {
            "type": "function",
            "function": {
                "name": "fact_check",
                "description": "Verify facts in the response",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "statement": {"type": "string"}
                    },
                    "required": ["statement"]
                }
            }
        }
    ]

    span = mlflow.get_current_active_span()
    set_span_chat_messages(span, messages)
    set_span_chat_tools(span, tools)

    # Simulate token usage
    span.set_attribute("llm.token_usage.input_tokens", 150)
    span.set_attribute("llm.token_usage.output_tokens", 75)
    span.set_attribute("llm.token_usage.total_tokens", 225)

    return "MLflow Tracing provides comprehensive observability for GenAI applications by capturing detailed execution information through spans."

@mlflow.trace(span_type=SpanType.TOOL)
def fact_check_tool(statement: str):
    """Tool to verify facts in the generated response."""
    time.sleep(0.05)

    # Simulate an error for demonstration
    if "comprehensive" in statement:
        raise ValueError("Fact verification service unavailable")

    return {"verified": True, "confidence": 0.92}

# Execute the pipeline
try:
    result = rag_pipeline("What is MLflow Tracing?")
except Exception as e:
    print(f"Pipeline error: {e}")

# Get the trace
trace_id = mlflow.get_last_active_trace_id()
trace = mlflow.get_trace(trace_id)

# Log assessments to the trace
from mlflow.entities import AssessmentSource, AssessmentSourceType

# Add human feedback
mlflow.log_feedback(
    trace_id=trace_id,
    name="helpfulness",
    value=4,
    source=AssessmentSource(
        source_type=AssessmentSourceType.HUMAN,
        source_id="reviewer_alice@company.com"
    ),
    rationale="Clear and accurate response with good context usage"
)

# Add LLM judge assessment
mlflow.log_feedback(
    trace_id=trace_id,
    name="relevance_score",
    value=0.92,
    source=AssessmentSource(
        source_type=AssessmentSourceType.LLM_JUDGE,
        source_id="gpt-4-evaluator"
    ),
    metadata={"evaluation_prompt_version": "v2.1"}
)

# Add ground truth expectation
mlflow.log_expectation(
    trace_id=trace_id,
    name="expected_facts",
    value=["observability", "spans", "GenAI applications"],
    source=AssessmentSource(
        source_type=AssessmentSourceType.HUMAN,
        source_id="subject_matter_expert"
    )
)

# Add span-specific feedback
retriever_span = trace.search_spans(name="retrieve_documents")[0]
mlflow.log_feedback(
    trace_id=trace_id,
    span_id=retriever_span.span_id,
    name="retrieval_quality",
    value="excellent",
    source=AssessmentSource(
        source_type=AssessmentSourceType.CODE,
        source_id="retrieval_evaluator.py"
    )
)

# Refresh trace to get assessments
trace = mlflow.get_trace(trace_id)
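
As a quick sanity check, the assessments that were just logged can be read back from the refreshed trace with `search_assessments`, the same API used in the analysis section below:

Python
# List the assessments attached to the trace.
for assessment in trace.search_assessments():
    print(f"{assessment.name} ({assessment.source.source_type}): {assessment.value}")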

Comprehensive trace analysis

Build a complete trace analysis that extracts all the meaningful information:

Python
import datetime

def analyze_trace(trace_id: str):
    """Comprehensive analysis of a trace."""

    # Get the trace
    trace = mlflow.get_trace(trace_id)

    print(f"=== TRACE ANALYSIS: {trace_id} ===\n")

    # 1. Basic Information
    print("1. BASIC INFORMATION")
    print(f"   State: {trace.info.state}")
    print(f"   Duration: {trace.info.execution_duration}ms")
    print(f"   Start time: {datetime.datetime.fromtimestamp(trace.info.request_time / 1000)}")

    if trace.info.experiment_id:
        print(f"   Experiment: {trace.info.experiment_id}")

    # Show request/response previews for quick context
    if trace.info.request_preview:
        print(f"   Request preview: {trace.info.request_preview}")

    if trace.info.response_preview:
        print(f"   Response preview: {trace.info.response_preview}")

    # 2. Tags Analysis
    print("\n2. TAGS")
    for key, value in sorted(trace.info.tags.items()):
        print(f"   {key}: {value}")

    # 3. Token Usage
    print("\n3. TOKEN USAGE")
    if tokens := trace.info.token_usage:
        print(f"   Input: {tokens.get('input_tokens', 0)}")
        print(f"   Output: {tokens.get('output_tokens', 0)}")
        print(f"   Total: {tokens.get('total_tokens', 0)}")

    # Calculate from spans if not in metadata
    total_input = 0
    total_output = 0
    for span in trace.data.spans:
        if span.span_type == SpanType.CHAT_MODEL:
            total_input += span.get_attribute("llm.token_usage.input_tokens") or 0
            total_output += span.get_attribute("llm.token_usage.output_tokens") or 0

    if total_input or total_output:
        print(f"   (From spans - Input: {total_input}, Output: {total_output})")

    # 4. Span Analysis
    print("\n4. SPAN ANALYSIS")
    span_types = {}
    error_spans = []

    for span in trace.data.spans:
        # Count by type
        span_types[span.span_type] = span_types.get(span.span_type, 0) + 1

        # Collect errors
        if span.status.status_code.name == "ERROR":
            error_spans.append(span)

    print("   Span counts by type:")
    for span_type, count in sorted(span_types.items()):
        print(f"   {span_type}: {count}")

    if error_spans:
        print(f"\n   Error spans ({len(error_spans)}):")
        for span in error_spans:
            print(f"   - {span.name}: {span.status.description}")

    # 5. Retrieval Analysis
    print("\n5. RETRIEVAL ANALYSIS")
    retriever_spans = trace.search_spans(span_type=SpanType.RETRIEVER)
    if retriever_spans:
        for r_span in retriever_spans:
            if r_span.outputs:
                docs = r_span.outputs
                print(f"   Retrieved {len(docs)} documents:")
                for doc in docs[:3]:  # Show first 3
                    if isinstance(doc, dict):
                        uri = doc.get('metadata', {}).get('doc_uri', 'Unknown')
                        score = doc.get('metadata', {}).get('relevance_score', 'N/A')
                        print(f"   - {uri} (score: {score})")

    # 6. Assessment Summary
    print("\n6. ASSESSMENTS")
    assessments = trace.search_assessments()

    # Group by source type
    by_source = {}
    for assessment in assessments:
        source_type = assessment.source.source_type
        if source_type not in by_source:
            by_source[source_type] = []
        by_source[source_type].append(assessment)

    for source_type, items in by_source.items():
        print(f"\n   {source_type} ({len(items)}):")
        for assessment in items:
            value_str = f"{assessment.value}"
            if assessment.rationale:
                value_str += f" - {assessment.rationale[:50]}..."
            print(f"   {assessment.name}: {value_str}")

    # 7. Performance Breakdown
    print("\n7. PERFORMANCE BREAKDOWN")
    root_span = next((s for s in trace.data.spans if s.parent_id is None), None)
    if root_span:
        total_duration_ns = root_span.end_time_ns - root_span.start_time_ns

        # Calculate time spent in each span type
        time_by_type = {}
        for span in trace.data.spans:
            duration_ms = (span.end_time_ns - span.start_time_ns) / 1_000_000
            if span.span_type not in time_by_type:
                time_by_type[span.span_type] = 0
            time_by_type[span.span_type] += duration_ms

        print("   Time by span type:")
        for span_type, duration_ms in sorted(time_by_type.items(),
                                             key=lambda x: x[1], reverse=True):
            percentage = (duration_ms / (total_duration_ns / 1_000_000)) * 100
            print(f"   {span_type}: {duration_ms:.1f}ms ({percentage:.1f}%)")

    # 8. Data Flow
    print("\n8. DATA FLOW")
    if intermediate := trace.data.intermediate_outputs:
        print("   Intermediate outputs:")
        for name, output in intermediate.items():
            output_str = str(output)[:100] + "..." if len(str(output)) > 100 else str(output)
            print(f"   {name}: {output_str}")

    return trace

# Run the analysis
analysis_result = analyze_trace(trace_id)
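
The same analysis can be pointed at traces found by the earlier helpers. A sketch, assuming the search results expose a `trace_id` column and that `production-genai-app` is your experiment name:

Python
# Hypothetical: drill into the most recent failure found by monitor_errors.
recent_failures = monitor_errors("production-genai-app", hours=24)
if recent_failures is not None and len(recent_failures) > 0:
    analyze_trace(recent_failures.iloc[0]["trace_id"])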

Build reusable analysis utilities

Python
import json

class TraceAnalyzer:
    """Utility class for advanced trace analysis."""

    def __init__(self, trace: mlflow.entities.Trace):
        self.trace = trace

    def get_error_summary(self):
        """Get summary of all errors in the trace."""
        errors = []

        # Check trace status
        if self.trace.info.state == "ERROR":
            errors.append({
                "level": "trace",
                "message": "Trace failed",
                "details": self.trace.info.response_preview
            })

        # Check span errors
        for span in self.trace.data.spans:
            if span.status.status_code.name == "ERROR":
                errors.append({
                    "level": "span",
                    "span_name": span.name,
                    "span_type": span.span_type,
                    "message": span.status.description,
                    "span_id": span.span_id
                })

        # Check assessment errors
        for assessment in self.trace.info.assessments:
            if assessment.error:
                errors.append({
                    "level": "assessment",
                    "assessment_name": assessment.name,
                    "error": str(assessment.error)
                })

        return errors

    def get_llm_usage_summary(self):
        """Aggregate LLM usage across all spans."""
        usage = {
            "total_llm_calls": 0,
            "total_input_tokens": 0,
            "total_output_tokens": 0,
            "spans": []
        }

        for span in self.trace.data.spans:
            if span.span_type in [SpanType.CHAT_MODEL, "LLM"]:
                usage["total_llm_calls"] += 1

                input_tokens = span.get_attribute("llm.token_usage.input_tokens") or 0
                output_tokens = span.get_attribute("llm.token_usage.output_tokens") or 0

                usage["total_input_tokens"] += input_tokens
                usage["total_output_tokens"] += output_tokens
                usage["spans"].append({
                    "name": span.name,
                    "input_tokens": input_tokens,
                    "output_tokens": output_tokens
                })

        usage["total_tokens"] = usage["total_input_tokens"] + usage["total_output_tokens"]
        return usage

    def get_retrieval_metrics(self):
        """Extract retrieval quality metrics."""
        metrics = []

        for span in self.trace.search_spans(span_type=SpanType.RETRIEVER):
            if span.outputs:
                docs = span.outputs
                relevance_scores = []

                for doc in docs:
                    if isinstance(doc, dict) and 'metadata' in doc:
                        if score := doc['metadata'].get('relevance_score'):
                            relevance_scores.append(score)

                metrics.append({
                    "span_name": span.name,
                    "num_documents": len(docs),
                    "avg_relevance": sum(relevance_scores) / len(relevance_scores) if relevance_scores else None,
                    "max_relevance": max(relevance_scores) if relevance_scores else None,
                    "min_relevance": min(relevance_scores) if relevance_scores else None
                })

        return metrics

    def get_span_hierarchy(self):
        """Build a hierarchical view of spans."""
        # Create span lookup
        span_dict = {span.span_id: span for span in self.trace.data.spans}

        # Find root spans
        roots = [span for span in self.trace.data.spans if span.parent_id is None]

        def build_tree(span, indent=0):
            result = []
            duration_ms = (span.end_time_ns - span.start_time_ns) / 1_000_000
            result.append({
                "indent": indent,
                "name": span.name,
                "type": span.span_type,
                "duration_ms": duration_ms,
                "status": span.status.status_code.name
            })

            # Find children
            children = [s for s in self.trace.data.spans if s.parent_id == span.span_id]
            for child in sorted(children, key=lambda s: s.start_time_ns):
                result.extend(build_tree(child, indent + 1))

            return result

        hierarchy = []
        for root in roots:
            hierarchy.extend(build_tree(root))

        return hierarchy

    def export_for_evaluation(self):
        """Export trace data in a format suitable for evaluation."""
        # Get root span data
        request = response = None
        if self.trace.data.request:
            request = json.loads(self.trace.data.request)
        if self.trace.data.response:
            response = json.loads(self.trace.data.response)

        # Get expected values from assessments
        expectations = self.trace.search_assessments(type="expectation")
        expected_values = {exp.name: exp.value for exp in expectations}

        # Get retrieval context
        retrieved_context = []
        for span in self.trace.search_spans(span_type=SpanType.RETRIEVER):
            if span.outputs:
                for doc in span.outputs:
                    if isinstance(doc, dict) and 'page_content' in doc:
                        retrieved_context.append(doc['page_content'])

        return {
            "trace_id": self.trace.info.trace_id,
            "request": request,
            "response": response,
            "retrieved_context": retrieved_context,
            "expected_facts": expected_values.get("expected_facts", []),
            "metadata": {
                "user_id": self.trace.info.tags.get("user_id"),
                "session_id": self.trace.info.tags.get("session_id"),
                "duration_ms": self.trace.info.execution_duration,
                "timestamp": self.trace.info.request_time
            }
        }

# Use the analyzer
analyzer = TraceAnalyzer(trace)

# Get various analyses
errors = analyzer.get_error_summary()
print(f"\nErrors found: {len(errors)}")
for error in errors:
    print(f"  - {error['level']}: {error.get('message', error.get('error'))}")

llm_usage = analyzer.get_llm_usage_summary()
print(f"\nLLM Usage: {llm_usage['total_tokens']} total tokens across {llm_usage['total_llm_calls']} calls")

retrieval_metrics = analyzer.get_retrieval_metrics()
print(f"\nRetrieval Metrics:")
for metric in retrieval_metrics:
    print(f"  - {metric['span_name']}: {metric['num_documents']} docs, avg relevance: {metric['avg_relevance']}")

# Export for evaluation
eval_data = analyzer.export_for_evaluation()
print(f"\nExported evaluation data with {len(eval_data['retrieved_context'])} context chunks")

Next steps