コンテキストマネージャーによるスパントレース
mlflow.start_span()コンテキスト マネージャーを使用すると、任意のコード ブロックのスパンを作成できます。関数デコレータは関数の粒度でトレースしますが、 start_span()コード内のよりきめ細かで複雑な相互作用をキャプチャできます。
コンテキスト マネージャーを使用したスパン トレースにより、トレースされるコードをきめ細かく制御できます。
- 任意のコードブロック: 関数全体だけでなく、任意のコードブロックをトレースします。
- 自動コンテキスト管理: MLflowは親子関係とクリーンアップを処理します
- 関数デコレータと連携: ハイブリッドアプローチのために
@mlflow.traceと組み合わせて使用します - 例外処理: 関数デコレータと同様に、エラーを自動で捕捉します。
前提条件
このチュートリアルには次のパッケージが必要です。
mlflow[databricks]3.1 以上: GenAI 機能と Databricks 接続を備えたコア MLflow 機能。openai1.0.0 以上: 以下のサンプル アプリは OpenAI クライアントを使用します。独自のコードでは、必要に応じてこれを他の SDK に置き換えます。
基本要件をインストールします。
%pip install --upgrade "mlflow[databricks]>=3.1" "openai>=1.0.0"
dbutils.library.restartPython()
MLflow 2の前提条件
Databricks では、 mlflow[databricks]を使用する場合は MLflow 3.1 以降をインストールすることを強くお勧めします。
MLflow 2 の場合、コンテキスト マネージャーを使用したスパン トレースには次のパッケージが必要です。
mlflow[databricks]2.15.0 以上: Databricks 接続を備えた Core MLflow 機能。openai1.0.0 以上: (オプション) カスタム コードで OpenAI クライアントを使用する場合はインストールします。
%pip install --upgrade "mlflow[databricks]>=2.15.0,<3.0.0"
pip install --upgrade openai>=1.0.0 # Install if needed
dbutils.library.restartPython()
スパントレースAPI
関数デコレータと同様に、コンテキスト マネージャーは親子関係、例外、実行時間を自動的にキャプチャします。自動トレースにも対応しております。
関数デコレータとは異なり、スパンの名前、入力、および出力は手動で提供する必要があります。コンテキスト マネージャーから返されるLiveSpanオブジェクトを使用して設定できます。
以下のコード スニペットは、基本的なスパン トレースを示しています。
import mlflow
with mlflow.start_span(name="my_span") as span:
x = 1
y = 2
span.set_inputs({"x":x, "y": y})
z = x + y
span.set_outputs(z)
スパンイベント
SpanEventオブジェクトは、スパンの有効期間中の特定の発生を記録します。以下のコード スニペットは次の内容を示しています。
- 現在のタイムスタンプでイベントを作成する
- 特定のタイムスタンプ(ナノ秒)を持つイベントを作成する
- イベントの作成
Exception
import mlflow
from mlflow.entities import SpanEvent, SpanType
import time
with mlflow.start_span(name="manual_span", span_type=SpanType.CHAIN) as span:
# Create an event with current timestamp
event = SpanEvent(
name="validation_completed",
attributes={
"records_validated": 1000,
"errors_found": 3,
"validation_type": "schema"
}
)
span.add_event(event)
# Create an event with specific timestamp (nanoseconds)
specific_time_event = SpanEvent(
name="data_checkpoint",
timestamp=int(time.time() * 1e9),
attributes={"checkpoint_id": "ckpt_123"}
)
span.add_event(specific_time_event)
# Create an event from an exception
try:
raise ValueError("Invalid input format")
except Exception as e:
error_event = SpanEvent.from_exception(e)
# This creates an event with name="exception" and attributes containing:
# - exception.message
# - exception.type
# - exception.stacktrace
# Add to current span
span = mlflow.get_current_active_span()
span.add_event(error_event)
スパンステータス
SpanStatusスパンのステータスを定義します。mlflow.start_span()コンテキスト マネージャーは終了時にステータスを上書きすることに注意してください。以下のコード スニペットは、スパンのステータスを設定するさまざまな方法を示しています。
import mlflow
from mlflow.entities import SpanStatus, SpanStatusCode, SpanType
with mlflow.start_span(name="manual_span", span_type=SpanType.CHAIN) as span:
# Create status objects
success_status = SpanStatus(SpanStatusCode.OK)
error_status = SpanStatus(
SpanStatusCode.ERROR,
description="Failed to connect to database"
)
# Set status on a live span
span.set_status(success_status)
# Or use string shortcuts
span.set_status("OK")
span.set_status("ERROR")
# When the context manager exits successfully, the status is overwritten with status "OK"
完了したスパンからのクエリステータス:
last_trace_id = mlflow.get_last_active_trace_id()
trace = mlflow.get_trace(last_trace_id)
for span in trace.data.spans:
print(span.status.status_code)
RETRIEVERスパン
データ ストアからドキュメントを取得するときは、 RETRIEVERスパンを使用します。RETRIEVERスパンは、次の例に示すようにDocumentsのリストを出力する必要があります。
import mlflow
from mlflow.entities import Document, SpanType
@mlflow.trace(span_type=SpanType.RETRIEVER)
def retrieve_documents(query: str):
span = mlflow.get_current_active_span()
# Create Document objects (required for RETRIEVER spans)
documents = [
Document(
page_content="The content of the document...",
metadata={
"doc_uri": "path/to/document.md",
"chunk_id": "chunk_001",
"relevance_score": 0.95,
"source": "knowledge_base"
},
id="doc_123" # Optional document ID
),
Document(
page_content="Another relevant section...",
metadata={
"doc_uri": "path/to/other.md",
"chunk_id": "chunk_042",
"relevance_score": 0.87
}
)
]
# Set outputs as Document objects for proper UI rendering
span.set_outputs(documents)
# Return in your preferred format
return [doc.to_dict() for doc in documents]
retrieve_documents(query="What is ML?")
アクセス リトリーバーの出力:
last_trace_id = mlflow.get_last_active_trace_id()
trace = mlflow.get_trace(last_trace_id)
retriever_span = trace.search_spans(span_type=SpanType.RETRIEVER)[0]
if retriever_span.outputs:
for doc in retriever_span.outputs:
if isinstance(doc, dict):
content = doc.get('page_content', '')
uri = doc.get('metadata', {}).get('doc_uri', '')
score = doc.get('metadata', {}).get('relevance_score', 0)
print(f"Document from {uri} (score: {score})")
高度な例
以下は、次のものを組み合わせたより複雑な例です。
mlflow.start_span()コンテキスト マネージャー@mlflow.trace関数デコレータ- OpenAIの自動トレース
from databricks.sdk import WorkspaceClient
import mlflow
from mlflow.entities import SpanEvent, SpanType
import openai
import time
# Enable auto-tracing for OpenAI
mlflow.openai.autolog()
# Create an OpenAI client that is connected to Databricks-hosted LLMs.
workspace = WorkspaceClient()
client = workspace.serving_endpoints.get_open_ai_client()
@mlflow.trace(span_type=SpanType.CHAIN)
def chat_iteration(messages, user_input):
with mlflow.start_span(name="User", span_type=SpanType.CHAIN) as span:
span.set_inputs({
"messages": messages,
"timestamp": time.time(),
})
# Set individual attribute
span.set_attribute("messages_length", len(messages))
# Set multiple attributes at once
span.set_attributes({
"environment": "production",
"custom_metadata": {"key": "value"}
})
# Add events during execution
span.add_event(SpanEvent(
name="processing_started",
attributes={
"stage": "initialization",
"memory_usage_mb": 256,
}
))
span.set_outputs(user_input)
messages.append({"role": "user", "content": user_input})
response = client.chat.completions.create(
model="databricks-claude-sonnet-4-5",
max_tokens=100,
messages=messages,
)
answer = response.choices[0].message.content
print(f"Assistant: {answer}")
messages.append({"role": "assistant", "content": answer})
chat_iteration(
messages = [{"role": "system", "content": "You are a friendly chat bot"}],
user_input="What is your favorite color?",
)
より長い会話のネストされたトレースの例を確認するには、以下の例のコメントを解除します。
# @mlflow.trace(span_type=SpanType.CHAIN)
# def start_session():
# messages = [{"role": "system", "content": "You are a friendly chat bot"}]
# while True:
# user_input = input(">> ")
# chat_iteration(messages, user_input)
# if user_input == "BYE":
# break
# start_session()
次のステップ
- 関数デコレータ- 関数全体をトレースするためのよりシンプルなアプローチ
- 低レベルのクライアントAPI - 完全な制御を必要とする高度なシナリオを学習します
- アプリのデバッグと分析- ログに記録されたトレースをクエリして分析する