プログラムでトレースを検索する
mlflow.search_traces()を使用して、プログラムでトレースを検索および分析します。
クイックリファレンス
Python
# Search by status
mlflow.search_traces("attributes.status = 'OK'")
mlflow.search_traces("attributes.status = 'ERROR'")
# Search by time (milliseconds since epoch)
mlflow.search_traces("attributes.timestamp_ms > 1749006880539")
mlflow.search_traces("attributes.execution_time_ms > 5000")
# Search by tags
mlflow.search_traces("tags.environment = 'production'")
mlflow.search_traces("tags.`mlflow.traceName` = 'my_function'")
# Search by metadata
mlflow.search_traces("metadata.`mlflow.user` = 'alice@company.com'")
# Combined filters (AND only)
mlflow.search_traces(
"attributes.status = 'OK' AND tags.environment = 'production'"
)
主なルール
- 常に接頭辞を使用します:
attributes.、tags.、またはmetadata. - タグ名または属性名にドットが含まれている場合はバッククォートを使用します**:
tags.`mlflow.traceName` - 一重引用符のみ:
'value'は"value" - 時間のミリ秒:
1749006880539日付ではありません) - ANDのみ: ORはサポートされません
Databricks固有の問題
次の点はDatabricksに固有です。
sql_warehouse_id: オプションの Databricks SQL ウェアハウス ID。指定すると、大規模なトレース データセットのパフォーマンスを向上させるために、指定されたSQLウェアハウスを使用してトレース クエリが実行されます。model_id: Databricks Model Registryからのオプションのモデル ID。 指定すると、指定された登録済みモデルに関連付けられたトレースを検索します。
SQLウェアハウスの統合
大規模なトレース データセットのパフォーマンスを向上させるために、Databricks SQL ウェアハウスを使用してトレース クエリを実行します。
Python
# Use SQL warehouse for better performance
traces = mlflow.search_traces(
filter_string="attributes.status = 'OK'",
sql_warehouse_id="your-warehouse-id"
)
Model Registry統合
Databricks に登録されたモデルに関連付けられたトレースを検索します。
Python
# Find traces for a specific registered model
model_traces = mlflow.search_traces(
model_id="my-model-123",
filter_string="attributes.status = 'OK'"
)
# Analyze model performance from traces
print(f"Found {len(model_traces)} successful traces for model")
print(f"Average latency: {model_traces['execution_time_ms'].mean():.2f}ms")
検索例
ステータスから探す
Python
# Find successful, failed, or in-progress traces
traces = mlflow.search_traces(filter_string="attributes.status = 'OK'")
# Exclude errors
traces = mlflow.search_traces(filter_string="attributes.status != 'ERROR'")
タイムスタンプで検索
Python
import time
from datetime import datetime
# Recent traces (last 5 minutes)
current_time_ms = int(time.time() * 1000)
five_minutes_ago = current_time_ms - (5 * 60 * 1000)
traces = mlflow.search_traces(
filter_string=f"attributes.timestamp_ms > {five_minutes_ago}"
)
# Date range
start_date = int(datetime(2024, 1, 1).timestamp() * 1000)
end_date = int(datetime(2024, 1, 31).timestamp() * 1000)
traces = mlflow.search_traces(
filter_string=f"attributes.timestamp_ms > {start_date} AND attributes.timestamp_ms < {end_date}"
)
# Can also use 'timestamp' alias instead of 'timestamp_ms'
traces = mlflow.search_traces(filter_string=f"attributes.timestamp > {five_minutes_ago}")
実行時間で検索
Python
# Find slow traces
traces = mlflow.search_traces(filter_string="attributes.execution_time_ms > 5000")
# Performance range
traces = mlflow.search_traces(
filter_string="attributes.execution_time_ms > 100 AND attributes.execution_time_ms < 1000"
)
# Can also use 'latency' alias instead of 'execution_time_ms'
traces = mlflow.search_traces(filter_string="attributes.latency > 1000")
タグで検索
Python
# Custom tags (set via mlflow.update_current_trace)
traces = mlflow.search_traces(filter_string="tags.customer_id = 'C001'")
# System tags (require backticks for dotted names)
traces = mlflow.search_traces(
filter_string="tags.`mlflow.traceName` = 'process_chat_request'"
)
traces = mlflow.search_traces(
filter_string="tags.`mlflow.artifactLocation` != ''"
)
複雑なフィルター
Python
# Recent successful production traces
current_time_ms = int(time.time() * 1000)
one_hour_ago = current_time_ms - (60 * 60 * 1000)
traces = mlflow.search_traces(
filter_string=f"attributes.status = 'OK' AND "
f"attributes.timestamp_ms > {one_hour_ago} AND "
f"tags.environment = 'production'"
)
# Fast traces from specific user
traces = mlflow.search_traces(
filter_string="attributes.execution_time_ms < 100 AND "
"metadata.`mlflow.user` = 'alice@company.com'"
)
# Specific function with performance threshold
traces = mlflow.search_traces(
filter_string="tags.`mlflow.traceName` = 'process_payment' AND "
"attributes.execution_time_ms > 1000"
)
コンテキストメタデータによるクエリ
これらの例は、ユーザー ID、セッション、環境、機能フラグなどのコンテキスト メタデータを使用して複数のトレースを検索する方法を示しています。トレースにコンテキスト メタデータを追加する方法の詳細については、 「トレースにコンテキストを追加する」を参照してください。
ユーザーの行動を分析する
Python
from mlflow.client import MlflowClient
client = MlflowClient()
def analyze_user_behavior(user_id: str, experiment_id: str):
"""Analyze a specific user's interaction patterns."""
# Search for all traces from a specific user
user_traces = client.search_traces(
experiment_ids=[experiment_id],
filter_string=f"metadata.`mlflow.trace.user` = '{user_id}'",
max_results=1000
)
# Calculate key metrics
total_interactions = len(user_traces)
unique_sessions = len(set(t.info.metadata.get("mlflow.trace.session", "") for t in user_traces))
avg_response_time = sum(t.info.execution_time_ms for t in user_traces) / total_interactions
return {
"total_interactions": total_interactions,
"unique_sessions": unique_sessions,
"avg_response_time": avg_response_time
}
セッションフローを分析する
Python
def analyze_session_flow(session_id: str, experiment_id: str):
"""Analyze conversation flow within a session."""
# Get all traces from a session, ordered chronologically
session_traces = client.search_traces(
experiment_ids=[experiment_id],
filter_string=f"metadata.`mlflow.trace.session` = '{session_id}'",
order_by=["timestamp ASC"]
)
# Build a timeline of the conversation
conversation_turns = []
for i, trace in enumerate(session_traces):
conversation_turns.append({
"turn": i + 1,
"timestamp": trace.info.timestamp,
"duration_ms": trace.info.execution_time_ms,
"status": trace.info.status
})
return conversation_turns
バージョン間でエラー率を比較する
Python
def compare_version_error_rates(experiment_id: str, versions: list):
"""Compare error rates across different app versions in production."""
error_rates = {}
for version in versions:
traces = client.search_traces(
experiment_ids=[experiment_id],
filter_string=f"metadata.`mlflow.source.type` = 'production' AND metadata.app_version = '{version}'"
)
if not traces:
error_rates[version] = None # Or 0 if no traces means no errors
continue
error_count = sum(1 for t in traces if t.info.status == "ERROR")
error_rates[version] = (error_count / len(traces)) * 100
return error_rates
# version_errors = compare_version_error_rates("your_exp_id", ["1.0.0", "1.1.0"])
# print(version_errors)
機能フラグのパフォーマンスを分析する
Python
def analyze_feature_flag_performance(experiment_id: str, flag_name: str):
"""Analyze performance differences between feature flag states."""
control_latency = []
treatment_latency = []
control_traces = client.search_traces(
experiment_ids=[experiment_id],
filter_string=f"metadata.feature_flag_{flag_name} = 'false'",
)
for t in control_traces:
control_latency.append(t.info.execution_time_ms)
treatment_traces = client.search_traces(
experiment_ids=[experiment_id],
filter_string=f"metadata.feature_flag_{flag_name} = 'true'",
)
for t in treatment_traces:
treatment_latency.append(t.info.execution_time_ms)
avg_control_latency = sum(control_latency) / len(control_latency) if control_latency else 0
avg_treatment_latency = sum(treatment_latency) / len(treatment_latency) if treatment_latency else 0
return {
f"avg_latency_{flag_name}_off": avg_control_latency,
f"avg_latency_{flag_name}_on": avg_treatment_latency
}
# perf_metrics = analyze_feature_flag_performance("your_exp_id", "new_retriever")
# print(perf_metrics)
データフレーム の操作
mlflow.search_traces によって返される DataFrame には、次の列が含まれます。
Python
traces_df = mlflow.search_traces()
# Default columns
print(traces_df.columns)
# ['request_id', 'trace', 'timestamp_ms', 'status', 'execution_time_ms',
# 'request', 'response', 'request_metadata', 'spans', 'tags']
スパンフィールドの抽出
Python
# Extract specific span fields into DataFrame columns
traces = mlflow.search_traces(
extract_fields=[
"process_request.inputs.customer_id",
"process_request.outputs",
"validate_input.inputs",
"generate_response.outputs.message"
]
)
# Use extracted fields for evaluation dataset
eval_data = traces.rename(columns={
"process_request.inputs.customer_id": "customer",
"generate_response.outputs.message": "ground_truth"
})
次のステップ
- 評価データセットの構築 - クエリされたトレースをテストデータセットに変換します