Pesquise e analise traços
Saiba como criar rastros pesquisáveis, consultá-los de forma eficaz e analisar os resultados para obter percepções sobre o comportamento do seu aplicativo GenAI.
Referência rápida
Sintaxe de pesquisa essencial
# Search by status
mlflow.search_traces("attributes.status = 'OK'")
mlflow.search_traces("attributes.status = 'ERROR'")
# Search by time (milliseconds since epoch)
mlflow.search_traces("attributes.timestamp_ms > 1749006880539")
mlflow.search_traces("attributes.execution_time_ms > 5000")
# Search by tags
mlflow.search_traces("tags.environment = 'production'")
mlflow.search_traces("tags.`mlflow.traceName` = 'my_function'")
# Search by metadata
mlflow.search_traces("metadata.`mlflow.user` = 'alice@company.com'")
# Combined filters (AND only)
mlflow.search_traces(
    "attributes.status = 'OK' AND tags.environment = 'production'"
)
regras fundamentais
- Sempre use prefixos : 
attributes.,tags.oumetadata. - Aparece novamente se os nomes das tags ou dos atributos tiverem pontos : ``tags.`mlflow.traceName```
 - Somente aspas simples : 
'value'não"value" - Milissegundos por tempo : 
1749006880539não datas - E somente : Sem suporte de OR
 
Campos pesquisáveis
campo  | Caminho  | Operadores  | 
|---|---|---|
Status  | 
  | 
  | 
Carimbo de data/hora  | 
  | 
  | 
Duração  | 
  | 
  | 
Tags  | 
  | 
  | 
Metadados  | 
  | 
  | 
Exemplo de ponta a ponta
Pré-requisitos
- 
Instale o site MLflow e o pacote necessário
Bashpip install --upgrade "mlflow[databricks]>=3.1.0" openai "databricks-connect>=16.1" - 
Crie um experimento MLflow seguindo o guia de início rápido para configurar seu ambiente.
 
Crie exemplos de rastreamentos para demonstrar a funcionalidade de pesquisa:
import time
import mlflow
# Define methods to be traced
@mlflow.trace()
def morning_greeting(name: str):
    time.sleep(1)
    # Add tag and metadata for better categorization
    mlflow.update_current_trace(
        tags={"person": name},
    )
    return f"Good morning {name}."
@mlflow.trace()
def evening_greeting(name: str):
    time.sleep(1)
    # Add tag with different values for comparison
    mlflow.update_current_trace(
        tags={"person": name},
    )
    return f"Good evening {name}."
@mlflow.trace()
def goodbye():
    # Add tag even for functions that might fail
    mlflow.update_current_trace(
        tags={"greeting_type": "goodbye"},
    )
    raise Exception("Cannot say goodbye")
# Execute the methods
morning_greeting("Tom")
# Get the timestamp in milliseconds
morning_time = int(time.time() * 1000)
evening_greeting("Mary")
# Execute goodbye, catching the exception
try:
    goodbye()
except Exception as e:
    print(f"Caught expected exception: {e}")
    pass
O código acima cria os seguintes traços:

Pesquise esses rastreamentos usando os prefixos de campo corretos:
# Search successful traces
traces = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
)
print(traces)
# 2 results
# Search failed traces
traces = mlflow.search_traces(
    filter_string="attributes.status = 'ERROR'",
)
print(traces)
# 1 result
# Search all traces in experiment
traces = mlflow.search_traces()
print(traces)
# 3 results
# Search by single tag
traces = mlflow.search_traces(filter_string="tags.person = 'Tom'")
print(traces)
# 1 result
# Complex search combining tags and status
traces = mlflow.search_traces(
    filter_string="tags.person = 'Tom' AND attributes.status = 'OK'"
)
print(traces)
# 1 result
# Search by timestamp
traces = mlflow.search_traces(filter_string=f"attributes.timestamp > {morning_time}")
print(traces)
# 1 result
Referência da API
Pesquisar API
Use mlflow.search_traces() para pesquisar e analisar traços em seus experimentos:
mlflow.search_traces(
    experiment_ids: Optional[List[str]] = None,          # Uses active experiment if not specified
    filter_string: Optional[str] = None,
    max_results: Optional[int] = None,
    order_by: Optional[List[str]] = None,
    extract_fields: Optional[List[str]] = None,          # DataFrame column extraction (pandas only)
    run_id: Optional[str] = None,                        # Filter traces by run ID
    return_type: Optional[Literal["pandas", "list"]] = None,  # Return type (default: pandas if available)
    model_id: Optional[str] = None,                      # Search traces by model ID
    sql_warehouse_id: Optional[str] = None               # Databricks SQL warehouse ID
) -> Union[pandas.DataFrame, List[Trace]]
Detalhes do parâmetro :
Parâmetro  | Descrição  | 
|---|---|
  | Lista de IDs de experimentos para definir o escopo da pesquisa. Se não for fornecida, a pesquisa será realizada em todo o experimento ativo atual.  | 
  | Uma cadeia de filtros de pesquisa.  | 
  | Número máximo de traços desejados. Se Nenhum, todos os traços correspondentes às expressões de pesquisa serão retornados.  | 
  | Lista de cláusulas order_by.  | 
  | Especifique os campos a serem extraídos dos rastreamentos usando o formato   | 
  | Um ID de execução para o escopo da pesquisa. Quando um rastreamento for criado em uma execução ativa, ele será associado à execução e o senhor poderá filtrar pelo ID da execução para recuperar o rastreamento. Veja o exemplo abaixo para saber como filtrar os rastros por ID de execução.  | 
  | O tipo do valor de retorno. Os seguintes tipos de devolução são suportados. Se a Pandas biblioteca estiver instalada, o tipo de retorno default será "Pandas" . Caso contrário, o tipo de retorno default é "list": -   | 
  | Se especificado, pesquise traços associados ao ID de modelo fornecido.  | 
O MLflow também fornece o MlflowClient.search_traces(). No entanto, recomendamos utilizar mlflow.search_traces() - com exceção do suporte à paginação, ele oferece um conjunto superior de funcionalidades com recursos padrão e adicionais mais convenientes, como saída DataFrame e e extração de campos.
Referência de campos pesquisáveis
Para obter uma referência completa sobre esses campos, consulte o modelo de dados de rastreamento.
Tipo de campo  | Caminho de pesquisa  | Operadores  | Valores  | Notas  | 
|---|---|---|---|---|
Metadados  | 
  | 
  | Apenas igualdade de strings  | |
Tags  | 
  | 
  | Apenas igualdade de strings  | |
Status  | 
  | 
  | 
  | Apenas igualdade de strings  | 
Nome  | 
  | 
  | Nome do traço  | Apenas igualdade de strings  | 
Carimbo de data/hora  | 
  | 
  | Tempo de criação (ms desde a época)  | Comparações numéricas  | 
Tempo de execução  | 
  | 
  | duração em milissegundos  | Comparações numéricas  | 
Detalhes dos metadados
Os seguintes campos de metadados estão disponíveis para filtragem:
metadata.mlflow.traceInputs: Solicitar conteúdometadata.mlflow.traceOutputs: Conteúdo da respostametadata.mlflow.sourceRun: ID da execução de origemmetadata.mlflow.modelId: ID do modelometadata.mlflow.trace.sizeBytes: Tamanho do traço em bytesmetadata.mlflow.trace.tokenUsage: Informações de uso de tokens agregados (JSON strings)metadata.mlflow.trace.user: ID de usuário/nome da solicitação do aplicativometadata.mlflow.trace.session: ID da sessão da solicitação do aplicativo
Detalhes das tags
Além das tags definidas pelo usuário, as seguintes tags definidas pelo sistema estão disponíveis:
mlflow.traceName: O nome do rastreamentoeval.requestId: ID da solicitação de avaliação, definido pormlflow.genai.evaluate()
Regras de sintaxe de filtro
- Prefixos de tabela obrigatórios : sempre use 
attributes.,tags.oumetadata. - Ataques invertidos para pontos: campos com pontos precisam de acentos invertidos: ``tags.`mlflow.traceName```
 - Somente aspas simples : os valores de strings devem usar aspas simples: 
'value' - Diferencia maiúsculas de minúsculas : todos os nomes e valores de campo fazem distinção entre maiúsculas
 - E somente : operadores OR não são suportados
 
Ordenar por sintaxe
# Single field ordering
order_by=["attributes.timestamp_ms DESC"]
order_by=["attributes.execution_time_ms ASC"]
# Multiple field ordering (applied in sequence)
order_by=[
    "attributes.timestamp_ms DESC",
    "attributes.execution_time_ms ASC"
]
# Supported fields for ordering
# - attributes.timestamp_ms (and aliases)
# - attributes.execution_time_ms (and aliases)
# - attributes.status
# - attributes.name
Padrões comuns
# Status filtering
"attributes.status = 'OK'"
"attributes.status = 'ERROR'"
# Time-based queries
"attributes.timestamp_ms > 1749006880539"
"attributes.execution_time_ms > 5000"
# Tag searches
"tags.user_id = 'U001'"
"tags.`mlflow.traceName` = 'my_function'"
# Metadata queries
"metadata.`mlflow.user` = 'alice@company.com'"
"metadata.`mlflow.traceOutputs` != ''"
# Combined filters
"attributes.status = 'OK' AND tags.environment = 'production'"
"attributes.timestamp_ms > 1749006880539 AND attributes.execution_time_ms > 1000"
Armadilhas comuns
❌ Incorreto  | ✅ Correto  | Problema  | 
|---|---|---|
  | 
  | Prefixo ausente  | 
  | 
  | Prefixo e acentos incorretos ausentes  | 
  | 
  | Use milissegundos, não strings  | 
  | 
  | Use aspas simples  | 
  | Use consultas separadas  | OU não suportado  | 
Exemplos de pesquisa detalhada
Pesquisar por ID de execução
# Find all traces associated with a specific MLflow run
with mlflow.start_run() as run:
    # Your traced code here
    traced_result = my_traced_function()
# Search for traces from this run
run_traces = mlflow.search_traces(
    run_id=run.info.run_id,
    return_type="list"  # Get list of Trace objects
)
Controle o tipo de retorno
# Get results as pandas DataFrame (default if pandas is installed)
traces_df = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
    return_type="pandas"
)
# Get results as list of Trace objects
traces_list = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
    return_type="list"
)
# Access trace details from list
for trace in traces_list:
    print(f"Trace ID: {trace.info.trace_id}")
    print(f"Status: {trace.info.state}")
    print(f"Duration: {trace.info.execution_duration}")
Pesquisar por ID de modelo
# Find traces associated with a specific MLflow model
model_traces = mlflow.search_traces(
    model_id="my-model-123",
    filter_string="attributes.status = 'OK'"
)
# Analyze model performance
print(f"Found {len(model_traces)} successful traces for model")
print(f"Average latency: {model_traces['execution_time_ms'].mean():.2f}ms")
Pesquisar por status
# Find successful traces
traces = mlflow.search_traces(filter_string="attributes.status = 'OK'")
# Find failed traces
traces = mlflow.search_traces(filter_string="attributes.status = 'ERROR'")
# Find in-progress traces
traces = mlflow.search_traces(filter_string="attributes.status = 'IN_PROGRESS'")
# Exclude errors
traces = mlflow.search_traces(filter_string="attributes.status != 'ERROR'")
Pesquisar por nome de rastreamento
# Find traces with specific name (rarely used - legacy field)
traces = mlflow.search_traces(filter_string="attributes.name = 'foo'")
# Find traces excluding a specific name
traces = mlflow.search_traces(filter_string="attributes.name != 'test_trace'")
# Note: Most users should use tags.`mlflow.traceName` instead
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_request'"
)
Pesquisar por carimbo de data/hora
import time
from datetime import datetime
# Current time in milliseconds
current_time_ms = int(time.time() * 1000)
# Last 5 minutes
five_minutes_ago = current_time_ms - (5 * 60 * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.timestamp_ms > {five_minutes_ago}"
)
# Specific date range
start_date = int(datetime(2024, 1, 1).timestamp() * 1000)
end_date = int(datetime(2024, 1, 31).timestamp() * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.timestamp_ms > {start_date} AND attributes.timestamp_ms < {end_date}"
)
# Using timestamp aliases
traces = mlflow.search_traces(filter_string=f"attributes.timestamp > {five_minutes_ago}")
Pesquisar por tempo de execução
# Find slow traces (>5 seconds)
traces = mlflow.search_traces(filter_string="attributes.execution_time_ms > 5000")
# Find fast traces (<100ms)
traces = mlflow.search_traces(filter_string="attributes.execution_time_ms < 100")
# Performance range
traces = mlflow.search_traces(
    filter_string="attributes.execution_time_ms > 100 AND attributes.execution_time_ms < 1000"
)
# Using execution time aliases
traces = mlflow.search_traces(filter_string="attributes.latency > 1000")
Pesquisar por tags
# Custom tags (set via mlflow.update_current_trace)
traces = mlflow.search_traces(filter_string="tags.customer_id = 'C001'")
traces = mlflow.search_traces(filter_string="tags.environment = 'production'")
traces = mlflow.search_traces(filter_string="tags.version = 'v2.1.0'")
# MLflow system tags (require backticks)
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_chat_request'"
)
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.artifactLocation` != ''"
)
Pesquisar por metadados
# Search by response content (exact match)
traces = mlflow.search_traces(
    filter_string="metadata.`mlflow.traceOutputs` = 'exact response text'"
)
# Find traces with any output
traces = mlflow.search_traces(
    filter_string="metadata.`mlflow.traceOutputs` != ''"
)
# Search by user
traces = mlflow.search_traces(
    filter_string="metadata.`mlflow.user` = 'alice@company.com'"
)
# Search by source file
traces = mlflow.search_traces(
    filter_string="metadata.`mlflow.source.name` = 'app.py'"
)
# Search by git information
traces = mlflow.search_traces(
    filter_string="metadata.`mlflow.source.git.branch` = 'main'"
)
Filtros complexos com AND
# Recent successful production traces
current_time_ms = int(time.time() * 1000)
one_hour_ago = current_time_ms - (60 * 60 * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.status = 'OK' AND "
                 f"attributes.timestamp_ms > {one_hour_ago} AND "
                 f"tags.environment = 'production'"
)
# Fast traces from specific user
traces = mlflow.search_traces(
    filter_string="attributes.execution_time_ms < 100 AND "
                 "metadata.`mlflow.user` = 'alice@company.com'"
)
# Specific function with performance threshold
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_payment' AND "
                 "attributes.execution_time_ms > 1000"
)
Resultados do pedido
# Most recent first
traces = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
    order_by=["attributes.timestamp_ms DESC"]
)
# Fastest first
traces = mlflow.search_traces(
    order_by=["attributes.execution_time_ms ASC"]
)
# Multiple sort criteria
traces = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
    order_by=[
        "attributes.timestamp_ms DESC",
        "attributes.execution_time_ms ASC"
    ]
)
DataFrame operações
O DataFrame retornado por mlflow.search_traces contém as seguintes colunas:
traces_df = mlflow.search_traces()
# Default columns
print(traces_df.columns)
# ['request_id', 'trace', 'timestamp_ms', 'status', 'execution_time_ms',
#  'request', 'response', 'request_metadata', 'spans', 'tags']
Extraia campos de extensão
# Extract specific span fields into DataFrame columns
traces = mlflow.search_traces(
    extract_fields=[
        "process_request.inputs.customer_id",
        "process_request.outputs",
        "validate_input.inputs",
        "generate_response.outputs.message"
    ]
)
# Use extracted fields for evaluation dataset
eval_data = traces.rename(columns={
    "process_request.inputs.customer_id": "customer",
    "generate_response.outputs.message": "ground_truth"
})
Criação de consultas dinâmicas
def build_trace_filter(status=None, user=None, min_duration=None,
                      max_duration=None, tags=None, after_timestamp=None):
    """Build dynamic filter string from parameters"""
    conditions = []
    if status:
        conditions.append(f"attributes.status = '{status}'")
    if user:
        conditions.append(f"metadata.`mlflow.user` = '{user}'")
    if min_duration:
        conditions.append(f"attributes.execution_time_ms > {min_duration}")
    if max_duration:
        conditions.append(f"attributes.execution_time_ms < {max_duration}")
    if after_timestamp:
        conditions.append(f"attributes.timestamp_ms > {after_timestamp}")
    if tags:
        for key, value in tags.items():
            # Handle dotted tag names
            if '.' in key:
                conditions.append(f"tags.`{key}` = '{value}'")
            else:
                conditions.append(f"tags.{key} = '{value}'")
    return " AND ".join(conditions) if conditions else None
# Usage
filter_string = build_trace_filter(
    status="OK",
    user="alice@company.com",
    min_duration=100,
    tags={"environment": "production", "mlflow.traceName": "process_order"}
)
traces = mlflow.search_traces(filter_string=filter_string)
Referência de exemplos práticos
Monitoramento de erros
Monitore e analise erros em seu ambiente de produção:
import mlflow
import time
import pandas as pd
def monitor_errors(experiment_name: str, hours: int = 1):
    """Monitor errors in the last N hours."""
    # Calculate time window
    current_time_ms = int(time.time() * 1000)
    cutoff_time_ms = current_time_ms - (hours * 60 * 60 * 1000)
    # Find all errors
    failed_traces = mlflow.search_traces(
        filter_string=f"attributes.status = 'ERROR' AND "
                     f"attributes.timestamp_ms > {cutoff_time_ms}",
        order_by=["attributes.timestamp_ms DESC"]
    )
    if len(failed_traces) == 0:
        print(f"No errors found in the last {hours} hour(s)")
        return
    # Analyze error patterns
    print(f"Found {len(failed_traces)} errors in the last {hours} hour(s)\n")
    # Group by function name
    error_by_function = failed_traces.groupby('tags.mlflow.traceName').size()
    print("Errors by function:")
    print(error_by_function.to_string())
    # Show recent error samples
    print("\nRecent error samples:")
    for _, trace in failed_traces.head(5).iterrows():
        print(f"- {trace['request_preview'][:60]}...")
        print(f"  Function: {trace.get('tags.mlflow.traceName', 'unknown')}")
        print(f"  Time: {pd.to_datetime(trace['timestamp_ms'], unit='ms')}")
        print()
    return failed_traces
Perfil de desempenho
Analisar as características de desempenho e identificar os gargalos:
def profile_performance(function_name: str = None, percentiles: list = [50, 95, 99]):
    """Profile performance metrics for traces."""
    # Build filter
    filter_parts = []
    if function_name:
        filter_parts.append(f"tags.`mlflow.traceName` = '{function_name}'")
    filter_string = " AND ".join(filter_parts) if filter_parts else None
    # Get traces
    traces = mlflow.search_traces(filter_string=filter_string)
    if len(traces) == 0:
        print("No traces found")
        return
    # Calculate percentiles
    perf_stats = traces['execution_time_ms'].describe(percentiles=[p/100 for p in percentiles])
    print(f"Performance Analysis ({len(traces)} traces)")
    print("=" * 40)
    for p in percentiles:
        print(f"P{p}: {perf_stats[f'{p}%']:.1f}ms")
    print(f"Mean: {perf_stats['mean']:.1f}ms")
    print(f"Max: {perf_stats['max']:.1f}ms")
    # Find outliers (>P99)
    if 99 in percentiles:
        p99_threshold = perf_stats['99%']
        outliers = traces[traces['execution_time_ms'] > p99_threshold]
        if len(outliers) > 0:
            print(f"\nOutliers (>{p99_threshold:.0f}ms): {len(outliers)} traces")
            for _, trace in outliers.head(3).iterrows():
                print(f"- {trace['execution_time_ms']:.0f}ms: {trace['request_preview'][:50]}...")
    return traces
Análise da atividade do usuário
Acompanhe e analise os padrões de comportamento do usuário:
def analyze_user_activity(user_id: str, days: int = 7):
    """Analyze activity patterns for a specific user."""
    cutoff_ms = int((time.time() - days * 86400) * 1000)
    traces = mlflow.search_traces(
        filter_string=f"metadata.`mlflow.user` = '{user_id}' AND "
                     f"attributes.timestamp_ms > {cutoff_ms}",
        order_by=["attributes.timestamp_ms DESC"]
    )
    if len(traces) == 0:
        print(f"No activity found for user {user_id}")
        return
    print(f"User {user_id} Activity Report ({days} days)")
    print("=" * 50)
    print(f"Total requests: {len(traces)}")
    # Daily activity
    traces['date'] = pd.to_datetime(traces['timestamp_ms'], unit='ms').dt.date
    daily_activity = traces.groupby('date').size()
    print(f"\nDaily activity:")
    print(daily_activity.to_string())
    # Query categories
    if 'tags.query_category' in traces.columns:
        categories = traces['tags.query_category'].value_counts()
        print(f"\nQuery categories:")
        print(categories.to_string())
    # Performance stats
    print(f"\nPerformance:")
    print(f"Average response time: {traces['execution_time_ms'].mean():.1f}ms")
    print(f"Error rate: {(traces['status'] == 'ERROR').mean() * 100:.1f}%")
    return traces
Melhores práticas
1. Crie uma estratégia de marcação consistente
Crie uma taxonomia de marcação para sua organização:
class TraceTagging:
    """Standardized tagging strategy for traces."""
    # Required tags for all traces
    REQUIRED_TAGS = ["environment", "version", "service_name"]
    # Category mappings
    CATEGORIES = {
        "user_management": ["login", "logout", "profile_update"],
        "content_generation": ["summarize", "translate", "rewrite"],
        "data_retrieval": ["search", "fetch", "query"]
    }
    @staticmethod
    def tag_trace(operation: str, **kwargs):
        """Apply standardized tags to current trace."""
        tags = {
            "operation": operation,
            "timestamp": datetime.now().isoformat(),
            "service_name": "genai-platform"
        }
        # Add category based on operation
        for category, operations in TraceTagging.CATEGORIES.items():
            if operation in operations:
                tags["category"] = category
                break
        # Add custom tags
        tags.update(kwargs)
        # Validate required tags
        for required in TraceTagging.REQUIRED_TAGS:
            if required not in tags:
                tags[required] = "unknown"
        mlflow.update_current_trace(tags=tags)
        return tags
2. Criar utilidades de pesquisa reutilizáveis
class TraceSearcher:
    """Reusable trace search utilities."""
    def __init__(self, experiment_ids: list = None):
        self.experiment_ids = experiment_ids
    def recent_errors(self, hours: int = 1) -> pd.DataFrame:
        """Get recent error traces."""
        cutoff = int((time.time() - hours * 3600) * 1000)
        return mlflow.search_traces(
            experiment_ids=self.experiment_ids,
            filter_string=f"attributes.status = 'ERROR' AND "
                         f"attributes.timestamp_ms > {cutoff}",
            order_by=["attributes.timestamp_ms DESC"]
        )
    def slow_operations(self, threshold_ms: int = 5000) -> pd.DataFrame:
        """Find operations slower than threshold."""
        return mlflow.search_traces(
            experiment_ids=self.experiment_ids,
            filter_string=f"attributes.execution_time_ms > {threshold_ms}",
            order_by=["attributes.execution_time_ms DESC"]
        )
    def by_user(self, user_id: str, days: int = 7) -> pd.DataFrame:
        """Get traces for a specific user."""
        cutoff = int((time.time() - days * 86400) * 1000)
        return mlflow.search_traces(
            experiment_ids=self.experiment_ids,
            filter_string=f"tags.user_id = '{user_id}' AND "
                         f"attributes.timestamp_ms > {cutoff}",
            order_by=["attributes.timestamp_ms DESC"]
        )
    def by_category(self, category: str, status: str = None) -> pd.DataFrame:
        """Get traces by category with optional status filter."""
        filters = [f"tags.category = '{category}'"]
        if status:
            filters.append(f"attributes.status = '{status}'")
        return mlflow.search_traces(
            experiment_ids=self.experiment_ids,
            filter_string=" AND ".join(filters)
        )
    def performance_report(self, function_name: str = None) -> dict:
        """Generate performance report."""
        filter_parts = []
        if function_name:
            filter_parts.append(f"tags.`mlflow.traceName` = '{function_name}'")
        filter_string = " AND ".join(filter_parts) if filter_parts else None
        traces = mlflow.search_traces(
            experiment_ids=self.experiment_ids,
            filter_string=filter_string
        )
        if len(traces) == 0:
            return {"error": "No traces found"}
        return {
            "total_traces": len(traces),
            "error_rate": (traces['status'] == 'ERROR').mean(),
            "avg_duration_ms": traces['execution_time_ms'].mean(),
            "p50_duration_ms": traces['execution_time_ms'].quantile(0.5),
            "p95_duration_ms": traces['execution_time_ms'].quantile(0.95),
            "p99_duration_ms": traces['execution_time_ms'].quantile(0.99)
        }
# Usage example
searcher = TraceSearcher()
errors = searcher.recent_errors(hours=24)
slow_ops = searcher.slow_operations(threshold_ms=10000)
user_traces = searcher.by_user("U001", days=30)
report = searcher.performance_report("process_request")
Próximas etapas
- Criar um conjunto de dados de avaliação - Converter os traços consultados em um conjunto de dados de teste
 - Excluir rastros - Remover rastros com base em critérios de pesquisa para gestão de dados