メインコンテンツまでスキップ

ユーザーからのフィードバックを収集する

ユーザーフィードバックを収集して記録することは、生成AIアプリケーションの実際の品質を理解するために不可欠です。MLflow は、フィードバックをトレースの評価としてキャプチャする構造化された方法を提供し、時間の経過に伴う品質の追跡、改善領域の特定、本番運用データからの評価データセットの構築を可能にします。

前提 条件

環境に基づいて適切なインストール方法を選択します。

本番運用デプロイメントの場合は、 mlflow-tracing パッケージをインストールします。

Bash
pip install --upgrade mlflow-tracing

mlflow-tracingパッケージは、本番運用での使用に最適化されており、依存関係が最小限に抑えられ、パフォーマンス特性が向上しています。

log_feedback API は両方のパッケージで使用できるため、どちらのインストール方法を選択しても、ユーザーからのフィードバックを収集できます。

注記

MLflow 3 は、ユーザー フィードバックを収集するために必要です。MLflow 2.x は、パフォーマンスの制限と、本番運用での使用に不可欠な機能が不足しているため、サポートされていません。

なぜユーザーフィードバックを収集するのですか?

ユーザーフィードバックは、アプリケーションのパフォーマンスに関するグラウンドトゥルースを提供します。

  1. 実世界の品質シグナル - 実際のユーザーがアプリケーションの出力をどのように認識しているかを理解する
  2. 継続的な改善 - ネガティブなフィードバックのパターンを特定し、開発を導きます
  3. トレーニング データの作成 - フィードバックを使用して、高品質の評価データセットを構築します
  4. 品質モニタリング - 満足度メトリクスを経時的に、さまざまなユーザーセグメントにわたって追跡します。
  5. モデルのファインチューニング - フィードバックデータを活用して、基礎となるモデルを改善します

フィードバックの種類

MLflow は、評価システムを通じてさまざまな種類のフィードバックをサポートしています。

フィードバックの種類

説明

一般的なユースケース

バイナリフィードバック

単純な親指のアップ/ダウンまたは修正/不正解

迅速なユーザー満足度シグナル

数値スコア

スケールでの評価(例:1〜5つ星)

詳細な品質評価

カテゴリ別フィードバック

多肢選択式

問題または対応タイプの分類

テキストフィードバック

自由形式のコメント

詳細なユーザー説明

フィードバックデータモデルを理解する

MLflow では、ユーザー フィードバックは、トレースまたは特定のスパンにアタッチできる評価 の一種である Feedback エンティティを使用してキャプチャされます。Feedback エンティティは、以下を格納する構造化された方法を提供します。

  • : 実際のフィードバック (ブール値、数値、テキスト、または構造化データ)
  • ソース : フィードバックを提供したユーザーまたは内容に関する情報 (人間のユーザー、LLM の審査員、またはコード)
  • 理由: フィードバックの説明 (オプション)
  • メタデータ : タイムスタンプやカスタム属性などの追加のコンテキスト

このデータ・モデルを理解することは、 MLflowの評価およびモニタリング機能とシームレスに統合する効果的なフィードバック収集システムを設計するのに役立ちます。 フィードバック エンティティ スキーマと使用可能なすべてのフィールドの詳細については、 トレース データ モデルのフィードバック セクションを参照してください。

エンドユーザーのフィードバック収集

本番運用でフィードバック収集を実装する場合、ユーザーのフィードバックを特定のトレースに紐づける必要があります。 次の 2 つの方法があります。

  1. クライアント要求 ID の使用 - 要求を処理するときに独自の一意の ID を生成し、後でフィードバックのために参照します
  2. MLflow トレース ID の使用 - MLflow によって自動的に生成されたトレース ID を使用します

フィードバック収集フローの理解

どちらのアプローチも同様のパターンに従います。

  1. 最初の要求中 : アプリケーションは、一意のクライアント要求 ID を生成するか、MLflow で生成されたトレース ID を取得します

  2. 応答を受け取った後 : ユーザーは、いずれかの ID を参照してフィードバックを提供できます どちらのアプローチも同様のパターンに従います。

  3. 最初の要求中 : アプリケーションは、一意のクライアント要求 ID を生成するか、MLflow で生成されたトレース ID を取得します

  4. 応答を受け取った後 : ユーザーは、いずれかの ID を参照してフィードバックを提供できます

  5. フィードバックがログに記録される : MLflow の log_feedback API は、元のトレースにアタッチされた評価を作成します

  6. 分析とモニタリング : すべてのトレースでフィードバックをクエリおよび分析できます

フィードバック収集の実装

最も簡単な方法は、MLflow がトレースごとに自動的に生成するトレース ID を使用することです。この ID は、要求処理中に取得し、クライアントに返すことができます。

バックエンドの実装

Python
import mlflow
from fastapi import FastAPI, Query
from mlflow.client import MlflowClient
from mlflow.entities import AssessmentSource
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

class ChatRequest(BaseModel):
message: str

class ChatResponse(BaseModel):
response: str
trace_id: str # Include the trace ID in the response

@app.post("/chat", response_model=ChatResponse)
def chat(request: ChatRequest):
"""
Process a chat request and return the trace ID for feedback collection.
"""
# Your GenAI application logic here
response = process_message(request.message) # Replace with your actual processing logic

# Get the current trace ID
trace_id = mlflow.get_current_active_span().trace_id

return ChatResponse(
response=response,
trace_id=trace_id
)

class FeedbackRequest(BaseModel):
is_correct: bool # True for thumbs up, False for thumbs down
comment: Optional[str] = None

@app.post("/feedback")
def submit_feedback(
trace_id: str = Query(..., description="The trace ID from the chat response"),
feedback: FeedbackRequest = ...,
user_id: Optional[str] = Query(None, description="User identifier")
):
"""
Collect user feedback using the MLflow trace ID.
"""
# Log the feedback directly using the trace ID
mlflow.log_feedback(
trace_id=trace_id,
name="user_feedback",
value=feedback.is_correct,
source=AssessmentSource(
source_type="HUMAN",
source_id=user_id
),
rationale=feedback.comment
)

return {
"status": "success",
"trace_id": trace_id,
}

フロントエンドの実装例

以下は、React ベースのアプリケーションのフロントエンド実装の例です。

JavaScript
// React example for chat with feedback
import React, { useState } from 'react';

function ChatWithFeedback() {
const [message, setMessage] = useState('');
const [response, setResponse] = useState('');
const [traceId, setTraceId] = useState(null);
const [feedbackSubmitted, setFeedbackSubmitted] = useState(false);

const sendMessage = async () => {
try {
const res = await fetch('/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message }),
});

const data = await res.json();
setResponse(data.response);
setTraceId(data.trace_id);
setFeedbackSubmitted(false);
} catch (error) {
console.error('Chat error:', error);
}
};

const submitFeedback = async (isCorrect, comment = null) => {
if (!traceId || feedbackSubmitted) return;

try {
const params = new URLSearchParams({
trace_id: traceId,
...(userId && { user_id: userId }),
});

const res = await fetch(`/feedback?${params}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
is_correct: isCorrect,
comment: comment,
}),
});

if (res.ok) {
setFeedbackSubmitted(true);
// Optionally show success message
}
} catch (error) {
console.error('Feedback submission error:', error);
}
};

return (
<div>
<input value={message} onChange={(e) => setMessage(e.target.value)} placeholder="Ask a question..." />
<button onClick={sendMessage}>Send</button>

{response && (
<div>
<p>{response}</p>
<div className="feedback-buttons">
<button onClick={() => submitFeedback(true)} disabled={feedbackSubmitted}>
👍
</button>
<button onClick={() => submitFeedback(false)} disabled={feedbackSubmitted}>
👎
</button>
</div>
{feedbackSubmitted && Thanks for your feedback!</span>}
</div>
)}
</div>
);
}

主な実装の詳細

AssessmentSource : AssessmentSource オブジェクトは、フィードバックを提供したユーザーまたは内容を識別します。

  • source_type:ユーザーフィードバックの場合は「HUMAN」、自動評価の場合は「LLM_JUDGE」にすることができます
  • source_id: フィードバックを提供する特定のユーザーまたはシステムを識別します

フィードバックの保存 : フィードバックはトレースに評価として保存されます。

  • これは、特定のインタラクションに永続的に関連付けられています
  • トレースデータと一緒にクエリを実行できます
  • これは、トレースを表示するときに MLflow UI に表示されます

さまざまなフィードバックの種類の処理

どちらのアプローチも拡張して、より複雑なフィードバックをサポートできます。トレース ID の使用例を次に示します。

Python
from mlflow.entities import AssessmentSource

@app.post("/detailed-feedback")
def submit_detailed_feedback(
trace_id: str,
accuracy: int = Query(..., ge=1, le=5, description="Accuracy rating from 1-5"),
helpfulness: int = Query(..., ge=1, le=5, description="Helpfulness rating from 1-5"),
relevance: int = Query(..., ge=1, le=5, description="Relevance rating from 1-5"),
user_id: str = Query(..., description="User identifier"),
comment: Optional[str] = None
):
"""
Collect multi-dimensional feedback with separate ratings for different aspects.
Each aspect is logged as a separate assessment for granular analysis.
"""
# Log each dimension as a separate assessment
dimensions = {
"accuracy": accuracy,
"helpfulness": helpfulness,
"relevance": relevance
}

for dimension, score in dimensions.items():
mlflow.log_feedback(
trace_id=trace_id,
name=f"user_{dimension}",
value=score / 5.0, # Normalize to 0-1 scale
source=AssessmentSource(
source_type="HUMAN",
source_id=user_id
),
rationale=comment if dimension == "accuracy" else None
)

return {
"status": "success",
"trace_id": trace_id,
"feedback_recorded": dimensions
}

ストリーミング応答によるフィードバックの処理

ストリーミング応答 (Server-Sent Events または WebSockets) を使用する場合、トレース ID はストリームが完了するまで使用できません。これは、フィードバック収集に独自の課題をもたらし、異なるアプローチを必要とします。

ストリーミングが異なる理由

従来の要求 - 応答パターンでは、完全な応答とトレース ID を一緒に受け取ります。ストリーミングの場合:

  1. トークン は増分的に到着 します: 応答は、LLM からのトークン ストリームとして時間の経過と共に構築されます
  2. トレースの完了が遅延される : トレース ID は、ストリーム全体が終了した後にのみ生成されます
  3. フィードバック UI は待機する必要があります : ユーザーは、完全な応答とトレース ID の両方を取得するまでフィードバックを提供できません

SSEによるバックエンドの実装

ストリームの最後にトレース ID 配信を使用してストリーミングを実装する方法を次に示します。

Python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import mlflow
import json
import asyncio
from typing import AsyncGenerator

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
"""
Stream chat responses with trace ID sent at completion.
"""
async def generate() -> AsyncGenerator[str, None]:
try:
# Start MLflow trace
with mlflow.start_span(name="streaming_chat") as span:
# Update trace with request metadata
mlflow.update_current_trace(
request_message=request.message,
stream_start_time=datetime.now().isoformat()
)

# Stream tokens from your LLM
full_response = ""
async for token in your_llm_stream_function(request.message):
full_response += token
yield f"data: {json.dumps({'type': 'token', 'content': token})}\n\n"
await asyncio.sleep(0.01) # Prevent overwhelming the client

# Log the complete response to the trace
span.set_attribute("response", full_response)
span.set_attribute("token_count", len(full_response.split()))

# Get trace ID after completion
trace_id = span.trace_id

# Send trace ID as final event
yield f"data: {json.dumps({'type': 'done', 'trace_id': trace_id})}\n\n"

except Exception as e:
# Log error to trace if available
if mlflow.get_current_active_span():
mlflow.update_current_trace(error=str(e))

yield f"data: {json.dumps({'type': 'error', 'error': str(e)})}\n\n"

return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable proxy buffering
}
)

ストリーミングのフロントエンド実装

ストリーミング イベントを処理し、トレース ID を受け取った後にのみフィードバックを有効にします。

JavaScript
// React hook for streaming chat with feedback
import React, { useState, useCallback } from 'react';

function useStreamingChat() {
const [isStreaming, setIsStreaming] = useState(false);
const [streamingContent, setStreamingContent] = useState('');
const [traceId, setTraceId] = useState(null);
const [error, setError] = useState(null);

const sendStreamingMessage = useCallback(async (message) => {
// Reset state
setIsStreaming(true);
setStreamingContent('');
setTraceId(null);
setError(null);

try {
const response = await fetch('/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message }),
});

if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
const { done, value } = await reader.read();
if (done) break;

buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');

// Keep the last incomplete line in the buffer
buffer = lines.pop() || '';

for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const data = JSON.parse(line.slice(6));

switch (data.type) {
case 'token':
setStreamingContent((prev) => prev + data.content);
break;
case 'done':
setTraceId(data.trace_id);
setIsStreaming(false);
break;
case 'error':
setError(data.error);
setIsStreaming(false);
break;
}
} catch (e) {
console.error('Failed to parse SSE data:', e);
}
}
}
}
} catch (error) {
setError(error.message);
setIsStreaming(false);
}
}, []);

return {
sendStreamingMessage,
streamingContent,
isStreaming,
traceId,
error,
};
}

// Component using the streaming hook
function StreamingChatWithFeedback() {
const [message, setMessage] = useState('');
const [feedbackSubmitted, setFeedbackSubmitted] = useState(false);
const { sendStreamingMessage, streamingContent, isStreaming, traceId, error } = useStreamingChat();

const handleSend = () => {
if (message.trim()) {
setFeedbackSubmitted(false);
sendStreamingMessage(message);
setMessage('');
}
};

const submitFeedback = async (isPositive) => {
if (!traceId || feedbackSubmitted) return;

try {
const response = await fetch(`/feedback?trace_id=${traceId}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
is_correct: isPositive,
comment: null,
}),
});

if (response.ok) {
setFeedbackSubmitted(true);
}
} catch (error) {
console.error('Feedback submission failed:', error);
}
};

return (
<div className="streaming-chat">
<div className="chat-messages">
{streamingContent && (
<div className="message assistant">
{streamingContent}
{isStreaming && ...</span>}
</div>
)}
{error && <div className="error-message">Error: {error}</div>}
</div>

{/* Feedback buttons - only enabled when trace ID is available */}
{streamingContent && !isStreaming && traceId && (
<div className="feedback-section">
Was this response helpful?</span>
<button onClick={() => submitFeedback(true)} disabled={feedbackSubmitted} className="feedback-btn positive">
👍 Yes
</button>
<button onClick={() => submitFeedback(false)} disabled={feedbackSubmitted} className="feedback-btn negative">
👎 No
</button>
{feedbackSubmitted && Thank you!</span>}
</div>
)}

<div className="chat-input-section">
<input
type="text"
value={message}
onChange={(e) => setMessage(e.target.value)}
onKeyPress={(e) => e.key === 'Enter' && !isStreaming && handleSend()}
placeholder="Type your message..."
disabled={isStreaming}
/>
<button onClick={handleSend} disabled={isStreaming || !message.trim()}>
{isStreaming ? 'Streaming...' : 'Send'}
</button>
</div>
</div>
);
}

ストリーミングに関する主な考慮事項

ストリーミング応答を使用してフィードバック収集を実装する場合は、次の点に注意してください。

  1. トレース ID のタイミング : トレース ID は、ストリーミングが完了した後にのみ使用できます。トレース ID が受信されるまでフィードバック コントロールを無効にすることで、これを適切に処理するように UI を設計します。

  2. イベント構造 : type フィールドを持つ一貫したイベント形式を使用して、コンテンツトークン、完了イベント、およびエラーを区別します。これにより、イベントの解析と処理の信頼性が向上します。

  3. 状態管理 : ストリーミング コンテンツとトレース ID の両方を別々に追跡します。新しいインタラクションの開始時にすべての状態をリセットして、古いデータの問題を防ぎます。

  4. エラー処理 : エラーを適切に処理するために、ストリームにエラー イベントを含めます。デバッグのために可能な場合は、エラーがトレースに記録されるようにします。

  5. バッファ管理 :

    • X-Accel-Buffering: noヘッダーを使用してプロキシバッファリングを無効にする
    • フロントエンドに適切なラインバッファリングを実装して、部分的なSSEメッセージを処理します
    • ネットワーク中断に対する再接続ロジックの実装を検討する
  6. パフォーマンスの最適化 :

    • トークン(asyncio.sleep(0.01))間に小さな遅延を追加して、クライアントの過負荷を防ぎます
    • 複数のトークンが到着が早すぎる場合のバッチ処理
    • 低速クライアントに対するバックプレッシャー メカニズムの実装を検討する

フィードバックデータの分析

フィードバックを収集したら、それを分析して、アプリケーションの品質とユーザーの満足度について理解を深めることができます。

トレースUIでのフィードバックの表示

SDK によるフィードバック付きのトレースの取得

トレースUIでのフィードバックの表示

トレースフィードバック

SDK によるフィードバック付きのトレースの取得

まず、特定の時間枠からトレースを取得します。

Python
from mlflow.client import MlflowClient
from datetime import datetime, timedelta

def get_recent_traces(experiment_name: str, hours: int = 24):
"""Get traces from the last N hours."""
client = MlflowClient()

# Calculate cutoff time
cutoff_time = datetime.now() - timedelta(hours=hours)
cutoff_timestamp_ms = int(cutoff_time.timestamp() * 1000)

# Query traces
traces = client.search_traces(
experiment_names=[experiment_name],
filter_string=f"trace.timestamp_ms > {cutoff_timestamp_ms}"
)

return traces

SDK を使用したフィードバック パターンの分析

トレースからフィードバックを抽出して分析します。

Python
def analyze_user_feedback(traces):
"""Analyze feedback patterns from traces."""

client = MlflowClient()

# Initialize counters
total_traces = len(traces)
traces_with_feedback = 0
positive_count = 0
negative_count = 0

# Process each trace
for trace in traces:
# Get full trace details including assessments
trace_detail = client.get_trace(trace.info.trace_id)

if trace_detail.data.assessments:
traces_with_feedback += 1

# Count positive/negative feedback
for assessment in trace_detail.data.assessments:
if assessment.name == "user_feedback":
if assessment.value:
positive_count += 1
else:
negative_count += 1

# Calculate metrics
if traces_with_feedback > 0:
feedback_rate = (traces_with_feedback / total_traces) * 100
positive_rate = (positive_count / traces_with_feedback) * 100
else:
feedback_rate = 0
positive_rate = 0

return {
"total_traces": total_traces,
"traces_with_feedback": traces_with_feedback,
"feedback_rate": feedback_rate,
"positive_rate": positive_rate,
"positive_count": positive_count,
"negative_count": negative_count
}

# Example usage
traces = get_recent_traces("/Shared/production-genai-app", hours=24)
results = analyze_user_feedback(traces)

print(f"Feedback rate: {results['feedback_rate']:.1f}%")
print(f"Positive feedback: {results['positive_rate']:.1f}%")
print(f"Total feedback: {results['traces_with_feedback']} out of {results['total_traces']} traces")

多次元フィードバックの分析

評価に関する詳細なフィードバックについては、以下をご覧ください。

Python
def analyze_ratings(traces):
"""Analyze rating-based feedback."""

client = MlflowClient()
ratings_by_dimension = {}

for trace in traces:
trace_detail = client.get_trace(trace.info.trace_id)

if trace_detail.data.assessments:
for assessment in trace_detail.data.assessments:
# Look for rating assessments
if assessment.name.startswith("user_") and assessment.name != "user_feedback":
dimension = assessment.name.replace("user_", "")

if dimension not in ratings_by_dimension:
ratings_by_dimension[dimension] = []

ratings_by_dimension[dimension].append(assessment.value)

# Calculate averages
average_ratings = {}
for dimension, scores in ratings_by_dimension.items():
if scores:
average_ratings[dimension] = sum(scores) / len(scores)

return average_ratings

# Example usage
ratings = analyze_ratings(traces)
for dimension, avg_score in ratings.items():
print(f"{dimension}: {avg_score:.2f}/1.0")

本番運用に関する考慮事項

本番運用のデプロイについては、 トレースによる本番運用の可観測性 に関するガイドを参照してください。

  • フィードバック収集エンドポイントの実装
  • クライアント要求 ID を使用したトレースへのフィードバックのリンク
  • リアルタイム 品質 モニタリングの設定
  • 大量のフィードバック処理のベストプラクティス

次のステップ

これらの推奨アクションとチュートリアルで旅を続けてください。

リファレンスガイド

このガイドで説明されている概念と機能の詳細なドキュメントをご覧ください。