Skip to main content

Search and analyze traces

Learn how to create searchable traces, query them effectively, and analyze the results to gain insights into your GenAI application's behavior.

Quick reference

Essential search syntax

Python
# Search by status
mlflow.search_traces("attributes.status = 'OK'")
mlflow.search_traces("attributes.status = 'ERROR'")

# Search by time (milliseconds since epoch)
mlflow.search_traces("attributes.timestamp_ms > 1749006880539")
mlflow.search_traces("attributes.execution_time_ms > 5000")

# Search by tags
mlflow.search_traces("tags.environment = 'production'")
mlflow.search_traces("tags.`mlflow.traceName` = 'my_function'")

# Search by metadata
mlflow.search_traces("metadata.`mlflow.user` = 'alice@company.com'")

# Combined filters (AND only)
mlflow.search_traces(
"attributes.status = 'OK' AND tags.environment = 'production'"
)

Key rules

  • Always use prefixes: attributes., tags., or metadata.
  • Backticks if tag or attribute names have dots: tags.`mlflow.traceName`
  • Single quotes only: 'value' not "value"
  • Milliseconds for time: 1749006880539 not dates
  • AND only: No OR support

Searchable fields

Field

Path

Operators

Status

attributes.status

=, !=

Timestamp

attributes.timestamp_ms

=, <, <=, >, >=

Duration

attributes.execution_time_ms

=, <, <=, >, >=

Tags

tags.*

=, !=

Metadata

metadata.*

=, !=

End-to-end example

Prerequisites
  1. Install MLflow and required packages

    Bash
    pip install --upgrade "mlflow[databricks]>=3.1.0" openai "databricks-connect>=16.1"
  2. Create an MLflow experiment by following the setup your environment quickstart.

Create sample traces to demonstrate search functionality:

Python
import time
import mlflow

# Define methods to be traced
@mlflow.trace()
def morning_greeting(name: str):
time.sleep(1)
# Add tag and metadata for better categorization
mlflow.update_current_trace(
tags={"person": name},
)
return f"Good morning {name}."


@mlflow.trace()
def evening_greeting(name: str):
time.sleep(1)
# Add tag with different values for comparison
mlflow.update_current_trace(
tags={"person": name},
)
return f"Good evening {name}."

@mlflow.trace()
def goodbye():
# Add tag even for functions that might fail
mlflow.update_current_trace(
tags={"greeting_type": "goodbye"},
)
raise Exception("Cannot say goodbye")


# Execute the methods
morning_greeting("Tom")

# Get the timestamp in milliseconds
morning_time = int(time.time() * 1000)

evening_greeting("Mary")

# Execute goodbye, catching the exception
try:
goodbye()
except Exception as e:
print(f"Caught expected exception: {e}")
pass

The code above creates the following traces:

traces

Search these traces using the correct field prefixes:

Python
# Search successful traces
traces = mlflow.search_traces(
filter_string="attributes.status = 'OK'",
)
print(traces)
# 2 results

# Search failed traces
traces = mlflow.search_traces(
filter_string="attributes.status = 'ERROR'",
)
print(traces)
# 1 result

# Search all traces in experiment
traces = mlflow.search_traces()
print(traces)
# 3 results

# Search by single tag
traces = mlflow.search_traces(filter_string="tags.person = 'Tom'")
print(traces)
# 1 result

# Complex search combining tags and status
traces = mlflow.search_traces(
filter_string="tags.person = 'Tom' AND attributes.status = 'OK'"
)
print(traces)
# 1 result

# Search by timestamp
traces = mlflow.search_traces(filter_string=f"attributes.timestamp > {morning_time}")
print(traces)
# 1 result

API reference

Search API

Use mlflow.search_traces() to search and analyze traces in your experiments:

Python
mlflow.search_traces(
experiment_ids: Optional[List[str]] = None, # Uses active experiment if not specified
filter_string: Optional[str] = None,
max_results: Optional[int] = None,
order_by: Optional[List[str]] = None,
extract_fields: Optional[List[str]] = None, # DataFrame column extraction (pandas only)
run_id: Optional[str] = None, # Filter traces by run ID
return_type: Optional[Literal["pandas", "list"]] = None, # Return type (default: pandas if available)
model_id: Optional[str] = None, # Search traces by model ID
sql_warehouse_id: Optional[str] = None # Databricks SQL warehouse ID
) -> Union[pandas.DataFrame, List[Trace]]

Parameter details:

Parameter

Description

experiment_ids

List of experiment ids to scope the search. If not provided, the search will be performed across the current active experiment.

filter_string

A search filter string.

max_results

Maximum number of traces desired. If None, all traces matching the search expressions will be returned.

order_by

List of order_by clauses.

extract_fields

Specify fields to extract from traces using the format "span_name.[inputs|outputs].field_name" or "span_name.[inputs|outputs]".

run_id

A run id to scope the search. When a trace is created under an active run, it will be associated with the run and you can filter on the run id to retrieve the trace. See the example below for how to filter traces by run id.

return_type

The type of the return value. The following return types are supported. If the pandas library is installed, the default return type is "pandas". Otherwise, the default return type is "list":

"pandas": Returns a Pandas DataFrame containing information about traces where each row represents a single trace and each column represents a field of the trace e.g. trace_id, spans, etc.

"list": Returns a list of :py:class:Trace <mlflow.entities.Trace> objects.

model_id

If specified, search traces associated with the given model ID.

note

MLflow also provides the MlflowClient.search_traces(). However, we recommend using mlflow.search_traces() - with the exception of pagination support, it provides a superset of functionality with more convenient defaults and additional features like DataFrame output and field extraction.

Searchable fields reference

important

For a complete reference on these fields, refer to the trace data model.

Field Type

Search Path

Operators

Values

Notes

Metadata

metadata.*

=, !=

See details below

String equality only

Tags

tags.*

=, !=

See details below

String equality only

Status

attributes.status

=, !=

OK, ERROR, IN_PROGRESS

String equality only

Name

attributes.name

=, !=

Trace name

String equality only

Timestamp

attributes.timestamp_ms

=, <, <=, >, >=

Creation time (ms since epoch)

Numeric comparisons

Execution Time

attributes.execution_time_ms

=, <, <=, >, >=

Duration in milliseconds

Numeric comparisons

Metadata details

The following metadata fields are available for filtering:

  • metadata.mlflow.traceInputs: Request content
  • metadata.mlflow.traceOutputs: Response content
  • metadata.mlflow.sourceRun: Source run ID
  • metadata.mlflow.modelId: Model ID
  • metadata.mlflow.trace.sizeBytes: Trace size in bytes
  • metadata.mlflow.trace.tokenUsage: Aggregated token usage information (JSON string)
  • metadata.mlflow.trace.user: User ID/name of the application request
  • metadata.mlflow.trace.session: Session ID of the application request

Tags details

In addition to user defined tags, the following system defined tags are available:

Filter syntax rules

  1. Table prefixes required: Always use attributes., tags., or metadata.
  2. Backticks for dots: Fields with dots need backticks: tags.`mlflow.traceName`
  3. Single quotes only: String values must use single quotes: 'value'
  4. Case sensitive: All field names and values are case sensitive
  5. AND only: OR operators are not supported

Order by syntax

Python
# Single field ordering
order_by=["attributes.timestamp_ms DESC"]
order_by=["attributes.execution_time_ms ASC"]

# Multiple field ordering (applied in sequence)
order_by=[
"attributes.timestamp_ms DESC",
"attributes.execution_time_ms ASC"
]

# Supported fields for ordering
# - attributes.timestamp_ms (and aliases)
# - attributes.execution_time_ms (and aliases)
# - attributes.status
# - attributes.name

Common patterns

Python
# Status filtering
"attributes.status = 'OK'"
"attributes.status = 'ERROR'"

# Time-based queries
"attributes.timestamp_ms > 1749006880539"
"attributes.execution_time_ms > 5000"

# Tag searches
"tags.user_id = 'U001'"
"tags.`mlflow.traceName` = 'my_function'"

# Metadata queries
"metadata.`mlflow.user` = 'alice@company.com'"
"metadata.`mlflow.traceOutputs` != ''"

# Combined filters
"attributes.status = 'OK' AND tags.environment = 'production'"
"attributes.timestamp_ms > 1749006880539 AND attributes.execution_time_ms > 1000"

Common pitfalls

❌ Incorrect

✅ Correct

Issue

status = 'OK'

attributes.status = 'OK'

Missing prefix

mlflow.user = 'alice'

metadata.`mlflow.user` = 'alice'

Missing prefix and backticks

timestamp > '2024-01-01'

attributes.timestamp > 1704067200000

Use milliseconds, not strings

tags.env = "prod"

tags.env = 'prod'

Use single quotes

status = 'OK' OR status = 'ERROR'

Use separate queries

OR not supported

Detailed search examples

Search by run ID

Python
# Find all traces associated with a specific MLflow run
with mlflow.start_run() as run:
# Your traced code here
traced_result = my_traced_function()

# Search for traces from this run
run_traces = mlflow.search_traces(
run_id=run.info.run_id,
return_type="list" # Get list of Trace objects
)

Control return type

Python
# Get results as pandas DataFrame (default if pandas is installed)
traces_df = mlflow.search_traces(
filter_string="attributes.status = 'OK'",
return_type="pandas"
)

# Get results as list of Trace objects
traces_list = mlflow.search_traces(
filter_string="attributes.status = 'OK'",
return_type="list"
)

# Access trace details from list
for trace in traces_list:
print(f"Trace ID: {trace.info.trace_id}")
print(f"Status: {trace.info.state}")
print(f"Duration: {trace.info.execution_duration}")

Search by model ID

Python
# Find traces associated with a specific MLflow model
model_traces = mlflow.search_traces(
model_id="my-model-123",
filter_string="attributes.status = 'OK'"
)

# Analyze model performance
print(f"Found {len(model_traces)} successful traces for model")
print(f"Average latency: {model_traces['execution_time_ms'].mean():.2f}ms")

Search by status

Python
# Find successful traces
traces = mlflow.search_traces(filter_string="attributes.status = 'OK'")

# Find failed traces
traces = mlflow.search_traces(filter_string="attributes.status = 'ERROR'")

# Find in-progress traces
traces = mlflow.search_traces(filter_string="attributes.status = 'IN_PROGRESS'")

# Exclude errors
traces = mlflow.search_traces(filter_string="attributes.status != 'ERROR'")

Search by trace name

Python
# Find traces with specific name (rarely used - legacy field)
traces = mlflow.search_traces(filter_string="attributes.name = 'foo'")

# Find traces excluding a specific name
traces = mlflow.search_traces(filter_string="attributes.name != 'test_trace'")

# Note: Most users should use tags.`mlflow.traceName` instead
traces = mlflow.search_traces(
filter_string="tags.`mlflow.traceName` = 'process_request'"
)

Search by timestamp

Python
import time
from datetime import datetime

# Current time in milliseconds
current_time_ms = int(time.time() * 1000)

# Last 5 minutes
five_minutes_ago = current_time_ms - (5 * 60 * 1000)
traces = mlflow.search_traces(
filter_string=f"attributes.timestamp_ms > {five_minutes_ago}"
)

# Specific date range
start_date = int(datetime(2024, 1, 1).timestamp() * 1000)
end_date = int(datetime(2024, 1, 31).timestamp() * 1000)
traces = mlflow.search_traces(
filter_string=f"attributes.timestamp_ms > {start_date} AND attributes.timestamp_ms < {end_date}"
)

# Using timestamp aliases
traces = mlflow.search_traces(filter_string=f"attributes.timestamp > {five_minutes_ago}")

Search by execution time

Python
# Find slow traces (>5 seconds)
traces = mlflow.search_traces(filter_string="attributes.execution_time_ms > 5000")

# Find fast traces (<100ms)
traces = mlflow.search_traces(filter_string="attributes.execution_time_ms < 100")

# Performance range
traces = mlflow.search_traces(
filter_string="attributes.execution_time_ms > 100 AND attributes.execution_time_ms < 1000"
)

# Using execution time aliases
traces = mlflow.search_traces(filter_string="attributes.latency > 1000")

Search by tags

Python
# Custom tags (set via mlflow.update_current_trace)
traces = mlflow.search_traces(filter_string="tags.customer_id = 'C001'")
traces = mlflow.search_traces(filter_string="tags.environment = 'production'")
traces = mlflow.search_traces(filter_string="tags.version = 'v2.1.0'")

# MLflow system tags (require backticks)
traces = mlflow.search_traces(
filter_string="tags.`mlflow.traceName` = 'process_chat_request'"
)
traces = mlflow.search_traces(
filter_string="tags.`mlflow.artifactLocation` != ''"
)

Search by metadata

Python
# Search by response content (exact match)
traces = mlflow.search_traces(
filter_string="metadata.`mlflow.traceOutputs` = 'exact response text'"
)

# Find traces with any output
traces = mlflow.search_traces(
filter_string="metadata.`mlflow.traceOutputs` != ''"
)

# Search by user
traces = mlflow.search_traces(
filter_string="metadata.`mlflow.user` = 'alice@company.com'"
)

# Search by source file
traces = mlflow.search_traces(
filter_string="metadata.`mlflow.source.name` = 'app.py'"
)

# Search by git information
traces = mlflow.search_traces(
filter_string="metadata.`mlflow.source.git.branch` = 'main'"
)

Complex filters with AND

Python
# Recent successful production traces
current_time_ms = int(time.time() * 1000)
one_hour_ago = current_time_ms - (60 * 60 * 1000)

traces = mlflow.search_traces(
filter_string=f"attributes.status = 'OK' AND "
f"attributes.timestamp_ms > {one_hour_ago} AND "
f"tags.environment = 'production'"
)

# Fast traces from specific user
traces = mlflow.search_traces(
filter_string="attributes.execution_time_ms < 100 AND "
"metadata.`mlflow.user` = 'alice@company.com'"
)

# Specific function with performance threshold
traces = mlflow.search_traces(
filter_string="tags.`mlflow.traceName` = 'process_payment' AND "
"attributes.execution_time_ms > 1000"
)

Ordering results

Python
# Most recent first
traces = mlflow.search_traces(
filter_string="attributes.status = 'OK'",
order_by=["attributes.timestamp_ms DESC"]
)

# Fastest first
traces = mlflow.search_traces(
order_by=["attributes.execution_time_ms ASC"]
)

# Multiple sort criteria
traces = mlflow.search_traces(
filter_string="attributes.status = 'OK'",
order_by=[
"attributes.timestamp_ms DESC",
"attributes.execution_time_ms ASC"
]
)

DataFrame operations

The DataFrame returned by mlflow.search_traces contains these columns:

Python
traces_df = mlflow.search_traces()

# Default columns
print(traces_df.columns)
# ['request_id', 'trace', 'timestamp_ms', 'status', 'execution_time_ms',
# 'request', 'response', 'request_metadata', 'spans', 'tags']

Extract span fields

Python
# Extract specific span fields into DataFrame columns
traces = mlflow.search_traces(
extract_fields=[
"process_request.inputs.customer_id",
"process_request.outputs",
"validate_input.inputs",
"generate_response.outputs.message"
]
)

# Use extracted fields for evaluation dataset
eval_data = traces.rename(columns={
"process_request.inputs.customer_id": "customer",
"generate_response.outputs.message": "ground_truth"
})

Building dynamic queries

Python
def build_trace_filter(status=None, user=None, min_duration=None,
max_duration=None, tags=None, after_timestamp=None):
"""Build dynamic filter string from parameters"""
conditions = []

if status:
conditions.append(f"attributes.status = '{status}'")

if user:
conditions.append(f"metadata.`mlflow.user` = '{user}'")

if min_duration:
conditions.append(f"attributes.execution_time_ms > {min_duration}")

if max_duration:
conditions.append(f"attributes.execution_time_ms < {max_duration}")

if after_timestamp:
conditions.append(f"attributes.timestamp_ms > {after_timestamp}")

if tags:
for key, value in tags.items():
# Handle dotted tag names
if '.' in key:
conditions.append(f"tags.`{key}` = '{value}'")
else:
conditions.append(f"tags.{key} = '{value}'")

return " AND ".join(conditions) if conditions else None

# Usage
filter_string = build_trace_filter(
status="OK",
user="alice@company.com",
min_duration=100,
tags={"environment": "production", "mlflow.traceName": "process_order"}
)

traces = mlflow.search_traces(filter_string=filter_string)

Practical examples reference

Error monitoring

Monitor and analyze errors in your production environment:

Python
import mlflow
import time
import pandas as pd

def monitor_errors(experiment_name: str, hours: int = 1):
"""Monitor errors in the last N hours."""

# Calculate time window
current_time_ms = int(time.time() * 1000)
cutoff_time_ms = current_time_ms - (hours * 60 * 60 * 1000)

# Find all errors
failed_traces = mlflow.search_traces(
filter_string=f"attributes.status = 'ERROR' AND "
f"attributes.timestamp_ms > {cutoff_time_ms}",
order_by=["attributes.timestamp_ms DESC"]
)

if len(failed_traces) == 0:
print(f"No errors found in the last {hours} hour(s)")
return

# Analyze error patterns
print(f"Found {len(failed_traces)} errors in the last {hours} hour(s)\n")

# Group by function name
error_by_function = failed_traces.groupby('tags.mlflow.traceName').size()
print("Errors by function:")
print(error_by_function.to_string())

# Show recent error samples
print("\nRecent error samples:")
for _, trace in failed_traces.head(5).iterrows():
print(f"- {trace['request_preview'][:60]}...")
print(f" Function: {trace.get('tags.mlflow.traceName', 'unknown')}")
print(f" Time: {pd.to_datetime(trace['timestamp_ms'], unit='ms')}")
print()

return failed_traces

Performance profiling

Analyze performance characteristics and identify bottlenecks:

Python
def profile_performance(function_name: str = None, percentiles: list = [50, 95, 99]):
"""Profile performance metrics for traces."""

# Build filter
filter_parts = []
if function_name:
filter_parts.append(f"tags.`mlflow.traceName` = '{function_name}'")

filter_string = " AND ".join(filter_parts) if filter_parts else None

# Get traces
traces = mlflow.search_traces(filter_string=filter_string)

if len(traces) == 0:
print("No traces found")
return

# Calculate percentiles
perf_stats = traces['execution_time_ms'].describe(percentiles=[p/100 for p in percentiles])

print(f"Performance Analysis ({len(traces)} traces)")
print("=" * 40)
for p in percentiles:
print(f"P{p}: {perf_stats[f'{p}%']:.1f}ms")
print(f"Mean: {perf_stats['mean']:.1f}ms")
print(f"Max: {perf_stats['max']:.1f}ms")

# Find outliers (>P99)
if 99 in percentiles:
p99_threshold = perf_stats['99%']
outliers = traces[traces['execution_time_ms'] > p99_threshold]

if len(outliers) > 0:
print(f"\nOutliers (>{p99_threshold:.0f}ms): {len(outliers)} traces")
for _, trace in outliers.head(3).iterrows():
print(f"- {trace['execution_time_ms']:.0f}ms: {trace['request_preview'][:50]}...")

return traces

User activity analysis

Track and analyze user behavior patterns:

Python
def analyze_user_activity(user_id: str, days: int = 7):
"""Analyze activity patterns for a specific user."""

cutoff_ms = int((time.time() - days * 86400) * 1000)

traces = mlflow.search_traces(
filter_string=f"metadata.`mlflow.user` = '{user_id}' AND "
f"attributes.timestamp_ms > {cutoff_ms}",
order_by=["attributes.timestamp_ms DESC"]
)

if len(traces) == 0:
print(f"No activity found for user {user_id}")
return

print(f"User {user_id} Activity Report ({days} days)")
print("=" * 50)
print(f"Total requests: {len(traces)}")

# Daily activity
traces['date'] = pd.to_datetime(traces['timestamp_ms'], unit='ms').dt.date
daily_activity = traces.groupby('date').size()
print(f"\nDaily activity:")
print(daily_activity.to_string())

# Query categories
if 'tags.query_category' in traces.columns:
categories = traces['tags.query_category'].value_counts()
print(f"\nQuery categories:")
print(categories.to_string())

# Performance stats
print(f"\nPerformance:")
print(f"Average response time: {traces['execution_time_ms'].mean():.1f}ms")
print(f"Error rate: {(traces['status'] == 'ERROR').mean() * 100:.1f}%")

return traces

Best practices

1. Design a consistent tagging strategy

Create a tagging taxonomy for your organization:

Python
class TraceTagging:
"""Standardized tagging strategy for traces."""

# Required tags for all traces
REQUIRED_TAGS = ["environment", "version", "service_name"]

# Category mappings
CATEGORIES = {
"user_management": ["login", "logout", "profile_update"],
"content_generation": ["summarize", "translate", "rewrite"],
"data_retrieval": ["search", "fetch", "query"]
}

@staticmethod
def tag_trace(operation: str, **kwargs):
"""Apply standardized tags to current trace."""
tags = {
"operation": operation,
"timestamp": datetime.now().isoformat(),
"service_name": "genai-platform"
}

# Add category based on operation
for category, operations in TraceTagging.CATEGORIES.items():
if operation in operations:
tags["category"] = category
break

# Add custom tags
tags.update(kwargs)

# Validate required tags
for required in TraceTagging.REQUIRED_TAGS:
if required not in tags:
tags[required] = "unknown"

mlflow.update_current_trace(tags=tags)
return tags

2. Build reusable search utilities

Python
class TraceSearcher:
"""Reusable trace search utilities."""

def __init__(self, experiment_ids: list = None):
self.experiment_ids = experiment_ids

def recent_errors(self, hours: int = 1) -> pd.DataFrame:
"""Get recent error traces."""
cutoff = int((time.time() - hours * 3600) * 1000)
return mlflow.search_traces(
experiment_ids=self.experiment_ids,
filter_string=f"attributes.status = 'ERROR' AND "
f"attributes.timestamp_ms > {cutoff}",
order_by=["attributes.timestamp_ms DESC"]
)

def slow_operations(self, threshold_ms: int = 5000) -> pd.DataFrame:
"""Find operations slower than threshold."""
return mlflow.search_traces(
experiment_ids=self.experiment_ids,
filter_string=f"attributes.execution_time_ms > {threshold_ms}",
order_by=["attributes.execution_time_ms DESC"]
)

def by_user(self, user_id: str, days: int = 7) -> pd.DataFrame:
"""Get traces for a specific user."""
cutoff = int((time.time() - days * 86400) * 1000)
return mlflow.search_traces(
experiment_ids=self.experiment_ids,
filter_string=f"tags.user_id = '{user_id}' AND "
f"attributes.timestamp_ms > {cutoff}",
order_by=["attributes.timestamp_ms DESC"]
)

def by_category(self, category: str, status: str = None) -> pd.DataFrame:
"""Get traces by category with optional status filter."""
filters = [f"tags.category = '{category}'"]
if status:
filters.append(f"attributes.status = '{status}'")

return mlflow.search_traces(
experiment_ids=self.experiment_ids,
filter_string=" AND ".join(filters)
)

def performance_report(self, function_name: str = None) -> dict:
"""Generate performance report."""
filter_parts = []
if function_name:
filter_parts.append(f"tags.`mlflow.traceName` = '{function_name}'")

filter_string = " AND ".join(filter_parts) if filter_parts else None
traces = mlflow.search_traces(
experiment_ids=self.experiment_ids,
filter_string=filter_string
)

if len(traces) == 0:
return {"error": "No traces found"}

return {
"total_traces": len(traces),
"error_rate": (traces['status'] == 'ERROR').mean(),
"avg_duration_ms": traces['execution_time_ms'].mean(),
"p50_duration_ms": traces['execution_time_ms'].quantile(0.5),
"p95_duration_ms": traces['execution_time_ms'].quantile(0.95),
"p99_duration_ms": traces['execution_time_ms'].quantile(0.99)
}

# Usage example
searcher = TraceSearcher()
errors = searcher.recent_errors(hours=24)
slow_ops = searcher.slow_operations(threshold_ms=10000)
user_traces = searcher.by_user("U001", days=30)
report = searcher.performance_report("process_request")

Next Steps