トレースの検索と分析
検索可能なトレースを作成し、効果的にクエリを実行し、結果を分析して生成AIアプリケーションの動作を把握する方法を学びます。
クイックリファレンス
基本的な検索構文
# Search by status
mlflow.search_traces("attributes.status = 'OK'")
mlflow.search_traces("attributes.status = 'ERROR'")
# Search by time (milliseconds since epoch)
mlflow.search_traces("attributes.timestamp_ms > 1749006880539")
mlflow.search_traces("attributes.execution_time_ms > 5000")
# Search by tags
mlflow.search_traces("tags.environment = 'production'")
mlflow.search_traces("tags.`mlflow.traceName` = 'my_function'")
# Search by metadata
mlflow.search_traces("metadata.`mlflow.user` = 'alice@company.com'")
# Combined filters (AND only)
mlflow.search_traces(
"attributes.status = 'OK' AND tags.environment = 'production'"
)
主なルール
- 常にプレフィックスを使用します :
attributes.
、tags.
、またはmetadata.
- タグ名または属性名にドットが含まれている場合のバッククォート : ``tags.`mlflow.traceName```
- 一重引用符のみ :
'value'
しません"value"
- 時間のミリ秒 : 日付ではなく
1749006880539
- AND のみ : OR はサポートされていません
検索可能なフィールド
フィールド | パス | 演算子 |
---|---|---|
ステータス |
|
|
Timestamp |
|
|
期間 |
|
|
Tags |
|
|
メタデータ |
|
|
エンドツーエンドの例
:::note 前提条件
-
MLflow と必要なパッケージをインストールする
Bashpip install --upgrade "mlflow[databricks]>=3.1.0" openai "databricks-connect>=16.1"
-
MLflow エクスペリメントを作成するには、環境のセットアップに関するクイックスタートに従ってください。:::
検索機能を示すサンプル トレースを作成します。
import time
import mlflow
# Define methods to be traced
@mlflow.trace()
def morning_greeting(name: str):
time.sleep(1)
# Add tag and metadata for better categorization
mlflow.update_current_trace(
tags={"person": name},
)
return f"Good morning {name}."
@mlflow.trace()
def evening_greeting(name: str):
time.sleep(1)
# Add tag with different values for comparison
mlflow.update_current_trace(
tags={"person": name},
)
return f"Good evening {name}."
@mlflow.trace()
def goodbye():
# Add tag even for functions that might fail
mlflow.update_current_trace(
tags={"greeting_type": "goodbye"},
)
raise Exception("Cannot say goodbye")
# Execute the methods
morning_greeting("Tom")
# Get the timestamp in milliseconds
morning_time = int(time.time() * 1000)
evening_greeting("Mary")
# Execute goodbye, catching the exception
try:
goodbye()
except Exception as e:
print(f"Caught expected exception: {e}")
pass
上記のコードでは、次のトレースが作成されます。
正しいフィールド接頭辞を使用して、これらのトレースを検索します。
# Search successful traces
traces = mlflow.search_traces(
filter_string="attributes.status = 'OK'",
)
print(traces)
# 2 results
# Search failed traces
traces = mlflow.search_traces(
filter_string="attributes.status = 'ERROR'",
)
print(traces)
# 1 result
# Search all traces in experiment
traces = mlflow.search_traces()
print(traces)
# 3 results
# Search by single tag
traces = mlflow.search_traces(filter_string="tags.person = 'Tom'")
print(traces)
# 1 result
# Complex search combining tags and status
traces = mlflow.search_traces(
filter_string="tags.person = 'Tom' AND attributes.status = 'OK'"
)
print(traces)
# 1 result
# Search by timestamp
traces = mlflow.search_traces(filter_string=f"attributes.timestamp > {morning_time}")
print(traces)
# 1 result
API リファレンス
検索API
mlflow.search_traces()
を使用して、エクスペリメント内のトレースを検索および分析します。
mlflow.search_traces(
experiment_ids: Optional[List[str]] = None, # Uses active experiment if not specified
filter_string: Optional[str] = None,
max_results: Optional[int] = None,
order_by: Optional[List[str]] = None,
extract_fields: Optional[List[str]] = None, # DataFrame column extraction (pandas only)
run_id: Optional[str] = None, # Filter traces by run ID
return_type: Optional[Literal["pandas", "list"]] = None, # Return type (default: pandas if available)
model_id: Optional[str] = None, # Search traces by model ID
sql_warehouse_id: Optional[str] = None # Databricks SQL warehouse ID
) -> Union[pandas.DataFrame, List[Trace]]
パラメーター の詳細:
パラメーター | 説明 |
---|---|
| 検索範囲を絞り込むエクスペリメント ID のリスト。 指定しない場合、検索は現在アクティブなエクスペリメント全体で実行されます。 |
| 検索フィルター文字列。 |
| 必要なトレースの最大数。[なし] の場合、検索式に一致するすべてのトレースが返されます。 |
| order_by句のリスト。 |
| トレースから抽出するフィールドを |
| 検索の範囲を設定するための実行 ID。アクティブな実行の下にトレースが作成されると、トレースは実行に関連付けられ、実行 ID でフィルター処理してトレースを取得できます。実行 ID でトレースをフィルタリングする方法については、以下の例を参照してください。 |
| 戻り値の型。次の戻り値の型がサポートされています。Pandas ライブラリがインストールされている場合、デフォルトの戻り値の型は「Pandas」です。それ以外の場合、デフォルトの戻り値の型は "list" です。 • |
| 指定した場合、指定されたモデル ID に関連付けられたトレースを検索します。 |
MLflow には、 MlflowClient.search_traces()
.ただし、ページネーションのサポートを除き、 mlflow.search_traces()
を使用することをお勧めします。これにより、より便利なデフォルトと、DataFrame 出力やフィールド抽出などの追加機能を備えた機能のスーパーセットが提供されます。
検索可能フィールドの参照
これらのフィールドの完全なリファレンスについては、 トレース・データ・モデルを参照してください。
フィールドタイプ | 検索パス | 演算子 | 値 | 注 |
---|---|---|---|---|
メタデータ |
|
| 文字列の等価性のみ | |
Tags |
|
| 文字列の等価性のみ | |
ステータス |
|
|
| 文字列の等価性のみ |
名前 |
|
| トレース名 | 文字列の等価性のみ |
Timestamp |
|
| 作成時間 (エポックからのミリ秒) | 数値の比較 |
実行時間 |
|
| 期間 (ミリ秒単位) | 数値の比較 |
メタデータの詳細
次のメタデータ フィールドをフィルタリングに使用できます。
metadata.mlflow.traceInputs
: コンテンツのリクエストmetadata.mlflow.traceOutputs
: レスポンス内容metadata.mlflow.sourceRun
: ソース実行 IDmetadata.mlflow.modelId
: モデルIDmetadata.mlflow.trace.sizeBytes
: トレース・サイズ (バイト単位)metadata.mlflow.trace.tokenUsage
: 集計されたトークン使用状況情報 (JSON 文字列)metadata.mlflow.trace.user
: アプリケーションリクエストのユーザーID/名前metadata.mlflow.trace.session
: アプリケーション要求のセッション ID
タグの詳細
ユーザー定義タグに加えて、次のシステム定義タグを使用できます。
mlflow.traceName
: トレースの名前eval.requestId
: 評価リクエスト ID、mlflow.genai.evaluate()
で設定
フィルター構文ルール
- 必要なテーブルプレフィックス : 常に
attributes.
、tags.
、またはmetadata.
- ドットのバッククォート : ドットのあるフィールドにはバッククォートが必要です。 ``tags.`mlflow.traceName```
- 一重引用符のみ : 文字列値には一重引用符を使用する必要があります。
'value'
- 大文字と小文字を区別する : すべてのフィールド名と値で大文字と小文字が区別されます
- AND のみ : OR 演算子はサポートされていません
構文による順序付け
# Single field ordering
order_by=["attributes.timestamp_ms DESC"]
order_by=["attributes.execution_time_ms ASC"]
# Multiple field ordering (applied in sequence)
order_by=[
"attributes.timestamp_ms DESC",
"attributes.execution_time_ms ASC"
]
# Supported fields for ordering
# - attributes.timestamp_ms (and aliases)
# - attributes.execution_time_ms (and aliases)
# - attributes.status
# - attributes.name
一般的なパターン
# Status filtering
"attributes.status = 'OK'"
"attributes.status = 'ERROR'"
# Time-based queries
"attributes.timestamp_ms > 1749006880539"
"attributes.execution_time_ms > 5000"
# Tag searches
"tags.user_id = 'U001'"
"tags.`mlflow.traceName` = 'my_function'"
# Metadata queries
"metadata.`mlflow.user` = 'alice@company.com'"
"metadata.`mlflow.traceOutputs` != ''"
# Combined filters
"attributes.status = 'OK' AND tags.environment = 'production'"
"attributes.timestamp_ms > 1749006880539 AND attributes.execution_time_ms > 1000"
よくある落とし穴
❌ 間違った | ✅ そうです | 問題 |
---|---|---|
|
| プレフィックスがありません |
|
| プレフィックスとバッククォートがありません |
|
| 文字列ではなくミリ秒を使用する |
|
| 一重引用符を使用する |
| 個別のクエリを使用する | OR サポートされていません |
詳細な検索例
実行 ID で検索
# Find all traces associated with a specific MLflow run
with mlflow.start_run() as run:
# Your traced code here
traced_result = my_traced_function()
# Search for traces from this run
run_traces = mlflow.search_traces(
run_id=run.info.run_id,
return_type="list" # Get list of Trace objects
)
コントロールの戻り値の型
# Get results as pandas DataFrame (default if pandas is installed)
traces_df = mlflow.search_traces(
filter_string="attributes.status = 'OK'",
return_type="pandas"
)
# Get results as list of Trace objects
traces_list = mlflow.search_traces(
filter_string="attributes.status = 'OK'",
return_type="list"
)
# Access trace details from list
for trace in traces_list:
print(f"Trace ID: {trace.info.trace_id}")
print(f"Status: {trace.info.state}")
print(f"Duration: {trace.info.execution_duration}")
モデルIDから探す
# Find traces associated with a specific MLflow model
model_traces = mlflow.search_traces(
model_id="my-model-123",
filter_string="attributes.status = 'OK'"
)
# Analyze model performance
print(f"Found {len(model_traces)} successful traces for model")
print(f"Average latency: {model_traces['execution_time_ms'].mean():.2f}ms")
ステータスから探す
# Find successful traces
traces = mlflow.search_traces(filter_string="attributes.status = 'OK'")
# Find failed traces
traces = mlflow.search_traces(filter_string="attributes.status = 'ERROR'")
# Find in-progress traces
traces = mlflow.search_traces(filter_string="attributes.status = 'IN_PROGRESS'")
# Exclude errors
traces = mlflow.search_traces(filter_string="attributes.status != 'ERROR'")
トレース名で検索
# Find traces with specific name (rarely used - legacy field)
traces = mlflow.search_traces(filter_string="attributes.name = 'foo'")
# Find traces excluding a specific name
traces = mlflow.search_traces(filter_string="attributes.name != 'test_trace'")
# Note: Most users should use tags.`mlflow.traceName` instead
traces = mlflow.search_traces(
filter_string="tags.`mlflow.traceName` = 'process_request'"
)
タイムスタンプで検索
import time
from datetime import datetime
# Current time in milliseconds
current_time_ms = int(time.time() * 1000)
# Last 5 minutes
five_minutes_ago = current_time_ms - (5 * 60 * 1000)
traces = mlflow.search_traces(
filter_string=f"attributes.timestamp_ms > {five_minutes_ago}"
)
# Specific date range
start_date = int(datetime(2024, 1, 1).timestamp() * 1000)
end_date = int(datetime(2024, 1, 31).timestamp() * 1000)
traces = mlflow.search_traces(
filter_string=f"attributes.timestamp_ms > {start_date} AND attributes.timestamp_ms < {end_date}"
)
# Using timestamp aliases
traces = mlflow.search_traces(filter_string=f"attributes.timestamp > {five_minutes_ago}")
実行時間で検索
# Find slow traces (>5 seconds)
traces = mlflow.search_traces(filter_string="attributes.execution_time_ms > 5000")
# Find fast traces (<100ms)
traces = mlflow.search_traces(filter_string="attributes.execution_time_ms < 100")
# Performance range
traces = mlflow.search_traces(
filter_string="attributes.execution_time_ms > 100 AND attributes.execution_time_ms < 1000"
)
# Using execution time aliases
traces = mlflow.search_traces(filter_string="attributes.latency > 1000")
タグで検索
# Custom tags (set via mlflow.update_current_trace)
traces = mlflow.search_traces(filter_string="tags.customer_id = 'C001'")
traces = mlflow.search_traces(filter_string="tags.environment = 'production'")
traces = mlflow.search_traces(filter_string="tags.version = 'v2.1.0'")
# MLflow system tags (require backticks)
traces = mlflow.search_traces(
filter_string="tags.`mlflow.traceName` = 'process_chat_request'"
)
traces = mlflow.search_traces(
filter_string="tags.`mlflow.artifactLocation` != ''"
)
メタデータによる検索
# Search by response content (exact match)
traces = mlflow.search_traces(
filter_string="metadata.`mlflow.traceOutputs` = 'exact response text'"
)
# Find traces with any output
traces = mlflow.search_traces(
filter_string="metadata.`mlflow.traceOutputs` != ''"
)
# Search by user
traces = mlflow.search_traces(
filter_string="metadata.`mlflow.user` = 'alice@company.com'"
)
# Search by source file
traces = mlflow.search_traces(
filter_string="metadata.`mlflow.source.name` = 'app.py'"
)
# Search by git information
traces = mlflow.search_traces(
filter_string="metadata.`mlflow.source.git.branch` = 'main'"
)
AND を使用した複雑なフィルター
# Recent successful production traces
current_time_ms = int(time.time() * 1000)
one_hour_ago = current_time_ms - (60 * 60 * 1000)
traces = mlflow.search_traces(
filter_string=f"attributes.status = 'OK' AND "
f"attributes.timestamp_ms > {one_hour_ago} AND "
f"tags.environment = 'production'"
)
# Fast traces from specific user
traces = mlflow.search_traces(
filter_string="attributes.execution_time_ms < 100 AND "
"metadata.`mlflow.user` = 'alice@company.com'"
)
# Specific function with performance threshold
traces = mlflow.search_traces(
filter_string="tags.`mlflow.traceName` = 'process_payment' AND "
"attributes.execution_time_ms > 1000"
)
結果の並べ替え
# Most recent first
traces = mlflow.search_traces(
filter_string="attributes.status = 'OK'",
order_by=["attributes.timestamp_ms DESC"]
)
# Fastest first
traces = mlflow.search_traces(
order_by=["attributes.execution_time_ms ASC"]
)
# Multiple sort criteria
traces = mlflow.search_traces(
filter_string="attributes.status = 'OK'",
order_by=[
"attributes.timestamp_ms DESC",
"attributes.execution_time_ms ASC"
]
)
データフレーム の操作
mlflow.search_traces
によって返される DataFrame には、次の列が含まれています。
traces_df = mlflow.search_traces()
# Default columns
print(traces_df.columns)
# ['request_id', 'trace', 'timestamp_ms', 'status', 'execution_time_ms',
# 'request', 'response', 'request_metadata', 'spans', 'tags']
スパンフィールドの抽出
# Extract specific span fields into DataFrame columns
traces = mlflow.search_traces(
extract_fields=[
"process_request.inputs.customer_id",
"process_request.outputs",
"validate_input.inputs",
"generate_response.outputs.message"
]
)
# Use extracted fields for evaluation dataset
eval_data = traces.rename(columns={
"process_request.inputs.customer_id": "customer",
"generate_response.outputs.message": "ground_truth"
})
動的クエリの構築
def build_trace_filter(status=None, user=None, min_duration=None,
max_duration=None, tags=None, after_timestamp=None):
"""Build dynamic filter string from parameters"""
conditions = []
if status:
conditions.append(f"attributes.status = '{status}'")
if user:
conditions.append(f"metadata.`mlflow.user` = '{user}'")
if min_duration:
conditions.append(f"attributes.execution_time_ms > {min_duration}")
if max_duration:
conditions.append(f"attributes.execution_time_ms < {max_duration}")
if after_timestamp:
conditions.append(f"attributes.timestamp_ms > {after_timestamp}")
if tags:
for key, value in tags.items():
# Handle dotted tag names
if '.' in key:
conditions.append(f"tags.`{key}` = '{value}'")
else:
conditions.append(f"tags.{key} = '{value}'")
return " AND ".join(conditions) if conditions else None
# Usage
filter_string = build_trace_filter(
status="OK",
user="alice@company.com",
min_duration=100,
tags={"environment": "production", "mlflow.traceName": "process_order"}
)
traces = mlflow.search_traces(filter_string=filter_string)
実用例の参照
Error モニタリング
本番運用環境のエラーを監視および分析します。
import mlflow
import time
import pandas as pd
def monitor_errors(experiment_name: str, hours: int = 1):
"""Monitor errors in the last N hours."""
# Calculate time window
current_time_ms = int(time.time() * 1000)
cutoff_time_ms = current_time_ms - (hours * 60 * 60 * 1000)
# Find all errors
failed_traces = mlflow.search_traces(
filter_string=f"attributes.status = 'ERROR' AND "
f"attributes.timestamp_ms > {cutoff_time_ms}",
order_by=["attributes.timestamp_ms DESC"]
)
if len(failed_traces) == 0:
print(f"No errors found in the last {hours} hour(s)")
return
# Analyze error patterns
print(f"Found {len(failed_traces)} errors in the last {hours} hour(s)\n")
# Group by function name
error_by_function = failed_traces.groupby('tags.mlflow.traceName').size()
print("Errors by function:")
print(error_by_function.to_string())
# Show recent error samples
print("\nRecent error samples:")
for _, trace in failed_traces.head(5).iterrows():
print(f"- {trace['request_preview'][:60]}...")
print(f" Function: {trace.get('tags.mlflow.traceName', 'unknown')}")
print(f" Time: {pd.to_datetime(trace['timestamp_ms'], unit='ms')}")
print()
return failed_traces
パフォーマンスプロファイリング
パフォーマンス特性を分析し、ボトルネックを特定します。
def profile_performance(function_name: str = None, percentiles: list = [50, 95, 99]):
"""Profile performance metrics for traces."""
# Build filter
filter_parts = []
if function_name:
filter_parts.append(f"tags.`mlflow.traceName` = '{function_name}'")
filter_string = " AND ".join(filter_parts) if filter_parts else None
# Get traces
traces = mlflow.search_traces(filter_string=filter_string)
if len(traces) == 0:
print("No traces found")
return
# Calculate percentiles
perf_stats = traces['execution_time_ms'].describe(percentiles=[p/100 for p in percentiles])
print(f"Performance Analysis ({len(traces)} traces)")
print("=" * 40)
for p in percentiles:
print(f"P{p}: {perf_stats[f'{p}%']:.1f}ms")
print(f"Mean: {perf_stats['mean']:.1f}ms")
print(f"Max: {perf_stats['max']:.1f}ms")
# Find outliers (>P99)
if 99 in percentiles:
p99_threshold = perf_stats['99%']
outliers = traces[traces['execution_time_ms'] > p99_threshold]
if len(outliers) > 0:
print(f"\nOutliers (>{p99_threshold:.0f}ms): {len(outliers)} traces")
for _, trace in outliers.head(3).iterrows():
print(f"- {trace['execution_time_ms']:.0f}ms: {trace['request_preview'][:50]}...")
return traces
ユーザーアクティビティ分析
ユーザーの行動パターンを追跡して分析します。
def analyze_user_activity(user_id: str, days: int = 7):
"""Analyze activity patterns for a specific user."""
cutoff_ms = int((time.time() - days * 86400) * 1000)
traces = mlflow.search_traces(
filter_string=f"metadata.`mlflow.user` = '{user_id}' AND "
f"attributes.timestamp_ms > {cutoff_ms}",
order_by=["attributes.timestamp_ms DESC"]
)
if len(traces) == 0:
print(f"No activity found for user {user_id}")
return
print(f"User {user_id} Activity Report ({days} days)")
print("=" * 50)
print(f"Total requests: {len(traces)}")
# Daily activity
traces['date'] = pd.to_datetime(traces['timestamp_ms'], unit='ms').dt.date
daily_activity = traces.groupby('date').size()
print(f"\nDaily activity:")
print(daily_activity.to_string())
# Query categories
if 'tags.query_category' in traces.columns:
categories = traces['tags.query_category'].value_counts()
print(f"\nQuery categories:")
print(categories.to_string())
# Performance stats
print(f"\nPerformance:")
print(f"Average response time: {traces['execution_time_ms'].mean():.1f}ms")
print(f"Error rate: {(traces['status'] == 'ERROR').mean() * 100:.1f}%")
return traces
おすすめの方法
1. 一貫性のあるタグ付け戦略を設計する
組織のタグ付け分類を作成します。
class TraceTagging:
"""Standardized tagging strategy for traces."""
# Required tags for all traces
REQUIRED_TAGS = ["environment", "version", "service_name"]
# Category mappings
CATEGORIES = {
"user_management": ["login", "logout", "profile_update"],
"content_generation": ["summarize", "translate", "rewrite"],
"data_retrieval": ["search", "fetch", "query"]
}
@staticmethod
def tag_trace(operation: str, **kwargs):
"""Apply standardized tags to current trace."""
tags = {
"operation": operation,
"timestamp": datetime.now().isoformat(),
"service_name": "genai-platform"
}
# Add category based on operation
for category, operations in TraceTagging.CATEGORIES.items():
if operation in operations:
tags["category"] = category
break
# Add custom tags
tags.update(kwargs)
# Validate required tags
for required in TraceTagging.REQUIRED_TAGS:
if required not in tags:
tags[required] = "unknown"
mlflow.update_current_trace(tags=tags)
return tags
2. 再利用可能な検索ユーティリティを構築する
class TraceSearcher:
"""Reusable trace search utilities."""
def __init__(self, experiment_ids: list = None):
self.experiment_ids = experiment_ids
def recent_errors(self, hours: int = 1) -> pd.DataFrame:
"""Get recent error traces."""
cutoff = int((time.time() - hours * 3600) * 1000)
return mlflow.search_traces(
experiment_ids=self.experiment_ids,
filter_string=f"attributes.status = 'ERROR' AND "
f"attributes.timestamp_ms > {cutoff}",
order_by=["attributes.timestamp_ms DESC"]
)
def slow_operations(self, threshold_ms: int = 5000) -> pd.DataFrame:
"""Find operations slower than threshold."""
return mlflow.search_traces(
experiment_ids=self.experiment_ids,
filter_string=f"attributes.execution_time_ms > {threshold_ms}",
order_by=["attributes.execution_time_ms DESC"]
)
def by_user(self, user_id: str, days: int = 7) -> pd.DataFrame:
"""Get traces for a specific user."""
cutoff = int((time.time() - days * 86400) * 1000)
return mlflow.search_traces(
experiment_ids=self.experiment_ids,
filter_string=f"tags.user_id = '{user_id}' AND "
f"attributes.timestamp_ms > {cutoff}",
order_by=["attributes.timestamp_ms DESC"]
)
def by_category(self, category: str, status: str = None) -> pd.DataFrame:
"""Get traces by category with optional status filter."""
filters = [f"tags.category = '{category}'"]
if status:
filters.append(f"attributes.status = '{status}'")
return mlflow.search_traces(
experiment_ids=self.experiment_ids,
filter_string=" AND ".join(filters)
)
def performance_report(self, function_name: str = None) -> dict:
"""Generate performance report."""
filter_parts = []
if function_name:
filter_parts.append(f"tags.`mlflow.traceName` = '{function_name}'")
filter_string = " AND ".join(filter_parts) if filter_parts else None
traces = mlflow.search_traces(
experiment_ids=self.experiment_ids,
filter_string=filter_string
)
if len(traces) == 0:
return {"error": "No traces found"}
return {
"total_traces": len(traces),
"error_rate": (traces['status'] == 'ERROR').mean(),
"avg_duration_ms": traces['execution_time_ms'].mean(),
"p50_duration_ms": traces['execution_time_ms'].quantile(0.5),
"p95_duration_ms": traces['execution_time_ms'].quantile(0.95),
"p99_duration_ms": traces['execution_time_ms'].quantile(0.99)
}
# Usage example
searcher = TraceSearcher()
errors = searcher.recent_errors(hours=24)
slow_ops = searcher.slow_operations(threshold_ms=10000)
user_traces = searcher.by_user("U001", days=30)
report = searcher.performance_report("process_request")
次のステップ
- 評価データセットの構築 - クエリされたトレースをテストデータセットに変換します
- Delete traces - データマネジメントの検索条件に基づいてトレースを削除します