Trace entity SDK reference
This guide covers the MLflow Trace entity SDK, showing how to access and analyze every part of a trace: metadata, spans, assessments, and request/response data.
Overview
The MLflow Trace object consists of two main components:
- TraceInfo: Metadata about the trace (ID, timing, status, tags, assessments)
- TraceData: The actual execution data (spans, request/response)
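Both components are reachable from a single Trace object. As a minimal sketch (the trace ID below is a hypothetical placeholder; any real trace ID works), fetch a trace and touch each component:
import mlflow

trace = mlflow.get_trace("tr-1234567890")  # hypothetical trace ID

# TraceInfo: metadata about the trace
print(trace.info.trace_id, trace.info.state, trace.info.execution_duration)

# TraceData: the recorded spans and request/response payloads
print(f"{len(trace.data.spans)} spans recorded")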
Creating a complex example trace
Let's create a trace that exercises the main features: custom tags, retriever and chat-model spans, tool errors, token usage, and assessments:
import mlflow
import time
from mlflow.entities import SpanType
# Create a complex RAG application trace
@mlflow.trace(span_type=SpanType.CHAIN)
def rag_pipeline(question: str):
"""Main RAG pipeline that orchestrates retrieval and generation."""
# Add custom tags and metadata
mlflow.update_current_trace(
tags={
"environment": "production",
"version": "2.1.0",
"user_id": "U12345",
"session_id": "S98765",
"mlflow.traceName": "rag_pipeline"
}
)
# Retrieve relevant documents
documents = retrieve_documents(question)
# Generate response with context
response = generate_answer(question, documents)
# Simulate tool usage
fact_check_result = fact_check_tool(response)
return {
"answer": response,
"fact_check": fact_check_result,
"sources": [doc["metadata"]["doc_uri"] for doc in documents]
}
@mlflow.trace(span_type=SpanType.RETRIEVER)
def retrieve_documents(query: str):
"""Retrieve relevant documents from vector store."""
time.sleep(0.1) # Simulate retrieval time
# Get current span to set outputs properly
span = mlflow.get_current_active_span()
# Create document objects following MLflow schema
from mlflow.entities import Document
documents = [
Document(
page_content="MLflow Tracing provides observability for GenAI apps...",
metadata={
"doc_uri": "docs/mlflow/tracing_guide.md",
"chunk_id": "chunk_001",
"relevance_score": 0.95
}
),
Document(
page_content="Traces consist of spans that capture execution steps...",
metadata={
"doc_uri": "docs/mlflow/trace_concepts.md",
"chunk_id": "chunk_042",
"relevance_score": 0.87
}
)
]
# Set span outputs properly for RETRIEVER type
span.set_outputs(documents)
return [doc.to_dict() for doc in documents]
@mlflow.trace(span_type=SpanType.CHAT_MODEL)
def generate_answer(question: str, documents: list):
"""Generate answer using LLM with retrieved context."""
time.sleep(0.2) # Simulate LLM processing
# Set chat-specific attributes
from mlflow.tracing import set_span_chat_messages, set_span_chat_tools
messages = [
{"role": "system", "content": "You are a helpful assistant. Use the provided context to answer questions."},
{"role": "user", "content": f"Context: {documents}\n\nQuestion: {question}"}
]
# Define available tools
tools = [
{
"type": "function",
"function": {
"name": "fact_check",
"description": "Verify facts in the response",
"parameters": {
"type": "object",
"properties": {
"statement": {"type": "string"}
},
"required": ["statement"]
}
}
}
]
span = mlflow.get_current_active_span()
set_span_chat_messages(span, messages)
set_span_chat_tools(span, tools)
# Simulate token usage
span.set_attribute("llm.token_usage.input_tokens", 150)
span.set_attribute("llm.token_usage.output_tokens", 75)
span.set_attribute("llm.token_usage.total_tokens", 225)
return "MLflow Tracing provides comprehensive observability for GenAI applications by capturing detailed execution information through spans."
@mlflow.trace(span_type=SpanType.TOOL)
def fact_check_tool(statement: str):
"""Tool to verify facts in the generated response."""
time.sleep(0.05)
    # Simulate an error for demonstration. The generated answer contains
    # "comprehensive", so this check fails and the trace records an ERROR state.
    if "comprehensive" in statement:
        raise ValueError("Fact verification service unavailable")
return {"verified": True, "confidence": 0.92}
# Execute the pipeline
try:
result = rag_pipeline("What is MLflow Tracing?")
except Exception as e:
print(f"Pipeline error: {e}")
# Get the trace
trace_id = mlflow.get_last_active_trace_id()
trace = mlflow.get_trace(trace_id)
# Log assessments to the trace
from mlflow.entities import AssessmentSource, AssessmentSourceType
# Add human feedback
mlflow.log_feedback(
trace_id=trace_id,
name="helpfulness",
value=4,
source=AssessmentSource(
source_type=AssessmentSourceType.HUMAN,
source_id="reviewer_alice@company.com"
),
rationale="Clear and accurate response with good context usage"
)
# Add LLM judge assessment
mlflow.log_feedback(
trace_id=trace_id,
name="relevance_score",
value=0.92,
source=AssessmentSource(
source_type=AssessmentSourceType.LLM_JUDGE,
source_id="gpt-4-evaluator"
),
metadata={"evaluation_prompt_version": "v2.1"}
)
# Add ground truth expectation
mlflow.log_expectation(
trace_id=trace_id,
name="expected_facts",
value=["observability", "spans", "GenAI applications"],
source=AssessmentSource(
source_type=AssessmentSourceType.HUMAN,
source_id="subject_matter_expert"
)
)
# Add span-specific feedback
retriever_span = trace.search_spans(name="retrieve_documents")[0]
mlflow.log_feedback(
trace_id=trace_id,
span_id=retriever_span.span_id,
name="retrieval_quality",
value="excellent",
source=AssessmentSource(
source_type=AssessmentSourceType.CODE,
source_id="retrieval_evaluator.py"
)
)
# Refresh trace to get assessments
trace = mlflow.get_trace(trace_id)
Accessing trace metadata (TraceInfo)
Basic metadata properties
# Primary identifiers
print(f"Trace ID: {trace.info.trace_id}")
print(f"Client Request ID: {trace.info.client_request_id}")
# Status information
print(f"State: {trace.info.state}") # OK, ERROR, IN_PROGRESS
print(f"Status (deprecated): {trace.info.status}") # Use state instead
# Request/response previews (truncated)
print(f"Request preview: {trace.info.request_preview}")
print(f"Response preview: {trace.info.response_preview}")
Time-related properties
# Timestamps (milliseconds since epoch)
print(f"Start time (ms): {trace.info.request_time}")
print(f"Timestamp (ms): {trace.info.timestamp_ms}") # Alias for request_time
# Duration
print(f"Execution duration (ms): {trace.info.execution_duration}")
print(f"Execution time (ms): {trace.info.execution_time_ms}") # Alias
# Convert to human-readable format
import datetime
start_time = datetime.datetime.fromtimestamp(trace.info.request_time / 1000)
print(f"Started at: {start_time}")
Location and experiment information
# Trace storage location
location = trace.info.trace_location
print(f"Location type: {location.type}")
# If stored in MLflow experiment
if location.mlflow_experiment:
print(f"Experiment ID: {location.mlflow_experiment.experiment_id}")
# Shortcut property
print(f"Experiment ID: {trace.info.experiment_id}")
# If stored in Databricks inference table
if location.inference_table:
print(f"Table: {location.inference_table.full_table_name}")
Tags and metadata
# Tags (mutable, can be updated after creation)
print("Tags:")
for key, value in trace.info.tags.items():
print(f" {key}: {value}")
# Access specific tags
print(f"Environment: {trace.info.tags.get('environment')}")
print(f"User ID: {trace.info.tags.get('user_id')}")
# Trace metadata (immutable, set at creation)
print("\nTrace metadata:")
for key, value in trace.info.trace_metadata.items():
print(f" {key}: {value}")
# Deprecated alias
print(f"Request metadata: {trace.info.request_metadata}") # Same as trace_metadata
Token usage information
# Get aggregated token usage (if available)
token_usage = trace.info.token_usage
if token_usage:
print(f"Input tokens: {token_usage.get('input_tokens')}")
print(f"Output tokens: {token_usage.get('output_tokens')}")
print(f"Total tokens: {token_usage.get('total_tokens')}")
Accessing trace data (TraceData)
Working with spans
# Access all spans
spans = trace.data.spans
print(f"Total spans: {len(spans)}")
# Iterate through spans
for span in spans:
print(f"\nSpan: {span.name}")
print(f" ID: {span.span_id}")
print(f" Type: {span.span_type}")
print(f" Status: {span.status}")
print(f" Start time: {span.start_time_ns}")
print(f" End time: {span.end_time_ns}")
print(f" Duration (ns): {span.end_time_ns - span.start_time_ns}")
# Parent-child relationships
if span.parent_id:
print(f" Parent ID: {span.parent_id}")
# Inputs and outputs
if span.inputs:
print(f" Inputs: {span.inputs}")
if span.outputs:
print(f" Outputs: {span.outputs}")
Request and response data
# Get root span request/response (backward compatibility)
request_json = trace.data.request
response_json = trace.data.response
# Parse JSON strings
import json
if request_json:
request_data = json.loads(request_json)
print(f"Request: {request_data}")
if response_json:
response_data = json.loads(response_json)
print(f"Response: {response_data}")
Intermediate outputs
# Get intermediate outputs from non-root spans
intermediate = trace.data.intermediate_outputs
if intermediate:
print("\nIntermediate outputs:")
for span_name, output in intermediate.items():
print(f" {span_name}: {output}")
Searching within traces
Finding spans with search_spans()
import re
from mlflow.entities import SpanType
# 1. Search by exact name
retriever_spans = trace.search_spans(name="retrieve_documents")
print(f"Found {len(retriever_spans)} retriever spans")
# 2. Search by regex pattern
pattern = re.compile(r".*_tool$")
tool_spans = trace.search_spans(name=pattern)
print(f"Found {len(tool_spans)} tool spans")
# 3. Search by span type
chat_spans = trace.search_spans(span_type=SpanType.CHAT_MODEL)
llm_spans = trace.search_spans(span_type="CHAT_MODEL") # String also works
print(f"Found {len(chat_spans)} chat model spans")
# 4. Search by span ID
specific_span = trace.search_spans(span_id=retriever_spans[0].span_id)
print(f"Found span: {specific_span[0].name if specific_span else 'Not found'}")
# 5. Combine criteria
tool_fact_check = trace.search_spans(
name="fact_check_tool",
span_type=SpanType.TOOL
)
print(f"Found {len(tool_fact_check)} fact check tool spans")
# 6. Get all spans of a type
all_tools = trace.search_spans(span_type=SpanType.TOOL)
for tool in all_tools:
print(f"Tool: {tool.name}")
Accessing span attributes
from mlflow.tracing.constant import SpanAttributeKey
# Get a chat model span
chat_span = trace.search_spans(span_type=SpanType.CHAT_MODEL)[0]
# Access chat-specific attributes
messages = chat_span.get_attribute(SpanAttributeKey.CHAT_MESSAGES)
tools = chat_span.get_attribute(SpanAttributeKey.CHAT_TOOLS)
print(f"Chat messages: {messages}")
print(f"Available tools: {tools}")
# Access token usage from span
input_tokens = chat_span.get_attribute("llm.token_usage.input_tokens")
output_tokens = chat_span.get_attribute("llm.token_usage.output_tokens")
print(f"Span token usage - Input: {input_tokens}, Output: {output_tokens}")
# Access all attributes
print("\nAll span attributes:")
for key, value in chat_span.attributes.items():
print(f" {key}: {value}")
Working with assessments
Finding assessments with search_assessments()
# 1. Get all assessments
all_assessments = trace.search_assessments()
print(f"Total assessments: {len(all_assessments)}")
# 2. Search by name
helpfulness = trace.search_assessments(name="helpfulness")
if helpfulness:
assessment = helpfulness[0]
print(f"Helpfulness: {assessment.value}")
print(f"Source: {assessment.source.source_type} - {assessment.source.source_id}")
print(f"Rationale: {assessment.rationale}")
# 3. Search by type
feedback_only = trace.search_assessments(type="feedback")
expectations_only = trace.search_assessments(type="expectation")
print(f"Feedback assessments: {len(feedback_only)}")
print(f"Expectation assessments: {len(expectations_only)}")
# 4. Search by span ID
span_assessments = trace.search_assessments(span_id=retriever_span.span_id)
print(f"Assessments for retriever span: {len(span_assessments)}")
# 5. Get all assessments including overridden ones
all_including_invalid = trace.search_assessments(all=True)
print(f"All assessments (including overridden): {len(all_including_invalid)}")
# 6. Combine criteria
human_feedback = trace.search_assessments(
type="feedback",
name="helpfulness"
)
for fb in human_feedback:
print(f"Human feedback: {fb.name} = {fb.value}")
Accessing assessment details
# Get detailed assessment information
for assessment in trace.info.assessments:
print(f"\nAssessment: {assessment.name}")
print(f" Type: {type(assessment).__name__}")
print(f" Value: {assessment.value}")
print(f" Source: {assessment.source.source_type.value}")
print(f" Source ID: {assessment.source.source_id}")
# Optional fields
if assessment.rationale:
print(f" Rationale: {assessment.rationale}")
if assessment.metadata:
print(f" Metadata: {assessment.metadata}")
if assessment.error:
print(f" Error: {assessment.error}")
if hasattr(assessment, 'span_id') and assessment.span_id:
print(f" Span ID: {assessment.span_id}")
Data export and conversion
Converting to dictionary
# Convert entire trace to dictionary
trace_dict = trace.to_dict()
print(f"Trace dict keys: {trace_dict.keys()}")
print(f"Info keys: {trace_dict['info'].keys()}")
print(f"Data keys: {trace_dict['data'].keys()}")
# Convert individual components
info_dict = trace.info.to_dict()
data_dict = trace.data.to_dict()
# Reconstruct trace from dictionary
from mlflow.entities import Trace
reconstructed_trace = Trace.from_dict(trace_dict)
print(f"Reconstructed trace ID: {reconstructed_trace.info.trace_id}")
JSON serialization
# Convert to JSON string
trace_json = trace.to_json()
print(f"JSON length: {len(trace_json)} characters")
# Pretty print JSON
trace_json_pretty = trace.to_json(pretty=True)
print("Pretty JSON (first 500 chars):")
print(trace_json_pretty[:500])
# Load trace from JSON
from mlflow.entities import Trace
loaded_trace = Trace.from_json(trace_json)
print(f"Loaded trace ID: {loaded_trace.info.trace_id}")
Pandas DataFrame conversion
# Convert trace to DataFrame row
row_data = trace.to_pandas_dataframe_row()
print(f"DataFrame row keys: {list(row_data.keys())}")
# Create DataFrame from multiple traces
import pandas as pd
# mlflow.search_traces already returns a pandas DataFrame
traces_df = mlflow.search_traces(max_results=5)
# If you have individual Trace objects, build the rows yourself
trace_rows = [t.to_pandas_dataframe_row() for t in [trace]]
df = pd.DataFrame(trace_rows)
print(f"DataFrame shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")
# Access specific data from DataFrame
print(f"Trace IDs: {df['trace_id'].tolist()}")
print(f"States: {df['state'].tolist()}")
print(f"Durations: {df['execution_duration'].tolist()}")
Span entity SDK reference
Spans are the building blocks of traces, representing individual operations or units of work. MLflow provides several span-related classes and utilities.
Span types
MLflow defines standard span types to categorize operations:
from mlflow.entities import SpanType
# Predefined span types
print("Available span types:")
print(f" CHAIN: {SpanType.CHAIN}") # Orchestration/workflow spans
print(f" LLM: {SpanType.LLM}") # LLM inference spans
print(f" CHAT_MODEL: {SpanType.CHAT_MODEL}") # Chat completion spans
print(f" RETRIEVER: {SpanType.RETRIEVER}") # Document retrieval spans
print(f" TOOL: {SpanType.TOOL}") # Tool/function execution spans
print(f" EMBEDDING: {SpanType.EMBEDDING}") # Embedding generation spans
print(f" PARSER: {SpanType.PARSER}") # Output parsing spans
print(f" RERANKER: {SpanType.RERANKER}") # Document reranking spans
print(f" AGENT: {SpanType.AGENT}") # Agent execution spans
print(f" UNKNOWN: {SpanType.UNKNOWN}") # Default/unspecified type
# You can also use custom string values
custom_type = "CUSTOM_PROCESSOR"
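Custom strings work anywhere a span type is accepted: the decorator records them verbatim, and search_spans matches them as plain strings. A minimal sketch using the hypothetical CUSTOM_PROCESSOR type above:
@mlflow.trace(span_type=custom_type)
def normalize_text(text: str):
    return text.strip().lower()

normalize_text("  Hello World ")
t = mlflow.get_trace(mlflow.get_last_active_trace_id())
print(f"Found {len(t.search_spans(span_type='CUSTOM_PROCESSOR'))} custom spans")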
Working with immutable spans (Span class)
The Span class represents immutable, completed spans retrieved from traces:
# Get a span from a trace
spans = trace.data.spans
span = spans[0]
# Basic properties
print(f"Span ID: {span.span_id}")
print(f"Name: {span.name}")
print(f"Type: {span.span_type}")
print(f"Trace ID: {span.trace_id}") # Which trace this span belongs to
print(f"Parent ID: {span.parent_id}") # None for root spans
# Timing information (nanoseconds)
print(f"Start time: {span.start_time_ns}")
print(f"End time: {span.end_time_ns}")
duration_ms = (span.end_time_ns - span.start_time_ns) / 1_000_000
print(f"Duration: {duration_ms:.2f}ms")
# Status information
print(f"Status: {span.status}")
print(f"Status code: {span.status.status_code}")
print(f"Status description: {span.status.description}")
# Inputs and outputs
print(f"Inputs: {span.inputs}")
print(f"Outputs: {span.outputs}")
# Get all attributes
attributes = span.attributes
print(f"Total attributes: {len(attributes)}")
# Get specific attribute
specific_attr = span.get_attribute("custom_attribute")
print(f"Custom attribute: {specific_attr}")
# Access events
for event in span.events:
print(f"Event: {event.name} at {event.timestamp}")
print(f" Attributes: {event.attributes}")
Converting spans to/from dictionaries
# Convert span to dictionary
span_dict = span.to_dict()
print(f"Span dict keys: {span_dict.keys()}")
# Recreate span from dictionary
from mlflow.entities import Span
reconstructed_span = Span.from_dict(span_dict)
print(f"Reconstructed span: {reconstructed_span.name}")
Working with live spans (LiveSpan class)
When you create spans during execution, you work with LiveSpan objects that can be modified:
import time

import mlflow
from mlflow.entities import SpanType, SpanStatus, SpanStatusCode
@mlflow.trace(span_type=SpanType.CHAIN)
def process_data(data: dict):
# Get the current active span (LiveSpan)
span = mlflow.get_current_active_span()
# Set span type (if not set via decorator)
span.set_span_type(SpanType.CHAIN)
# Set inputs
span.set_inputs({"data": data, "timestamp": time.time()})
# Set individual attributes
span.set_attribute("processing_version", "2.0")
span.set_attribute("data_size", len(str(data)))
# Set multiple attributes at once
span.set_attributes({
"environment": "production",
"region": "us-west-2",
"custom_metadata": {"key": "value"}
})
try:
# Process the data
result = {"processed": True, "count": len(data)}
# Set outputs
span.set_outputs(result)
# Set success status
span.set_status(SpanStatusCode.OK)
except Exception as e:
# Record the exception
span.record_exception(e)
# This automatically sets status to ERROR and adds an exception event
raise
return result
# Example with manual span creation
with mlflow.start_span(name="manual_span", span_type=SpanType.TOOL) as span:
# Add events during execution
from mlflow.entities import SpanEvent
span.add_event(SpanEvent(
name="processing_started",
attributes={
"stage": "initialization",
"memory_usage_mb": 256
}
))
# Do some work...
time.sleep(0.1)
# Add another event
span.add_event(SpanEvent(
name="checkpoint_reached",
attributes={"progress": 0.5}
))
    # Set final outputs, attributes, and status; the context manager
    # calls span.end() automatically on exit, so do not end it manually here
    span.set_outputs({"result": "success"})
    span.set_attribute("final_metric", 0.95)
    span.set_status(SpanStatusCode.OK)
Span events
Events record specific occurrences during a span's lifetime:
from mlflow.entities import SpanEvent
import time
# Create an event with current timestamp
event = SpanEvent(
name="validation_completed",
attributes={
"records_validated": 1000,
"errors_found": 3,
"validation_type": "schema"
}
)
# Create an event with specific timestamp (nanoseconds)
specific_time_event = SpanEvent(
name="data_checkpoint",
timestamp=int(time.time() * 1e9),
attributes={"checkpoint_id": "ckpt_123"}
)
# Create an event from an exception
try:
raise ValueError("Invalid input format")
except Exception as e:
error_event = SpanEvent.from_exception(e)
# This creates an event with name="exception" and attributes containing:
# - exception.message
# - exception.type
# - exception.stacktrace
# Add to current span
span = mlflow.get_current_active_span()
span.add_event(error_event)
Span status
Control and query span execution status:
from mlflow.entities import SpanStatus, SpanStatusCode
# Create status objects
success_status = SpanStatus(SpanStatusCode.OK)
error_status = SpanStatus(
SpanStatusCode.ERROR,
description="Failed to connect to database"
)
# Set status on a live span
span = mlflow.get_current_active_span()
span.set_status(success_status)
# Or use string shortcuts
span.set_status("OK")
span.set_status("ERROR")
# Query status from completed spans
for span in trace.data.spans:
if span.status.status_code == SpanStatusCode.ERROR:
print(f"Error in {span.name}: {span.status.description}")
Special span attributes
MLflow uses specific attribute keys for special purposes:
from mlflow.tracing.constant import SpanAttributeKey
# Common span attributes
span = mlflow.get_current_active_span()
# These are set automatically but can be accessed
request_id = span.get_attribute(SpanAttributeKey.REQUEST_ID) # Trace ID
span_type = span.get_attribute(SpanAttributeKey.SPAN_TYPE)
# For CHAT_MODEL spans
from mlflow.tracing import set_span_chat_messages, set_span_chat_tools
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello!"}
]
tools = [{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get weather for a location"
}
}]
span = mlflow.get_current_active_span()
set_span_chat_messages(span, messages)
set_span_chat_tools(span, tools)
# Access these special attributes
chat_messages = span.get_attribute(SpanAttributeKey.CHAT_MESSAGES)
chat_tools = span.get_attribute(SpanAttributeKey.CHAT_TOOLS)
# For token usage tracking
span.set_attribute("llm.token_usage.input_tokens", 150)
span.set_attribute("llm.token_usage.output_tokens", 75)
span.set_attribute("llm.token_usage.total_tokens", 225)
Working with RETRIEVER spans
RETRIEVER spans have special output requirements:
from mlflow.entities import Document, SpanType
@mlflow.trace(span_type=SpanType.RETRIEVER)
def retrieve_documents(query: str):
span = mlflow.get_current_active_span()
# Create Document objects (required for RETRIEVER spans)
documents = [
Document(
page_content="The content of the document...",
metadata={
"doc_uri": "path/to/document.md",
"chunk_id": "chunk_001",
"relevance_score": 0.95,
"source": "knowledge_base"
},
id="doc_123" # Optional document ID
),
Document(
page_content="Another relevant section...",
metadata={
"doc_uri": "path/to/other.md",
"chunk_id": "chunk_042",
"relevance_score": 0.87
}
)
]
# Set outputs as Document objects for proper UI rendering
span.set_outputs(documents)
# Return in your preferred format
return [doc.to_dict() for doc in documents]
# Accessing retriever outputs
retriever_span = trace.search_spans(span_type=SpanType.RETRIEVER)[0]
if retriever_span.outputs:
for doc in retriever_span.outputs:
if isinstance(doc, dict):
content = doc.get('page_content', '')
uri = doc.get('metadata', {}).get('doc_uri', '')
score = doc.get('metadata', {}).get('relevance_score', 0)
print(f"Document from {uri} (score: {score})")
Advanced span analysis
def analyze_span_tree(trace):
"""Analyze the span hierarchy and relationships."""
spans = trace.data.spans
# Build parent-child relationships
children = {}
for span in spans:
if span.parent_id:
if span.parent_id not in children:
children[span.parent_id] = []
children[span.parent_id].append(span)
# Find root spans
roots = [s for s in spans if s.parent_id is None]
def print_tree(span, indent=0):
duration_ms = (span.end_time_ns - span.start_time_ns) / 1_000_000
status_icon = "✓" if span.status.status_code == SpanStatusCode.OK else "✗"
print(f"{' ' * indent}{status_icon} {span.name} ({span.span_type}) - {duration_ms:.1f}ms")
# Print children
for child in sorted(children.get(span.span_id, []),
key=lambda s: s.start_time_ns):
print_tree(child, indent + 1)
print("Span Hierarchy:")
for root in roots:
print_tree(root)
# Calculate span statistics
total_time = sum((s.end_time_ns - s.start_time_ns) / 1_000_000
for s in spans)
llm_time = sum((s.end_time_ns - s.start_time_ns) / 1_000_000
for s in spans if s.span_type in [SpanType.LLM, SpanType.CHAT_MODEL])
retrieval_time = sum((s.end_time_ns - s.start_time_ns) / 1_000_000
for s in spans if s.span_type == SpanType.RETRIEVER)
print(f"\nSpan Statistics:")
print(f" Total spans: {len(spans)}")
print(f" Total time: {total_time:.1f}ms")
print(f" LLM time: {llm_time:.1f}ms ({llm_time/total_time*100:.1f}%)")
print(f" Retrieval time: {retrieval_time:.1f}ms ({retrieval_time/total_time*100:.1f}%)")
# Find critical path (longest duration path from root to leaf)
def find_critical_path(span):
child_paths = []
for child in children.get(span.span_id, []):
path, duration = find_critical_path(child)
child_paths.append((path, duration))
span_duration = (span.end_time_ns - span.start_time_ns) / 1_000_000
if child_paths:
best_path, best_duration = max(child_paths, key=lambda x: x[1])
return [span] + best_path, span_duration + best_duration
else:
return [span], span_duration
if roots:
critical_paths = [find_critical_path(root) for root in roots]
critical_path, critical_duration = max(critical_paths, key=lambda x: x[1])
print(f"\nCritical Path ({critical_duration:.1f}ms total):")
for span in critical_path:
duration_ms = (span.end_time_ns - span.start_time_ns) / 1_000_000
print(f" → {span.name} ({duration_ms:.1f}ms)")
# Use the analyzer
analyze_span_tree(trace)
Practical example: Comprehensive trace analysis
Let's build a complete trace analysis utility that extracts all meaningful information:
def analyze_trace(trace_id: str):
"""Comprehensive analysis of a trace."""
# Get the trace
trace = mlflow.get_trace(trace_id)
print(f"=== TRACE ANALYSIS: {trace_id} ===\n")
# 1. Basic Information
print("1. BASIC INFORMATION")
print(f" State: {trace.info.state}")
print(f" Duration: {trace.info.execution_duration}ms")
print(f" Start time: {datetime.datetime.fromtimestamp(trace.info.request_time/1000)}")
if trace.info.experiment_id:
print(f" Experiment: {trace.info.experiment_id}")
# 2. Tags Analysis
print("\n2. TAGS")
for key, value in sorted(trace.info.tags.items()):
print(f" {key}: {value}")
# 3. Token Usage
print("\n3. TOKEN USAGE")
if tokens := trace.info.token_usage:
print(f" Input: {tokens.get('input_tokens', 0)}")
print(f" Output: {tokens.get('output_tokens', 0)}")
print(f" Total: {tokens.get('total_tokens', 0)}")
    # Also aggregate token counts recorded directly on spans
total_input = 0
total_output = 0
for span in trace.data.spans:
if span.span_type == SpanType.CHAT_MODEL:
total_input += span.get_attribute("llm.token_usage.input_tokens") or 0
total_output += span.get_attribute("llm.token_usage.output_tokens") or 0
if total_input or total_output:
print(f" (From spans - Input: {total_input}, Output: {total_output})")
# 4. Span Analysis
print("\n4. SPAN ANALYSIS")
span_types = {}
error_spans = []
for span in trace.data.spans:
# Count by type
span_types[span.span_type] = span_types.get(span.span_type, 0) + 1
# Collect errors
if span.status.status_code.name == "ERROR":
error_spans.append(span)
print(" Span counts by type:")
for span_type, count in sorted(span_types.items()):
print(f" {span_type}: {count}")
if error_spans:
print(f"\n Error spans ({len(error_spans)}):")
for span in error_spans:
print(f" - {span.name}: {span.status.description}")
# 5. Retrieval Analysis
print("\n5. RETRIEVAL ANALYSIS")
retriever_spans = trace.search_spans(span_type=SpanType.RETRIEVER)
if retriever_spans:
for r_span in retriever_spans:
if r_span.outputs:
docs = r_span.outputs
print(f" Retrieved {len(docs)} documents:")
for doc in docs[:3]: # Show first 3
if isinstance(doc, dict):
uri = doc.get('metadata', {}).get('doc_uri', 'Unknown')
score = doc.get('metadata', {}).get('relevance_score', 'N/A')
print(f" - {uri} (score: {score})")
# 6. Assessment Summary
print("\n6. ASSESSMENTS")
assessments = trace.search_assessments()
# Group by source type
by_source = {}
for assessment in assessments:
source_type = assessment.source.source_type.value
if source_type not in by_source:
by_source[source_type] = []
by_source[source_type].append(assessment)
for source_type, items in by_source.items():
print(f"\n {source_type} ({len(items)}):")
for assessment in items:
value_str = f"{assessment.value}"
if assessment.rationale:
value_str += f" - {assessment.rationale[:50]}..."
print(f" {assessment.name}: {value_str}")
# 7. Performance Breakdown
print("\n7. PERFORMANCE BREAKDOWN")
root_span = next((s for s in trace.data.spans if s.parent_id is None), None)
if root_span:
total_duration_ns = root_span.end_time_ns - root_span.start_time_ns
# Calculate time spent in each span type
time_by_type = {}
for span in trace.data.spans:
duration_ms = (span.end_time_ns - span.start_time_ns) / 1_000_000
if span.span_type not in time_by_type:
time_by_type[span.span_type] = 0
time_by_type[span.span_type] += duration_ms
print(" Time by span type:")
for span_type, duration_ms in sorted(time_by_type.items(),
key=lambda x: x[1], reverse=True):
percentage = (duration_ms / (total_duration_ns / 1_000_000)) * 100
print(f" {span_type}: {duration_ms:.1f}ms ({percentage:.1f}%)")
# 8. Data Flow
print("\n8. DATA FLOW")
if intermediate := trace.data.intermediate_outputs:
print(" Intermediate outputs:")
for name, output in intermediate.items():
output_str = str(output)[:100] + "..." if len(str(output)) > 100 else str(output)
print(f" {name}: {output_str}")
return trace
# Run the analysis
analysis_result = analyze_trace(trace_id)
Building reusable trace utilities
class TraceAnalyzer:
"""Utility class for advanced trace analysis."""
def __init__(self, trace: mlflow.entities.Trace):
self.trace = trace
def get_error_summary(self):
"""Get summary of all errors in the trace."""
errors = []
# Check trace status
if self.trace.info.state == "ERROR":
errors.append({
"level": "trace",
"message": "Trace failed",
"details": self.trace.info.response_preview
})
# Check span errors
for span in self.trace.data.spans:
if span.status.status_code.name == "ERROR":
errors.append({
"level": "span",
"span_name": span.name,
"span_type": span.span_type,
"message": span.status.description,
"span_id": span.span_id
})
# Check assessment errors
for assessment in self.trace.info.assessments:
if assessment.error:
errors.append({
"level": "assessment",
"assessment_name": assessment.name,
"error": str(assessment.error)
})
return errors
def get_llm_usage_summary(self):
"""Aggregate LLM usage across all spans."""
usage = {
"total_llm_calls": 0,
"total_input_tokens": 0,
"total_output_tokens": 0,
"spans": []
}
for span in self.trace.data.spans:
        if span.span_type in [SpanType.CHAT_MODEL, SpanType.LLM]:
usage["total_llm_calls"] += 1
input_tokens = span.get_attribute("llm.token_usage.input_tokens") or 0
output_tokens = span.get_attribute("llm.token_usage.output_tokens") or 0
usage["total_input_tokens"] += input_tokens
usage["total_output_tokens"] += output_tokens
usage["spans"].append({
"name": span.name,
"input_tokens": input_tokens,
"output_tokens": output_tokens
})
usage["total_tokens"] = usage["total_input_tokens"] + usage["total_output_tokens"]
return usage
def get_retrieval_metrics(self):
"""Extract retrieval quality metrics."""
metrics = []
for span in self.trace.search_spans(span_type=SpanType.RETRIEVER):
if span.outputs:
docs = span.outputs
relevance_scores = []
for doc in docs:
if isinstance(doc, dict) and 'metadata' in doc:
if score := doc['metadata'].get('relevance_score'):
relevance_scores.append(score)
metrics.append({
"span_name": span.name,
"num_documents": len(docs),
"avg_relevance": sum(relevance_scores) / len(relevance_scores) if relevance_scores else None,
"max_relevance": max(relevance_scores) if relevance_scores else None,
"min_relevance": min(relevance_scores) if relevance_scores else None
})
return metrics
def get_span_hierarchy(self):
"""Build a hierarchical view of spans."""
# Find root spans
roots = [span for span in self.trace.data.spans if span.parent_id is None]
def build_tree(span, indent=0):
result = []
duration_ms = (span.end_time_ns - span.start_time_ns) / 1_000_000
result.append({
"indent": indent,
"name": span.name,
"type": span.span_type,
"duration_ms": duration_ms,
"status": span.status.status_code.name
})
# Find children
children = [s for s in self.trace.data.spans if s.parent_id == span.span_id]
for child in sorted(children, key=lambda s: s.start_time_ns):
result.extend(build_tree(child, indent + 1))
return result
hierarchy = []
for root in roots:
hierarchy.extend(build_tree(root))
return hierarchy
def export_for_evaluation(self):
"""Export trace data in a format suitable for evaluation."""
# Get root span data
request = response = None
if self.trace.data.request:
request = json.loads(self.trace.data.request)
if self.trace.data.response:
response = json.loads(self.trace.data.response)
# Get expected values from assessments
expectations = self.trace.search_assessments(type="expectation")
expected_values = {exp.name: exp.value for exp in expectations}
# Get retrieval context
retrieved_context = []
for span in self.trace.search_spans(span_type=SpanType.RETRIEVER):
if span.outputs:
for doc in span.outputs:
if isinstance(doc, dict) and 'page_content' in doc:
retrieved_context.append(doc['page_content'])
return {
"trace_id": self.trace.info.trace_id,
"request": request,
"response": response,
"retrieved_context": retrieved_context,
"expected_facts": expected_values.get("expected_facts", []),
"metadata": {
"user_id": self.trace.info.tags.get("user_id"),
"session_id": self.trace.info.tags.get("session_id"),
"duration_ms": self.trace.info.execution_duration,
"timestamp": self.trace.info.request_time
}
}
# Use the analyzer
analyzer = TraceAnalyzer(trace)
# Get various analyses
errors = analyzer.get_error_summary()
print(f"\nErrors found: {len(errors)}")
for error in errors:
print(f" - {error['level']}: {error.get('message', error.get('error'))}")
llm_usage = analyzer.get_llm_usage_summary()
print(f"\nLLM Usage: {llm_usage['total_tokens']} total tokens across {llm_usage['total_llm_calls']} calls")
retrieval_metrics = analyzer.get_retrieval_metrics()
print(f"\nRetrieval Metrics:")
for metric in retrieval_metrics:
print(f" - {metric['span_name']}: {metric['num_documents']} docs, avg relevance: {metric['avg_relevance']}")
# Export for evaluation
eval_data = analyzer.export_for_evaluation()
print(f"\nExported evaluation data with {len(eval_data['retrieved_context'])} context chunks")
Next steps
- Search and analyze traces - Learn to query multiple traces
- Build evaluation datasets - Convert traces to evaluation datasets
- Log assessments - Add feedback to traces
Reference guides
- Trace data model - Detailed trace structure documentation
- Tracing concepts - Understand tracing fundamentals