メインコンテンツまでスキップ

デコレーター & Fluent API (推奨)

Auto Tracing の統合に加えて、Python を使用してMLflow TracingSDK コードをインストゥルメントできます。これは、カスタムPythonコードを計測可能にする必要がある場合に特に便利です。

デコレータ

@mlflow.traceデコレータを使用すると、任意の関数のスパンを作成できます。このアプローチは、最小限の労力でコードにトレースを追加するためのシンプルかつ効果的な方法を提供します。

  • MLflow は、関数間の 親子関係 を検出し、自動トレース統合と互換性を持たせます。
  • 関数の実行中に 例外 をキャプチャし、スパンイベントとして記録します。
  • 関数の 名前、入力、出力、および実行時間 を自動的にログに記録します。
  • mlflow.openai.autologなどの 自動トレース 機能と併用できます。

@mlflow.trace デコレータは現在、次のタイプの関数をサポートしています。

機能タイプ

サポート

同期

Yes

非同期

はい (>= 2.16.0)

ジェネレータ

はい (>= 2.20.2)

非同期ジェネレーター

はい (>= 2.20.2)

次のコードは、Python 関数をトレースするためにデコレータを使用する最小限の例です。

ヒント

デコレーターの順序

完全な可観測性を確保するために、複数のデコレータを使用する場合は、通常、 @mlflow.trace デコレータを 最も外側 にする必要があります。詳細な説明と例については、 他のデコレータとの @mlflow.trace の使用 を参照してください。

Python
import mlflow


@mlflow.trace(span_type="func", attributes={"key": "value"})
def add_1(x):
return x + 1


@mlflow.trace(span_type="func", attributes={"key1": "value1"})
def minus_1(x):
return x - 1


@mlflow.trace(name="Trace Test")
def trace_test(x):
step1 = add_1(x)
return minus_1(step1)


trace_test(4)

トレーシングデコレータ

注記

トレースに同じ名前のスパンが複数含まれている場合、MLflow はそれらに自動インクリメント サフィックス ( _1_2など) を追加します。

スパンのカスタマイズ

@mlflow.traceデコレータは、作成するスパンをカスタマイズするために次の引数を受け入れます。

  • name デフォルト (装飾された関数の名前) からスパン名をオーバーライドするパラメーター
  • span_type パラメーターを使用して、スパン のタイプを設定します。 組み込みの スパン タイプ のいずれかまたは文字列を設定します。
  • attributes パラメーターを使用して、スパンにカスタム属性を追加します。
ヒント

デコレーターの順序

@mlflow.traceを他のデコレータ(Webフレームワークなど)と組み合わせる場合、最も外側であることが重要です。正しい順序と間違った順序の明確な例については、 他のデコレータでの @mlflow.trace の使用を参照してください。

Python
@mlflow.trace(
name="call-local-llm", span_type=SpanType.LLM, attributes={"model": "gpt-4o-mini"}
)
def invoke(prompt: str):
return client.invoke(
messages=[{"role": "user", "content": prompt}], model="gpt-4o-mini"
)

または、 mlflow.get_current_active_span API を使用して、関数内でスパンを動的に更新することもできます。

Python
@mlflow.trace(span_type=SpanType.LLM)
def invoke(prompt: str):
model_id = "gpt-4o-mini"
# Get the current span (created by the @mlflow.trace decorator)
span = mlflow.get_current_active_span()
# Set the attribute to the span
span.set_attributes({"model": model_id})
return client.invoke(messages=[{"role": "user", "content": prompt}], model=model_id)

他のデコレータと @mlflow.trace を併用する

複数のデコレーターを 1 つの関数に適用する場合は、 @mlflow.trace最も外側 のデコレーター (一番上のデコレーター) として配置することが重要です。これにより、MLflow は、内部デコレーターの動作を含む、関数の実行全体をキャプチャできます。

@mlflow.traceが最も外側のデコレータでない場合、関数の実行に対する可視性が制限されたり不正確になったりして、関数の入力、出力、実行時間のトレースが不完全になったり、誤って表現されたりする可能性があります。

次の概念的な例を考えてみます。

Python
import mlflow
import functools
import time

# A hypothetical additional decorator
def simple_timing_decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"{func.__name__} executed in {end_time - start_time:.4f} seconds by simple_timing_decorator.")
return result
return wrapper

# Correct order: @mlflow.trace is outermost
@mlflow.trace(name="my_decorated_function_correct_order")
@simple_timing_decorator
# @another_framework_decorator # e.g., @app.route("/mypath") from Flask
def my_complex_function(x, y):
# Function logic here
time.sleep(0.1) # Simulate work
return x + y

# Incorrect order: @mlflow.trace is NOT outermost
@simple_timing_decorator
@mlflow.trace(name="my_decorated_function_incorrect_order")
# @another_framework_decorator
def my_other_complex_function(x, y):
time.sleep(0.1)
return x * y

# Example calls
if __name__ == "__main__":
print("Calling function with correct decorator order:")
my_complex_function(5, 3)

print("\nCalling function with incorrect decorator order:")
my_other_complex_function(5, 3)

my_complex_function例(正しい順序)では、@mlflow.tracesimple_timing_decoratorによって追加された時間を含む完全な実行をキャプチャします。my_other_complex_function (順序が正しくない) 場合、MLflow によってキャプチャされたトレースに合計実行時間が正確に反映されていないか、simple_timing_decorator によって行われた入力/出力の変更が見つかる前に見逃@mlflow.trace可能性があります。

トレースタグの追加

タグをトレースに追加して、トレース レベルで追加のメタデータを提供できます。トレースにタグを設定するには、いくつかの方法があります。その他の方法については、 how-to guide をご参照ください。

Python
@mlflow.trace
def my_func(x):
mlflow.update_current_trace(tags={"fruit": "apple"})
return x + 1

UI での要求と応答のプレビューのカスタマイズ

MLflow UI の [トレース] タブにはトレースの一覧が表示され、[ Request ] 列と [ Response ] 列には、各トレースのエンド ツー エンドの入力と出力のプレビューが表示されます。これにより、各トレースが何を表しているのかをすばやく理解できます。

デフォルトでは、これらのプレビューは固定文字数に切り捨てられます。ただし、これらの列に表示される内容は、mlflow.update_current_trace() 関数内の [request_preview] パラメーターと [response_preview] パラメーターを使用してカスタマイズできます。これは、デフォルトの切り捨てで最も関連性の高い情報が表示されない可能性がある複雑な入力または出力に特に役立ちます。

以下は、長いドキュメントとユーザー指示を処理するトレースのカスタムリクエストプレビューを設定する例で、UI の Request 列に最も関連性の高い情報をレンダリングすることを目的としています。

Python
import mlflow

@mlflow.trace(name="Summarization Pipeline")
def summarize_document(document_content: str, user_instructions: str):
# Construct a custom preview for the request column
# For example, show beginning of document and user instructions
request_p = f"Doc: {document_content[:30]}... Instr: {user_instructions[:30]}..."
mlflow.update_current_trace(request_preview=request_p)

# Simulate LLM call
# messages = [
# {"role": "system", "content": "Summarize the following document based on user instructions."},
# {"role": "user", "content": f"Document: {document_content}\nInstructions: {user_instructions}"}
# ]
# completion = client.chat.completions.create(model="gpt-4o-mini", messages=messages)
# summary = completion.choices[0].message.content
summary = f"Summary of document starting with '{document_content[:20]}...' based on '{user_instructions}'"

# Customize the response preview
response_p = f"Summary: {summary[:50]}..."
mlflow.update_current_trace(response_preview=response_p)

return summary

# Example Call
long_document = "This is a very long document that contains many details about various topics..." * 10
instructions = "Focus on the key takeaways regarding topic X."
summary_result = summarize_document(long_document, instructions)
# print(summary_result)

トレース (通常はルート スパン) に request_previewresponse_preview を設定すると、メイン トレース リスト ビューでの全体的な相互作用の集計方法を制御し、トレースを一目で識別して理解しやすくなります。

自動例外処理

トレースが計測される操作の処理中に Exception が発生した場合、呼び出しがそうではなかったことを示すメッセージが UI 内に表示されます 成功し、デバッグを支援するためにデータの部分的なキャプチャが利用可能になります。さらに、発生した例外の詳細も含まれます 部分的に完了したスパンから Events 以内で、コード内で問題が発生している場所の特定にさらに役立ちます。

トレースエラー

オートトレースとの組み合わせ

@mlflow.traceデコレータは、自動トレースと組み合わせて使用 すると、自動トレースと手動で定義されたスパンを 1 つのまとまりのある統合トレースに組み合わせることができます。詳しくは こちらをご覧ください

ストリーミング

@mlflow.trace デコレーターは、MLflow 2.20.2 以降、ジェネレーターまたは反復子を返す関数をトレースするために使用できます。

Python
@mlflow.trace
def stream_data():
for i in range(5):
yield i

上記の例では、 stream_data 関数の 1 つのスパンを持つトレースを生成します。デフォルトでは、MLflow はジェネレーターによって生成されたすべての要素をスパンの出力のリストとしてキャプチャします。上記の例では、スパンの出力は [0, 1, 2, 3, 4]になります。

注記

ストリーム関数のスパンは、返されたイテレータが 消費 され始めたときに開始され、イテレータが使い果たされるか、イテレーション中に例外が発生したときに終了します。

出力レデューサーの使用

エレメントを 1 つのスパン出力として集約する場合は、 output_reducer パラメーターを使用して、エレメントを集約するカスタム関数を指定できます。カスタム関数は、生成された要素のリストを入力として受け取る必要があります。

Python
from typing import List, Any

@mlflow.trace(output_reducer=lambda x: ",".join(x))
def stream_data():
for c in "hello":
yield c

上記の例では、スパンの出力は "h,e,l,l,o"になります。未加工のチャンクは、MLflow トレース UI のスパンの [ Events ] タブに引き続き表示されるため、デバッグ時に生成された個々の値を検査できます。

一般的な出力レデューサー パターン

出力リデューサーを実装するための一般的なパターンを次に示します。

トークン集約
Python
from typing import List, Dict, Any

def aggregate_tokens(chunks: List[str]) -> str:
"""Concatenate streaming tokens into complete text"""
return "".join(chunks)

@mlflow.trace(output_reducer=aggregate_tokens)
def stream_text():
for word in ["Hello", " ", "World", "!"]:
yield word
メトリクスの集計
Python
def aggregate_metrics(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Aggregate streaming metrics into summary statistics"""
values = [c["value"] for c in chunks if "value" in c]
return {
"count": len(values),
"sum": sum(values),
"average": sum(values) / len(values) if values else 0,
"max": max(values) if values else None,
"min": min(values) if values else None
}

@mlflow.trace(output_reducer=aggregate_metrics)
def stream_metrics():
for i in range(10):
yield {"value": i * 2, "timestamp": time.time()}
エラー収集
Python
def collect_results_and_errors(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Separate successful results from errors"""
results = []
errors = []

for chunk in chunks:
if chunk.get("error"):
errors.append(chunk["error"])
else:
results.append(chunk.get("data"))

return {
"results": results,
"errors": errors,
"success_rate": len(results) / len(chunks) if chunks else 0,
"has_errors": len(errors) > 0
}

高度な例: OpenAI ストリーミング

以下は、 output_reducer を使用して OpenAI LLM からの ChatCompletionChunk 出力を 1 つのメッセージオブジェクトに統合する高度な例です。

ヒント

本番運用のユースケースでは、これを自動的に処理する OpenAIの自動トレース を使用することをお勧めします。 以下の例は、デモンストレーション用です。

Python
import mlflow
import openai
from openai.types.chat import *
from typing import Optional


def aggregate_chunks(outputs: list[ChatCompletionChunk]) -> Optional[ChatCompletion]:
"""Consolidate ChatCompletionChunks to a single ChatCompletion"""
if not outputs:
return None

first_chunk = outputs[0]
delta = first_chunk.choices[0].delta
message = ChatCompletionMessage(
role=delta.role, content=delta.content, tool_calls=delta.tool_calls or []
)
finish_reason = first_chunk.choices[0].finish_reason
for chunk in outputs[1:]:
delta = chunk.choices[0].delta
message.content += delta.content or ""
message.tool_calls += delta.tool_calls or []
finish_reason = finish_reason or chunk.choices[0].finish_reason

base = ChatCompletion(
id=first_chunk.id,
choices=[Choice(index=0, message=message, finish_reason=finish_reason)],
created=first_chunk.created,
model=first_chunk.model,
object="chat.completion",
)
return base


@mlflow.trace(output_reducer=aggregate_chunks)
def predict(messages: list[dict]):
stream = openai.OpenAI().chat.completions.create(
model="gpt-4o-mini",
messages=messages,
stream=True,
)
for chunk in stream:
yield chunk


for chunk in predict([{"role": "user", "content": "Hello"}]):
print(chunk)

上記の例では、生成された predict スパンには、カスタムレデューサー関数によって集計される 1 つのチャット完了メッセージが出力として含まれます。

実際のユースケース

一般的な 生成AI シナリオの出力レデューサーのその他の例を次に示します。

JSON解析によるLLM応答
Python
from typing import List, Dict, Any
import json

def parse_json_from_llm(content: str) -> str:
"""Extract and clean JSON from LLM responses that may include markdown"""
# Remove common markdown code block wrappers
if content.startswith("```json") and content.endswith("```"):
content = content[7:-3] # Remove ```json prefix and ``` suffix
elif content.startswith("```") and content.endswith("```"):
content = content[3:-3] # Remove generic ``` wrappers
return content.strip()

def json_stream_reducer(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Aggregate LLM streaming output and parse JSON response"""
full_content = ""
metadata = {}
errors = []

# Process different chunk types
for chunk in chunks:
chunk_type = chunk.get("type", "content")

if chunk_type == "content" or chunk_type == "token":
full_content += chunk.get("content", "")
elif chunk_type == "metadata":
metadata.update(chunk.get("data", {}))
elif chunk_type == "error":
errors.append(chunk.get("error"))

# Return early if errors occurred
if errors:
return {
"status": "error",
"errors": errors,
"raw_content": full_content,
**metadata
}

# Try to parse accumulated content as JSON
try:
cleaned_content = parse_json_from_llm(full_content)
parsed_data = json.loads(cleaned_content)

return {
"status": "success",
"data": parsed_data,
"raw_content": full_content,
**metadata
}
except json.JSONDecodeError as e:
return {
"status": "parse_error",
"error": f"Failed to parse JSON: {str(e)}",
"raw_content": full_content,
**metadata
}

@mlflow.trace(output_reducer=json_stream_reducer)
def generate_structured_output(prompt: str, schema: dict):
"""Generate structured JSON output from an LLM"""
# Simulate streaming JSON generation
yield {"type": "content", "content": '{"name": "John", '}
yield {"type": "content", "content": '"email": "john@example.com", '}
yield {"type": "content", "content": '"age": 30}'}

# Add metadata
trace_id = mlflow.get_current_active_span().request_id if mlflow.get_current_active_span() else None
yield {"type": "metadata", "data": {"trace_id": trace_id, "model": "gpt-4"}}
OpenAIによる構造化された出力生成

OpenAI で出力リデューサーを使用して、構造化された JSON 応答を生成および解析する完全な例を次に示します。

Python
import json
import mlflow
import openai
from typing import List, Dict, Any, Optional

def structured_output_reducer(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Aggregate streaming chunks into structured output with comprehensive error handling.
Handles token streaming, metadata collection, and JSON parsing.
"""
content_parts = []
trace_id = None
model_info = None
errors = []

for chunk in chunks:
chunk_type = chunk.get("type", "token")

if chunk_type == "token":
content_parts.append(chunk.get("content", ""))
elif chunk_type == "trace_info":
trace_id = chunk.get("trace_id")
model_info = chunk.get("model")
elif chunk_type == "error":
errors.append(chunk.get("message"))

# Join all content parts
full_content = "".join(content_parts)

# Base response
response = {
"trace_id": trace_id,
"model": model_info,
"raw_content": full_content
}

# Handle errors
if errors:
response["status"] = "error"
response["errors"] = errors
return response

# Try to extract and parse JSON
try:
# Clean markdown wrappers if present
json_content = full_content.strip()
if json_content.startswith("```json") and json_content.endswith("```"):
json_content = json_content[7:-3].strip()
elif json_content.startswith("```") and json_content.endswith("```"):
json_content = json_content[3:-3].strip()

parsed_data = json.loads(json_content)
response["status"] = "success"
response["data"] = parsed_data

except json.JSONDecodeError as e:
response["status"] = "parse_error"
response["error"] = f"JSON parsing failed: {str(e)}"
response["error_position"] = e.pos if hasattr(e, 'pos') else None

return response

@mlflow.trace(output_reducer=structured_output_reducer)
async def generate_customer_email(
customer_name: str,
issue: str,
sentiment: str = "professional"
) -> None:
"""
Generate a structured customer service email response.
Demonstrates real-world streaming with OpenAI and structured output parsing.
"""
client = openai.AsyncOpenAI()

system_prompt = """You are a customer service assistant. Generate a professional email response in JSON format:
{
"subject": "email subject line",
"greeting": "personalized greeting",
"body": "main email content addressing the issue",
"closing": "professional closing",
"priority": "high|medium|low"
}"""

user_prompt = f"Customer: {customer_name}\nIssue: {issue}\nTone: {sentiment}"

try:
# Stream the response
stream = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
stream=True,
temperature=0.7
)

# Yield streaming tokens
async for chunk in stream:
if chunk.choices[0].delta.content:
yield {
"type": "token",
"content": chunk.choices[0].delta.content
}

# Add trace metadata
if current_span := mlflow.get_current_active_span():
yield {
"type": "trace_info",
"trace_id": current_span.request_id,
"model": "gpt-4o-mini"
}

except Exception as e:
yield {
"type": "error",
"message": f"OpenAI API error: {str(e)}"
}

# Example usage
async def main():
# This will automatically aggregate the streamed output into structured JSON
async for chunk in generate_customer_email(
customer_name="John Doe",
issue="Product arrived damaged",
sentiment="empathetic"
):
# In practice, you might send these chunks to a frontend
print(chunk.get("content", ""), end="", flush=True)
注記

統合の利点

この例では、いくつかの実際のパターンを示しています。

  • ストリーミング UI の更新 : トークンは到着時に表示できます
  • 構造化された出力の検証 : JSON 解析により、応答形式が保証されます
  • エラーの回復性 : API エラーと解析の失敗を適切に処理します
  • トレースの相関関係 : デバッグのためにストリーミング出力を MLflow トレースにリンクします
マルチモデル応答の集約
Python
def multi_model_reducer(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Aggregate responses from multiple models"""
responses = {}
latencies = {}

for chunk in chunks:
model = chunk.get("model")
if model:
responses[model] = chunk.get("response", "")
latencies[model] = chunk.get("latency", 0)

return {
"responses": responses,
"latencies": latencies,
"fastest_model": min(latencies, key=latencies.get) if latencies else None,
"consensus": len(set(responses.values())) == 1
}

出力レデューサーのテスト

出力レデューサーは、トレースフレームワークとは独立してテストできるため、エッジケースを正しく処理することを簡単に確認できます。

Python
import unittest
from typing import List, Dict, Any

def my_reducer(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Example reducer to be tested"""
if not chunks:
return {"status": "empty", "total": 0}

total = sum(c.get("value", 0) for c in chunks)
errors = [c for c in chunks if c.get("error")]

return {
"status": "error" if errors else "success",
"total": total,
"count": len(chunks),
"average": total / len(chunks) if chunks else 0,
"error_count": len(errors)
}

class TestOutputReducer(unittest.TestCase):
def test_normal_case(self):
chunks = [
{"value": 10},
{"value": 20},
{"value": 30}
]
result = my_reducer(chunks)
self.assertEqual(result["status"], "success")
self.assertEqual(result["total"], 60)
self.assertEqual(result["average"], 20.0)

def test_empty_input(self):
result = my_reducer([])
self.assertEqual(result["status"], "empty")
self.assertEqual(result["total"], 0)

def test_error_handling(self):
chunks = [
{"value": 10},
{"error": "Network timeout"},
{"value": 20}
]
result = my_reducer(chunks)
self.assertEqual(result["status"], "error")
self.assertEqual(result["total"], 30)
self.assertEqual(result["error_count"], 1)

def test_missing_values(self):
chunks = [
{"value": 10},
{"metadata": "some info"}, # No value field
{"value": 20}
]
result = my_reducer(chunks)
self.assertEqual(result["total"], 30)
self.assertEqual(result["count"], 3)
ヒント

パフォーマンスに関する考慮事項

  • 出力レデューサーは、メモリ内のすべてのチャンクを一度に受け取ります。非常に大きなストリームの場合は、ストリーミングの代替手段またはチャンク戦略の実装を検討してください。
  • ジェネレータが完全に消費されるまでスパンは開いたままになり、レイテンシメトリクスに影響します。
  • レデューサーはステートレスであり、予測可能な動作の副作用を回避する必要があります。

関数のラッピング

関数ラッピングは、既存の関数の定義を変更せずに、その関数にトレースを追加する柔軟な方法を提供します。これは、サードパーティの関数や制御の外部で定義された関数にトレースを追加する場合に特に便利です。外部関数を @mlflow.traceでラップすると、その入力、出力、および実行コンテキストをキャプチャできます。

注記

関数を動的にラップする場合、「最も外側」の概念は引き続き適用されます。トレース ラッパーは、ラップされた関数への呼び出し全体をキャプチャするポイントに適用する必要があります。

Python
import math
import mlflow


def invocation(x, y, exp=2):
# Wrap an external function from the math library
traced_pow = mlflow.trace(math.pow)
raised = traced_pow(x, exp)

traced_factorial = mlflow.trace(math.factorial)
factorial = traced_factorial(int(raised))
return response


invocation(4)

コンテキスト マネージャー

デコレーターに加えて、MLflow では、 mlflow.start_span コンテキスト マネージャーを使用してカプセル化された任意のコード ブロック内でアクセスできるスパンを作成できます。これは、コード内の複雑な相互作用を、1 つの関数の境界をキャプチャするよりも詳細にキャプチャする場合に便利です。

デコレータと同様に、コンテキストマネージャは親子関係、例外、実行時間を自動的にキャプチャし、自動トレースと連携します。ただし、スパンの名前、入力、および出力は手動で指定する必要があります。これらは、コンテキストマネージャーから返される mlflow.entities.Span オブジェクトを使用して設定できます。

Python
with mlflow.start_span(name="my_span") as span:
span.set_inputs({"x": 1, "y": 2})
z = x + y
span.set_outputs(z)

以下は、 mlflow.start_span コンテキストマネージャーをOpenAIのデコレータと自動トレースの両方と組み合わせて使用する、少し複雑な例です。

Python
import mlflow
from mlflow.entities import SpanType


@mlflow.trace(span_type=SpanType.CHAIN)
def start_session():
messages = [{"role": "system", "content": "You are a friendly chat bot"}]
while True:
with mlflow.start_span(name="User") as span:
span.set_inputs(messages)
user_input = input(">> ")
span.set_outputs(user_input)

if user_input == "BYE":
break

messages.append({"role": "user", "content": user_input})

response = openai.OpenAI().chat.completions.create(
model="gpt-4o-mini",
max_tokens=100,
messages=messages,
)
answer = response.choices[0].message.content
print(f"🤖: {answer}")

messages.append({"role": "assistant", "content": answer})


mlflow.openai.autolog()
start_session()

マルチスレッディング

MLflow Tracingはスレッドセーフであるため、トレースはスレッドごとにデフォルトによって分離されます。 ただし、いくつかの追加の手順で複数のスレッドにまたがるトレースを作成することもできます。

MLflow は、 Pythonの組み込み ContextVar メカニズムを使用して、デフォルトによってスレッド間で伝播されないスレッド セーフを確保します。したがって、次の例に示すように、メインスレッドからワーカースレッドにコンテキストを手動でコピーする必要があります。

Python
import contextvars
from concurrent.futures import ThreadPoolExecutor, as_completed
import mlflow
from mlflow.entities import SpanType
import openai

client = openai.OpenAI()

# Enable MLflow Tracing for OpenAI
mlflow.openai.autolog()


@mlflow.trace
def worker(question: str) -> str:
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": question},
]
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
temperature=0.1,
max_tokens=100,
)
return response.choices[0].message.content


@mlflow.trace
def main(questions: list[str]) -> list[str]:
results = []
# Almost same as how you would use ThreadPoolExecutor, but two additional steps
# 1. Copy the context in the main thread using copy_context()
# 2. Use ctx.run() to run the worker in the copied context
with ThreadPoolExecutor(max_workers=2) as executor:
futures = []
for question in questions:
ctx = contextvars.copy_context()
futures.append(executor.submit(ctx.run, worker, question))
for future in as_completed(futures):
results.append(future.result())
return results


questions = [
"What is the capital of France?",
"What is the capital of Germany?",
]

main(questions)

マルチスレッドトレース

ヒント

これに対し、 ContextVar はデフォルトで 非同期 タスクにコピーされます。asyncioPythonしたがって、MLflow Tracing を使用するときにコンテキストを手動でコピーする必要はありません。 これは、 を使用して 並列 I/O バインド タスクを処理する簡単な方法かもしれません。

(上級者向け)低レベルのクライアント API

デコレータまたはコンテキストマネージャが要件を満たさない場合は、低レベルのクライアント APIsを使用できます。 たとえば、異なる関数からスパンを開始および終了する必要がある場合があります。クライアント API は、 MLflow REST APIsのシン ラッパーとして設計されており、トレースのライフサイクルをより詳細に制御できます。 詳細については、ガイドを参照してください。

警告

クライアントAPIを使用する場合は、次の制限に注意してください。

  • 親子関係は自動的にはキャプチャされません。親スパンの ID を手動で渡す必要があります。
  • クライアント API を使用して作成されたスパンは、自動トレース スパンと組み合わされません。
  • 試験的としてマークされた低レベルの APIs は、バックエンドの実装の更新に基づいて変更される可能性があります。

## Next steps
Continue your journey with these recommended actions and tutorials.

- [Low-level Client APIs](/mlflow3/genai/tracing/app-instrumentation/manual-tracing/low-level-api.md) - Learn advanced tracing control for complex scenarios
- [Debug & observe your app](/mlflow3/genai/tracing/observe-with-traces/index.md) - Use your traced app for debugging and analysis
- [Combine with automatic tracing](/mlflow3/genai/tracing/app-instrumentation/automatic.md) - Mix manual and automatic tracing

## Reference guides
Explore detailed documentation for concepts and features mentioned in this guide.

- [Tracing data model](/mlflow3/genai/tracing/data-model.md) - Understand the structure of traces and spans
- [Tracing concepts](/mlflow3/genai/tracing/tracing-101.md) - Learn fundamentals of distributed tracing
- [FAQ](/mlflow3/genai/tracing/faq.md) - Common questions about tracing implementation