Access trace data

This page demonstrates how to access every aspect of trace data, including metadata, spans, assessments, and more. Once you know how to access trace data, see Analyze traces.
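
The examples on this page assume you already have a Trace object in a variable named trace. A minimal sketch of how you might fetch one, using mlflow.get_trace() or MlflowClient.search_traces(); the trace ID and experiment ID below are placeholders:

Python
import mlflow
from mlflow import MlflowClient

# Fetch a single trace by its ID (placeholder ID)
trace = mlflow.get_trace("tr-1234567890abcdef")

# Or fetch recent traces from an experiment as Trace objects (placeholder experiment ID)
client = MlflowClient()
traces = client.search_traces(experiment_ids=["123"], max_results=5)
trace = traces[0]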

The MLflow Trace object consists of two main components: trace.info (a TraceInfo object containing metadata about the trace) and trace.data (a TraceData object containing the spans and the request/response payloads).

Basic metadata properties

API Reference: TraceInfo

Python
# Primary identifiers
print(f"Trace ID: {trace.info.trace_id}")
print(f"Client Request ID: {trace.info.client_request_id}")

# Status information
print(f"State: {trace.info.state}") # OK, ERROR, IN_PROGRESS
print(f"Status (deprecated): {trace.info.status}") # Use state instead

# Request/response previews (truncated)
print(f"Request preview: {trace.info.request_preview}")
print(f"Response preview: {trace.info.response_preview}")

Storage location and experiment

Python
# Trace storage location
location = trace.info.trace_location
print(f"Location type: {location.type}")

# If stored in MLflow experiment
if location.mlflow_experiment:
    print(f"Experiment ID: {location.mlflow_experiment.experiment_id}")
    # Shortcut property
    print(f"Experiment ID: {trace.info.experiment_id}")

# If stored in Databricks inference table
if location.inference_table:
    print(f"Table: {location.inference_table.full_table_name}")

Request and response previews

The request_preview and response_preview properties provide truncated summaries of the full request and response data, making it easy to quickly understand what happened without loading the complete payloads.

Python
request_preview = trace.info.request_preview
response_preview = trace.info.response_preview

print(f"Request preview: {request_preview}")
print(f"Response preview: {response_preview}")

# Compare with full request/response data
full_request = trace.data.request # Complete request text
full_response = trace.data.response # Complete response text

if full_request and request_preview:
    print(f"Full request length: {len(full_request)} characters")
    print(f"Preview is {len(request_preview)/len(full_request)*100:.1f}% of full request")

Timing information

API Reference: TraceInfo timing properties

Python
# Timestamps (milliseconds since epoch)
print(f"Start time (ms): {trace.info.request_time}")
print(f"Timestamp (ms): {trace.info.timestamp_ms}") # Alias for request_time

# Duration
print(f"Execution duration (ms): {trace.info.execution_duration}")
print(f"Execution time (ms): {trace.info.execution_time_ms}") # Alias

# Convert to human-readable format
import datetime
start_time = datetime.datetime.fromtimestamp(trace.info.request_time / 1000)
print(f"Started at: {start_time}")

Tags and metadata

API Reference: TraceInfo.tags and TraceInfo.trace_metadata

Python
# Tags (mutable, can be updated after creation)
print("Tags:")
for key, value in trace.info.tags.items():
    print(f" {key}: {value}")

# Access specific tags
print(f"Environment: {trace.info.tags.get('environment')}")
print(f"User ID: {trace.info.tags.get('user_id')}")

# Trace metadata (immutable, set at creation)
print("\nTrace metadata:")
for key, value in trace.info.trace_metadata.items():
    print(f" {key}: {value}")

# Deprecated alias
print(f"Request metadata: {trace.info.request_metadata}") # Same as trace_metadata

Token usage information

MLflow Tracing can track token usage of LLM calls, using token counts returned by LLM provider APIs.

Python
# Get aggregated token usage (if available)
token_usage = trace.info.token_usage
if token_usage:
    print(f"Input tokens: {token_usage.get('input_tokens')}")
    print(f"Output tokens: {token_usage.get('output_tokens')}")
    print(f"Total tokens: {token_usage.get('total_tokens')}")

How you track token usage depends on the LLM provider. The following list describes how to track token usage across various providers and platforms.

- Databricks Foundation Model APIs: Use the OpenAI client; MLflow Tracing automatically tracks token usage.
- LLM providers with native MLflow Tracing support: See the provider's integration page under MLflow Tracing Integrations to determine whether native token tracking is supported.
- Providers without native MLflow Tracing support: Manually log token usage using Span.set_attribute, as shown in this example and in the sketch below.
- Multiple endpoints across your AI platform: Use AI Gateway usage tracking to log token usage to system tables across serving endpoints.
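
For providers without native support, here is a minimal sketch of manual logging. The provider response is stubbed with a hard-coded dict, and the attribute keys mirror the llm.token_usage.* keys read back later on this page; they are illustrative and may differ from the keys your MLflow version aggregates into trace.info.token_usage:

Python
import mlflow
from mlflow.entities import SpanType

@mlflow.trace(span_type=SpanType.LLM)
def call_custom_llm(prompt: str):
    # Stand-in for a real provider call; replace with your client
    response = {
        "text": f"Echo: {prompt}",
        "usage": {"input_tokens": 12, "output_tokens": 8, "total_tokens": 20},
    }

    # Record the provider-reported counts as span attributes
    span = mlflow.get_current_active_span()
    span.set_attribute("llm.token_usage.input_tokens", response["usage"]["input_tokens"])
    span.set_attribute("llm.token_usage.output_tokens", response["usage"]["output_tokens"])
    span.set_attribute("llm.token_usage.total_tokens", response["usage"]["total_tokens"])
    return response["text"]

call_custom_llm("What is MLflow Tracing?")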

Assessments

Use search_assessments() to retrieve the feedback and expectations attached to a trace.

API Reference: Trace.search_assessments

Python
# 1. Get all assessments
all_assessments = trace.search_assessments()
print(f"Total assessments: {len(all_assessments)}")

# 2. Search by name
helpfulness = trace.search_assessments(name="helpfulness")
if helpfulness:
    assessment = helpfulness[0]
    print(f"Helpfulness: {assessment.value}")
    print(f"Source: {assessment.source.source_type} - {assessment.source.source_id}")
    print(f"Rationale: {assessment.rationale}")

# 3. Search by type
feedback_only = trace.search_assessments(type="feedback")
expectations_only = trace.search_assessments(type="expectation")
print(f"Feedback assessments: {len(feedback_only)}")
print(f"Expectation assessments: {len(expectations_only)}")

# 4. Search by span ID
span_assessments = trace.search_assessments(span_id=retriever_span.span_id)
print(f"Assessments for retriever span: {len(span_assessments)}")

# 5. Get all assessments including overridden ones
all_including_invalid = trace.search_assessments(all=True)
print(f"All assessments (including overridden): {len(all_including_invalid)}")

# 6. Combine criteria
human_feedback = trace.search_assessments(
    type="feedback",
    name="helpfulness"
)
for fb in human_feedback:
    print(f"Human feedback: {fb.name} = {fb.value}")

Access assessment details

Python
# Get detailed assessment information
for assessment in trace.info.assessments:
    print(f"\nAssessment: {assessment.name}")
    print(f" Type: {type(assessment).__name__}")
    print(f" Value: {assessment.value}")
    print(f" Source: {assessment.source.source_type.value}")
    print(f" Source ID: {assessment.source.source_id}")

    # Optional fields
    if assessment.rationale:
        print(f" Rationale: {assessment.rationale}")
    if assessment.metadata:
        print(f" Metadata: {assessment.metadata}")
    if assessment.error:
        print(f" Error: {assessment.error}")
    if hasattr(assessment, 'span_id') and assessment.span_id:
        print(f" Span ID: {assessment.span_id}")

Work with spans

Spans are the building blocks of traces, representing individual operations or units of work. The Span class represents immutable, completed spans retrieved from traces.

Access span properties

API Reference: TraceData.spans and Span

Python
# Access all spans from a trace
spans = trace.data.spans
print(f"Total spans: {len(spans)}")

# Get a specific span
span = spans[0]

# Basic properties
print(f"Span ID: {span.span_id}")
print(f"Name: {span.name}")
print(f"Type: {span.span_type}")
print(f"Trace ID: {span.trace_id}") # Which trace this span belongs to
print(f"Parent ID: {span.parent_id}") # None for root spans

# Timing information (nanoseconds)
print(f"Start time: {span.start_time_ns}")
print(f"End time: {span.end_time_ns}")
duration_ms = (span.end_time_ns - span.start_time_ns) / 1_000_000
print(f"Duration: {duration_ms:.2f}ms")

# Status information
print(f"Status: {span.status}")
print(f"Status code: {span.status.status_code}")
print(f"Status description: {span.status.description}")

# Inputs and outputs
print(f"Inputs: {span.inputs}")
print(f"Outputs: {span.outputs}")

# Iterate through all spans
for span in spans:
    print(f"\nSpan: {span.name}")
    print(f" ID: {span.span_id}")
    print(f" Type: {span.span_type}")
    print(f" Duration (ms): {(span.end_time_ns - span.start_time_ns) / 1_000_000:.2f}")

    # Parent-child relationships
    if span.parent_id:
        print(f" Parent ID: {span.parent_id}")

Find specific spans

Use search_spans() to find spans matching specific criteria:

Python
import re
from mlflow.entities import SpanType

# 1. Search by exact name
retriever_spans = trace.search_spans(name="retrieve_documents")
print(f"Found {len(retriever_spans)} retriever spans")

# 2. Search by regex pattern
pattern = re.compile(r".*_tool$")
tool_spans = trace.search_spans(name=pattern)
print(f"Found {len(tool_spans)} tool spans")

# 3. Search by span type
chat_spans = trace.search_spans(span_type=SpanType.CHAT_MODEL)
llm_spans = trace.search_spans(span_type="CHAT_MODEL") # String also works
print(f"Found {len(chat_spans)} chat model spans")

# 4. Search by span ID
specific_span = trace.search_spans(span_id=retriever_spans[0].span_id)
print(f"Found span: {specific_span[0].name if specific_span else 'Not found'}")

# 5. Combine criteria
tool_fact_check = trace.search_spans(
    name="fact_check_tool",
    span_type=SpanType.TOOL
)
print(f"Found {len(tool_fact_check)} fact check tool spans")

# 6. Get all spans of a type
all_tools = trace.search_spans(span_type=SpanType.TOOL)
for tool in all_tools:
    print(f"Tool: {tool.name}")

Intermediate outputs

Python
# Get intermediate outputs from non-root spans
intermediate = trace.data.intermediate_outputs
if intermediate:
    print("\nIntermediate outputs:")
    for span_name, output in intermediate.items():
        print(f" {span_name}: {output}")

Span attributes

API Reference: Span.get_attribute and SpanAttributeKey

Python
from mlflow.entities import SpanType
from mlflow.tracing.constant import SpanAttributeKey

# Get a chat model span
chat_span = trace.search_spans(span_type=SpanType.CHAT_MODEL)[0]

# Get all attributes
print("All span attributes:")
for key, value in chat_span.attributes.items():
    print(f" {key}: {value}")

# Get specific attribute
specific_attr = chat_span.get_attribute("custom_attribute")
print(f"Custom attribute: {specific_attr}")

# Access chat-specific attributes using SpanAttributeKey
messages = chat_span.get_attribute(SpanAttributeKey.CHAT_MESSAGES)
tools = chat_span.get_attribute(SpanAttributeKey.CHAT_TOOLS)

print(f"Chat messages: {messages}")
print(f"Available tools: {tools}")

# Access token usage from span
input_tokens = chat_span.get_attribute("llm.token_usage.input_tokens")
output_tokens = chat_span.get_attribute("llm.token_usage.output_tokens")
print(f"Span token usage - Input: {input_tokens}, Output: {output_tokens}")

Create and modify spans during execution

When creating spans during execution, you work with LiveSpan objects that can be modified:

Python
import time

import mlflow
from mlflow.entities import SpanType, SpanStatus, SpanStatusCode

@mlflow.trace(span_type=SpanType.CHAIN)
def process_data(data: dict):
    # Get the current active span (LiveSpan)
    span = mlflow.get_current_active_span()

    # Set span type (if not set via decorator)
    span.set_span_type(SpanType.CHAIN)

    # Set inputs
    span.set_inputs({"data": data, "timestamp": time.time()})

    # Set individual attributes
    span.set_attribute("processing_version", "2.0")
    span.set_attribute("data_size", len(str(data)))

    # Set multiple attributes at once
    span.set_attributes({
        "environment": "production",
        "region": "us-west-2",
        "custom_metadata": {"key": "value"}
    })

    try:
        # Process the data
        result = {"processed": True, "count": len(data)}

        # Set outputs
        span.set_outputs(result)

        # Set success status
        span.set_status(SpanStatusCode.OK)

    except Exception as e:
        # Record the exception
        span.record_exception(e)
        # This automatically sets status to ERROR and adds an exception event
        raise

    return result

# Example with manual span creation
with mlflow.start_span(name="manual_span", span_type=SpanType.TOOL) as span:
    # Add events during execution
    from mlflow.entities import SpanEvent

    span.add_event(SpanEvent(
        name="processing_started",
        attributes={
            "stage": "initialization",
            "memory_usage_mb": 256
        }
    ))

    # Do some work...
    time.sleep(0.1)

    # Add another event
    span.add_event(SpanEvent(
        name="checkpoint_reached",
        attributes={"progress": 0.5}
    ))

    # Manually end the span with outputs and status
    span.end(
        outputs={"result": "success"},
        attributes={"final_metric": 0.95},
        status=SpanStatusCode.OK
    )

Span events

Events record specific occurrences during a span's lifetime.

API Reference: SpanEvent

Python
import time

import mlflow
from mlflow.entities import SpanEvent

# Create an event with current timestamp
event = SpanEvent(
    name="validation_completed",
    attributes={
        "records_validated": 1000,
        "errors_found": 3,
        "validation_type": "schema"
    }
)

# Create an event with specific timestamp (nanoseconds)
specific_time_event = SpanEvent(
    name="data_checkpoint",
    timestamp=int(time.time() * 1e9),
    attributes={"checkpoint_id": "ckpt_123"}
)

# Create an event from an exception
try:
    raise ValueError("Invalid input format")
except Exception as e:
    error_event = SpanEvent.from_exception(e)
    # This creates an event with name="exception" and attributes containing:
    # - exception.message
    # - exception.type
    # - exception.stacktrace

    # Add to current span
    span = mlflow.get_current_active_span()
    span.add_event(error_event)

Span status

Control and query span execution status.

API Reference: SpanStatus

Python
import mlflow
from mlflow.entities import SpanStatus, SpanStatusCode

# Create status objects
success_status = SpanStatus(SpanStatusCode.OK)
error_status = SpanStatus(
    SpanStatusCode.ERROR,
    description="Failed to connect to database"
)

# Set status on a live span
span = mlflow.get_current_active_span()
span.set_status(success_status)

# Or use string shortcuts
span.set_status("OK")
span.set_status("ERROR")

# Query status from completed spans
for span in trace.data.spans:
    if span.status.status_code == SpanStatusCode.ERROR:
        print(f"Error in {span.name}: {span.status.description}")

Work with RETRIEVER spans

RETRIEVER spans have special output requirements:

Python
import mlflow
from mlflow.entities import Document, SpanType

@mlflow.trace(span_type=SpanType.RETRIEVER)
def retrieve_documents(query: str):
    span = mlflow.get_current_active_span()

    # Create Document objects (required for RETRIEVER spans)
    documents = [
        Document(
            page_content="The content of the document...",
            metadata={
                "doc_uri": "path/to/document.md",
                "chunk_id": "chunk_001",
                "relevance_score": 0.95,
                "source": "knowledge_base"
            },
            id="doc_123"  # Optional document ID
        ),
        Document(
            page_content="Another relevant section...",
            metadata={
                "doc_uri": "path/to/other.md",
                "chunk_id": "chunk_042",
                "relevance_score": 0.87
            }
        )
    ]

    # Set outputs as Document objects for proper UI rendering
    span.set_outputs(documents)

    # Return in your preferred format
    return [doc.to_dict() for doc in documents]

# Access retriever outputs
retriever_span = trace.search_spans(span_type=SpanType.RETRIEVER)[0]
if retriever_span.outputs:
    for doc in retriever_span.outputs:
        if isinstance(doc, dict):
            content = doc.get('page_content', '')
            uri = doc.get('metadata', {}).get('doc_uri', '')
            score = doc.get('metadata', {}).get('relevance_score', 0)
            print(f"Document from {uri} (score: {score})")

Advanced span operations

Convert spans to/from dictionaries

Python
# Convert span to dictionary
span_dict = span.to_dict()
print(f"Span dict keys: {span_dict.keys()}")

# Recreate span from dictionary
from mlflow.entities import Span
reconstructed_span = Span.from_dict(span_dict)
print(f"Reconstructed span: {reconstructed_span.name}")

Advanced span analysis

Python
from mlflow.entities import SpanType, SpanStatusCode


def analyze_span_tree(trace):
    """Analyze the span hierarchy and relationships."""
    spans = trace.data.spans

    # Build parent-child relationships
    span_dict = {span.span_id: span for span in spans}
    children = {}

    for span in spans:
        if span.parent_id:
            if span.parent_id not in children:
                children[span.parent_id] = []
            children[span.parent_id].append(span)

    # Find root spans
    roots = [s for s in spans if s.parent_id is None]

    def print_tree(span, indent=0):
        duration_ms = (span.end_time_ns - span.start_time_ns) / 1_000_000
        status_icon = "✓" if span.status.status_code == SpanStatusCode.OK else "✗"
        print(f"{' ' * indent}{status_icon} {span.name} ({span.span_type}) - {duration_ms:.1f}ms")

        # Print children
        for child in sorted(children.get(span.span_id, []),
                            key=lambda s: s.start_time_ns):
            print_tree(child, indent + 1)

    print("Span Hierarchy:")
    for root in roots:
        print_tree(root)

    # Calculate span statistics
    total_time = sum((s.end_time_ns - s.start_time_ns) / 1_000_000
                     for s in spans)
    llm_time = sum((s.end_time_ns - s.start_time_ns) / 1_000_000
                   for s in spans if s.span_type in [SpanType.LLM, SpanType.CHAT_MODEL])
    retrieval_time = sum((s.end_time_ns - s.start_time_ns) / 1_000_000
                         for s in spans if s.span_type == SpanType.RETRIEVER)

    print(f"\nSpan Statistics:")
    print(f" Total spans: {len(spans)}")
    print(f" Total time: {total_time:.1f}ms")
    print(f" LLM time: {llm_time:.1f}ms ({llm_time/total_time*100:.1f}%)")
    print(f" Retrieval time: {retrieval_time:.1f}ms ({retrieval_time/total_time*100:.1f}%)")

    # Find critical path (longest duration path from root to leaf)
    def find_critical_path(span):
        child_paths = []
        for child in children.get(span.span_id, []):
            path, duration = find_critical_path(child)
            child_paths.append((path, duration))

        span_duration = (span.end_time_ns - span.start_time_ns) / 1_000_000
        if child_paths:
            best_path, best_duration = max(child_paths, key=lambda x: x[1])
            return [span] + best_path, span_duration + best_duration
        else:
            return [span], span_duration

    if roots:
        critical_paths = [find_critical_path(root) for root in roots]
        critical_path, critical_duration = max(critical_paths, key=lambda x: x[1])

        print(f"\nCritical Path ({critical_duration:.1f}ms total):")
        for span in critical_path:
            duration_ms = (span.end_time_ns - span.start_time_ns) / 1_000_000
            print(f" → {span.name} ({duration_ms:.1f}ms)")


# Use the analyzer
analyze_span_tree(trace)

Request and response data

Python
# Get root span request/response (backward compatibility)
request_json = trace.data.request
response_json = trace.data.response

# Parse JSON strings
import json
if request_json:
    request_data = json.loads(request_json)
    print(f"Request: {request_data}")

if response_json:
    response_data = json.loads(response_json)
    print(f"Response: {response_data}")

Data export and conversion

Convert to dictionary

API Reference: Trace.to_dict

Python
# Convert entire trace to dictionary
trace_dict = trace.to_dict()
print(f"Trace dict keys: {trace_dict.keys()}")
print(f"Info keys: {trace_dict['info'].keys()}")
print(f"Data keys: {trace_dict['data'].keys()}")

# Convert individual components
info_dict = trace.info.to_dict()
data_dict = trace.data.to_dict()

# Reconstruct trace from dictionary
from mlflow.entities import Trace
reconstructed_trace = Trace.from_dict(trace_dict)
print(f"Reconstructed trace ID: {reconstructed_trace.info.trace_id}")

JSON serialization

API Reference: Trace.to_json

Python
# Convert to JSON string
trace_json = trace.to_json()
print(f"JSON length: {len(trace_json)} characters")

# Pretty print JSON
trace_json_pretty = trace.to_json(pretty=True)
print("Pretty JSON (first 500 chars):")
print(trace_json_pretty[:500])

# Load trace from JSON
from mlflow.entities import Trace
loaded_trace = Trace.from_json(trace_json)
print(f"Loaded trace ID: {loaded_trace.info.trace_id}")

Pandas DataFrame conversion

API Reference: Trace.to_pandas_dataframe_row

Python
# Convert trace to DataFrame row
row_data = trace.to_pandas_dataframe_row()
print(f"DataFrame row keys: {list(row_data.keys())}")

# Create a DataFrame from multiple traces
import mlflow
import pandas as pd

# mlflow.search_traces already returns a pandas DataFrame
traces_df = mlflow.search_traces(max_results=5)

# If you have individual Trace objects, build rows manually
trace_rows = [t.to_pandas_dataframe_row() for t in [trace]]
df = pd.DataFrame(trace_rows)

print(f"DataFrame shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")

# Access specific data from DataFrame
print(f"Trace IDs: {df['trace_id'].tolist()}")
print(f"States: {df['state'].tolist()}")
print(f"Durations: {df['execution_duration'].tolist()}")

Next steps