MLflow を使用した非会話型 AI エージェント
レガシー ワークフロー : このノートブックでは、モデルサービングを通じてエージェントを展開する従来のアプローチについて説明します。 Databricks 、エージェント コード、サーバー構成、展開ワークフローを完全に制御するために、 Databricks Appsにエージェントを展開することをお勧めします。 推奨される方法については、 「 AIエージェントを作成してDatabricks Appsにデプロイする」を参照してください。
非会話型エージェントは、会話状態を維持せずに構造化された入力を処理して特定の出力を生成します。各リクエストは独立しており自己完結的であるため、これらのエージェントは、ドキュメント分類、データ抽出、バッチ分析、構造化された質問への回答などのタスク固有の操作に最適です。
複数ターンの対話を管理する会話エージェントとは異なり、非会話エージェントは明確に定義されたタスクを効率的に実行することに重点を置いています。この合理化されたアーキテクチャにより、独立したリクエストのスループットが向上します。
以下の方法を学習します:
- 非会話型エージェントを作成する
- 包括的なMLflowトレースと可観測性を実装する
- 自動トレース収集機能を備えたモデルサービングにエージェントをデプロイする
- MLflow 3 スコアラーを使用して本番運用モニタリングを設定する
要件
依存関係:
- MLflow 3.2.0以上
- databricks-agents 1.2.0 以上
- LLM統合用のdatabricks-sdk[openai]
- Python 3.10以上
ワークスペースへのアクセス:
- 基盤モデルAPIsへのアクセス (確実: Claude 4.5 Sonnet、構成可能)
- AIモデルの登録のためのカタログとスキーマへのアクセス
%pip install --upgrade mlflow[databricks]==3.6.0 pydantic databricks-sdk[openai] databricks-agents databricks-sdk
%restart_python
シナリオ例
この例のエージェントは、財務文書の内容に関する構造化された質問を処理し、理由とともに「はい/いいえ」の回答を提供します。この簡略化された例では、ユーザーはドキュメントのテキストと質問の両方を入力に直接指定するため、ベクトル検索インフラストラクチャは不要になります。これは、非会話エージェントが会話コンテキストなしで明確に定義されたタスクをどのように処理できるかを示しています。
追加のツールと機能を統合することで、この例を本番運用のユースケースに拡張できます。 例としては、ドキュメント取得のためのベクトル検索、外部統合のための MCP (モデル コンテキスト プロトコル) ツール、構造化データ アクセスのための Genie などの他の Databricks エージェントなどが挙げられます。
サービスプリンシパルを設定する
非会話型エージェントは、モデルサービングからトレースを書き込むための自動認証パススルーをサポートしていません。 代わりに、カスタムMLflow 3 トレース統合を実装し、サービスプリンシパルを使用して認証を手動で処理する必要があります。
- OAuth資格情報 を使用してサービス プリンシパル を作成し、ワークスペースへのユーザー アクセスを許可します 。
- 資格情報をシークレットスコープに保存します。
# TODO: Configuration constants - After running this cell, set these values for your environment
dbutils.widgets.text("catalog", "main")
dbutils.widgets.text("schema", "default")
dbutils.widgets.text("secret_scope", "")
dbutils.widgets.dropdown("clean_up_resources", "False", ["True", "False"])
CATALOG = dbutils.widgets.get("catalog")
SCHEMA = dbutils.widgets.get("schema")
SECRET_SCOPE = dbutils.widgets.get("secret_scope")
CLEAN_UP_RESOURCES = dbutils.widgets.get("clean_up_resources")
from databricks.sdk import WorkspaceClient
workspace = WorkspaceClient()
DATABRICKS_HOST = workspace.config.host # Your workspace URL
サービスプリンシパルのOAuthクライアント ID とクライアント シークレットをDatabricksシークレットとしてまだ保存していない場合は、次のコードのコメントを解除し、値をサービスプリンシパルの ID とシークレットに置き換えます。 Databricks CLI を使用してシークレットを作成することもできます。
# TODO: Store your service principal ID and secret.
# workspace.secrets.put_secret(SECRET_SCOPE, "client_id", string_value ="<YOUR_SERVICE_PRINCIPAL_CLIENT_ID>")
# workspace.secrets.put_secret(SECRET_SCOPE, "client_secret", string_value ="<YOUR_SERVICE_PRINCIPAL_CLIENT_SECRET>")
MLflowエクスペリメントを構成します。
- エクスペリメントが存在しない場合は作成します。
- サービスプリンシパル
CAN_EDIT権限をエクスペリメントに付与します。
# MLflow experiment to capture traces
EXPERIMENT_NAME = "/Workspace/Shared/non-conversational"
# LLM Configuration
LLM_MODEL = "databricks-claude-sonnet-4-5" # Change this to use different models
# Model and endpoint names - do not need to be changed
MODEL_NAME = "document_analyzer"
ENDPOINT_NAME = "document_analyzer_agent"
REGISTERED_MODEL_NAME = f"{CATALOG}.{SCHEMA}.{MODEL_NAME}"
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.ml import ExperimentAccessControlRequest
from databricks.sdk.service.iam import PermissionLevel
import mlflow
# Set experiment and get the experiment object directly
experiment = mlflow.set_experiment(EXPERIMENT_NAME)
experiment_id = experiment.experiment_id
# Fetch the service principal client_id from secret scope
client_id = dbutils.secrets.get(scope=SECRET_SCOPE, key="client_id")
# Set permissions for the SPN which will later write the traces from the serving endpoint
workspace = WorkspaceClient()
# Set CAN_EDIT permissions for the service principal
workspace.experiments.set_permissions(
experiment_id=experiment_id,
access_control_list=[
ExperimentAccessControlRequest(
service_principal_name=client_id,
permission_level=PermissionLevel.CAN_EDIT
)
]
)
print(f"✓ CAN_EDIT permissions granted to SPN {client_id[:8]}... for experiment: {experiment_id}")
入力および出力形式
柔軟なチャット メッセージ形式を使用する会話型エージェントとは異なり、非会話型エージェントでは、入力と出力に構造化された Pydantic モデルが必要です。
- タスク実行に必要なすべてのフィールドを含む入力スキーマを作成します。
- フィードバック ログを有効にするには、出力スキーマにトレース メタデータ (
trace_id、span_id) を含めます。 - 適切な場合に、詳細な推論や思考の連鎖の説明を提供する出力を設計します。
- 開発中にスキーマを検証し、展開前にエラーを検出します。
入力形式 ( AgentInput )
{
"document_text": "Document content to analyze...",
"questions": [
{ "text": "Do the documents contain a balance sheet?" },
{ "text": "Do the documents contain an income statement?" },
{ "text": "Do the documents contain a cash flow statement?" }
]
}
出力形式 ( AgentOutput )
{
"results": [
{
"question_text": "Do the documents contain a balance sheet?",
"answer": "Yes",
"chain_of_thought": "Detailed reasoning for the answer...",
"span_id": "abc123def456"
}
],
"trace_id": "tr-xyz789abc123"
}
- 構造化入力 : ユーザーは1回のリクエストでドキュメントのテキストと質問の両方を提供します。
- 詳細な推論 :各回答には段階的な思考の連鎖が含まれています
- トレーサビリティ : レスポンスにはフィードバック収集のための
trace_idとspan_idが含まれます
非会話型エージェントを構築する
MLflow トレースを使用して非会話型エージェントを作成します。エージェントは@mlflow.traceデコレータを使用して、LLM 呼び出しと完全なリクエスト フローを自動的にキャプチャし、完全な監視を実現します。
ユーザーは、ドキュメントのテキストと質問の両方を入力に直接入力します。
%%writefile model.py
import json
import logging
from typing import Optional
import uuid
import os
import sys
from databricks_openai import DatabricksOpenAI
import mlflow
from mlflow.pyfunc import PythonModel
from mlflow.tracing import set_destination
from mlflow.entities import SpanType
from mlflow.entities.trace_location import MlflowExperimentLocation
from pydantic import BaseModel, Field
class Question(BaseModel):
"""Represents a question in the input."""
text: str = Field(..., description="Yes/no question about document content")
class AgentInput(BaseModel):
"""Input model for the document analyzer agent."""
document_text: str = Field(..., description="The document text to analyze")
questions: list[Question] = Field(..., description="List of yes/no questions")
class Answer(BaseModel):
"""Represents a structured response from the LLM."""
answer: str = Field(..., description="Yes or No answer")
chain_of_thought: str = Field(..., description="Step-by-step reasoning for the answer")
class AnalysisResult(BaseModel):
"""Represents an analysis result in the output."""
question_text: str = Field(..., description="Original question text")
answer: str = Field(..., description="Yes or No answer")
chain_of_thought: str = Field(..., description="Step-by-step reasoning for the answer")
span_id: str | None = Field(None, description="MLflow span ID for this specific answer (None during offline evaluation)")
class AgentOutput(BaseModel):
"""Output model for the document analyzer agent."""
results: list[AnalysisResult] = Field(..., description="List of analysis results")
trace_id: str | None = Field(None, description="MLflow trace ID for user feedback collection (None during offline evaluation)")
class DocumentAnalyzer(PythonModel):
"""Non-conversational agent for document analysis using MLflow model serving."""
def __init__(self) -> None:
"""Initialize the document analyzer.
Sets up logging configuration, initializes model properties, and prepares
the model for serving.
"""
self._setup_logging()
self.model_name = "document_analyzer_v1"
self.logger.debug(f"Initialized {self.model_name}")
def _setup_logging(self) -> None:
"""Set up logging configuration for Model Serving.
Configures a logger that uses stderr for better visibility in Model Serving
environments. Log level can be controlled via MODEL_LOG_LEVEL environment
variable (defaults to INFO).
"""
self.logger = logging.getLogger("ModelLogger")
# Set log level from environment variable or default to INFO
log_level = os.getenv("MODEL_LOG_LEVEL", "INFO").upper()
self.logger.setLevel(getattr(logging, log_level, logging.INFO))
if not self.logger.handlers:
handler = logging.StreamHandler()
handler.setLevel(getattr(logging, log_level, logging.INFO))
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def load_context(self, context) -> None:
"""Load model context and initialize clients.
This method is called once when the model is loaded in the serving environment.
It sets up MLflow tracing destination and configures the OpenAI-compatible client
for LLM inference.
Args:
context: MLflow model context containing artifacts and configuration
"""
self.logger.debug("Loading model context")
set_destination(MlflowExperimentLocation(experiment_id=os.getenv("MONITORING_EXPERIMENT_ID")))
# You can load any artifacts here if needed
# self.artifacts = context.artifacts
self.logger.debug("Instantiate openai client")
# Get an OpenAI-compatible client configured for Databricks serving endpoints
self.openai_client = DatabricksOpenAI()
@mlflow.trace(name="answer_question", span_type=SpanType.LLM)
def answer_question(self, question_text: str, document_text: str) -> tuple[object, str | None]:
"""Answer a question using LLM with structured response format.
Uses the OpenAI-compatible client to call a language model with a structured
JSON response format. The LLM analyzes the provided document text and returns
a yes/no answer with reasoning.
Args:
question_text (str): The yes/no question to answer about the document
document_text (str): The document text to analyze
Returns:
tuple: (openai.ChatCompletion, str|None) - LLM response and span_id
"""
# Create a chat completion request with structured response for questions
question_prompt = f"""
You are a document analysis expert. Answer the following yes/no question based on the provided document.
Question: "{question_text}"
Document:
{document_text}
Analyze the document and provide a structured response.
"""
# Create a separate sub-span for the actual OpenAI API call
llm_response = self._call_openai_completion(question_prompt)
# Get the current span ID for this specific answer
current_span = mlflow.get_current_active_span()
span_id = current_span.span_id if current_span is not None else None
return llm_response, span_id
@mlflow.trace(name="openai_completion", span_type=SpanType.LLM)
def _call_openai_completion(self, prompt: str):
"""Make the actual OpenAI API call with its own sub-span.
Args:
prompt (str): The formatted prompt to send to the LLM
Returns:
OpenAI ChatCompletion response
"""
return self.openai_client.chat.completions.create(
model=os.getenv("LLM_MODEL", "databricks-claude-sonnet-4-5"), # Configurable LLM model
messages=[
{
"role": "user",
"content": prompt
}
],
response_format={
"type": "json_schema",
"json_schema": {
"name": "question_response",
"schema": Answer.model_json_schema()
}
}
)
@mlflow.trace(name="document_analysis")
def predict(self, context, model_input: list[AgentInput]) -> list[AgentOutput]:
"""Process document analysis questions with yes/no answers.
Args:
context: MLflow model context
model_input: List of structured inputs containing document text and questions
Returns:
List of AgentOutput with yes/no answers and reasoning
"""
self.logger.debug(f"Processing {len(model_input)} classification request(s)")
# Get the current trace ID for user feedback collection
# Will be None during offline evaluation when no active span exists
current_span = mlflow.get_current_active_span()
trace_id = current_span.trace_id if current_span is not None else None
results = []
for input_data in model_input:
self.logger.debug(f"Number of questions: {len(input_data.questions)}")
self.logger.debug(f"Document length: {len(input_data.document_text)} characters")
analysis_results = []
for question in input_data.questions:
self.logger.debug(f"Processing question: {question.text}")
# Answer the question using LLM with structured response
llm_response, answer_span_id = self.answer_question(question.text, input_data.document_text)
# Parse structured JSON response
try:
response_data = json.loads(llm_response.choices[0].message.content)
answer_obj = Answer(**response_data)
except Exception as e:
self.logger.debug(f"Failed to parse structured response: {e}")
# Fallback to default response
answer_obj = Answer(
answer="No",
chain_of_thought="Unable to process the question due to parsing error."
)
analysis_results.append(AnalysisResult(
question_text=question.text,
answer=answer_obj.answer,
chain_of_thought=answer_obj.chain_of_thought,
span_id=answer_span_id
))
self.logger.debug(f"Generated {len(analysis_results)} analysis results")
results.append(AgentOutput(
results=analysis_results,
trace_id=trace_id
))
return results
mlflow.models.set_model(DocumentAnalyzer())
エージェントをログに登録する
エージェントをサービスエンドポイントにデプロイする前に、エージェントをMLflowエクスペリメントにログ記録し、 Unity Catalogに登録する必要があります。
import os
import mlflow
import json
from mlflow.pyfunc import PythonModel
from pydantic import BaseModel, Field
from model import DocumentAnalyzer, AgentInput, Question
# Create example input for signature inference
def create_example_input() -> AgentInput:
"""Create example input for the non-conversational agent."""
return AgentInput(
document_text="Total assets: $2,300,000. Total liabilities: $1,200,000. Shareholder's equity: $1,100,000. Net income for the period was $450,000. Revenues: $1,700,000. Expenses: $1,250,000. Net cash provided by operating activities: $80,000. Cash flows from investing activities: -$20,000",
questions=[
Question(text="Do the documents contain a balance sheet?"),
Question(text="Do the documents contain an income statement?"),
Question(text="Do the documents contain a cash flow statement?"),
],
)
input_example = create_example_input()
with mlflow.start_run(run_name="deploy_non_conversational_agent"):
active_run = mlflow.active_run()
current_experiment_id = active_run.info.experiment_id
# Set environment variables for the model using current notebook experiment
os.environ["MONITORING_EXPERIMENT_ID"] = current_experiment_id
print(
f"✓ Using current notebook experiment ID for tracing: {current_experiment_id}"
)
# Log the non-conversational agent with auto-inferred dependencies
model_info = mlflow.pyfunc.log_model(
name=MODEL_NAME,
python_model="model.py", # Path to the model code file
input_example=[create_example_input().model_dump()],
registered_model_name=REGISTERED_MODEL_NAME,
)
# Set logged model as current active model to associate it with the below evaluation results
mlflow.set_active_model(model_id=mlflow.last_logged_model().model_id)
print(f"✓ Model logged and registered: {REGISTERED_MODEL_NAME}")
print(f"✓ Model version: {model_info.registered_model_version}")
エージェントを評価する
本番運用にデプロイする前に、事前に構築されたスコアラーを備えたMLflowの GenAI 評価フレームワークを使用してエージェントのパフォーマンスを評価します。 一部のスコアラーでは、グラウンドトゥルース データセットが必要です。
import mlflow
import mlflow.genai.datasets
from requests import HTTPError
# Create an evaluation dataset in Unity Catalog
uc_schema = f"{CATALOG}.{SCHEMA}"
evaluation_dataset_table_name = "document_analyzer_eval"
try:
# Try to create a new evaluation dataset
eval_dataset = mlflow.genai.datasets.create_dataset(
name=f"{uc_schema}.{evaluation_dataset_table_name}",
)
print(f"✓ Created evaluation dataset: {uc_schema}.{evaluation_dataset_table_name}")
except HTTPError as e:
# Check if it's a TABLE_ALREADY_EXISTS error
if e.response.status_code == 400 and "TABLE_ALREADY_EXISTS" in str(e):
print(
f"Dataset {uc_schema}.{evaluation_dataset_table_name} already exists, loading existing dataset..."
)
eval_dataset = mlflow.genai.datasets.get_dataset(
name=f"{uc_schema}.{evaluation_dataset_table_name}"
)
print(
f"✓ Loaded existing evaluation dataset: {uc_schema}.{evaluation_dataset_table_name}"
)
else:
# Different HTTP error, re-raise
raise
# Define comprehensive test cases with expected facts for ground truth comparison
sample_document = "Total assets: $2,300,000. Total liabilities: $1,200,000. Shareholder's equity: $1,100,000. Net income for the period was $450,000. Revenues: $1,700,000. Expenses: $1,250,000. Net cash provided by operating activities: $80,000. Cash flows from investing activities: -$20,000"
evaluation_examples = [
{
"inputs": {
"document_text": sample_document,
"questions": [{"text": "Do the documents contain a balance sheet?"}],
},
"expectations": {
"expected_facts": [
"answer is Yes",
"balance sheet information",
"total assets mentioned",
"total liabilities mentioned",
"shareholder's equity mentioned",
]
},
},
{
"inputs": {
"document_text": sample_document,
"questions": [{"text": "Do the documents contain an income statement?"}],
},
"expectations": {
"expected_facts": [
"answer is Yes",
"income statement information",
"net income mentioned",
"revenues mentioned",
"expenses mentioned",
]
},
},
{
"inputs": {
"document_text": sample_document,
"questions": [{"text": "Do the documents contain a cash flow statement?"}],
},
"expectations": {
"expected_facts": [
"answer is Yes",
"cash flow information",
"operating activities mentioned",
"investing activities mentioned",
"cash flows mentioned",
]
},
},
{
"inputs": {
"document_text": sample_document,
"questions": [
{
"text": "Do the documents contain information about employee benefits?"
}
],
},
"expectations": {
"expected_facts": [
"answer is No",
"no employee benefits information",
"financial statements focus",
"no HR-related content",
]
},
},
]
# Add the examples to the evaluation dataset
eval_dataset.merge_records(evaluation_examples)
print(f"✓ Added {len(evaluation_examples)} records to evaluation dataset")
# Preview the dataset
df = eval_dataset.to_df()
print(f"✓ Dataset preview - Total records: {len(df)}")
df.display()
import warnings
import mlflow
from mlflow.genai.scorers import (
RelevanceToQuery,
Correctness,
Guidelines,
)
# Suppress harmless threadpoolctl warnings that can appear in Databricks environments
warnings.filterwarnings("ignore", message=".*threadpoolctl.*")
warnings.filterwarnings("ignore", category=UserWarning, module="threadpoolctl")
# Load the logged model for evaluation
model_uri = f"models:/{REGISTERED_MODEL_NAME}/{model_info.registered_model_version}"
print(f"Loading model for evaluation: {model_uri}")
# Load the model as a predict function
loaded_model = mlflow.pyfunc.load_model(model_uri)
def my_app(document_text, questions):
"""Wrapper function for the model prediction."""
# The evaluation dataset's inputs field contains {"document_text": "...", "questions": [...]}
# but the predict_fn parameter names must match the keys in inputs
input_data = {"document_text": document_text, "questions": questions}
return loaded_model.predict([input_data])
# Define scorers for evaluation including ground truth comparison
correctness_scorer = Correctness() # Compares against expected_facts
relevance_scorer = RelevanceToQuery() # Evaluates relevance of response to question
response_schema_scorer = Guidelines(
name="response_schema",
guidelines="The response must be structured JSON with an 'answer' field containing 'Yes' or 'No' and a 'chain_of_thought' field with clear reasoning. There also needs to be a 'question_text' field that contains the question that was asked. All these fields are part of the 'results' array field.",
) # Validates structured output format
# This creates an evaluation run using the MLflow-managed dataset
results = mlflow.genai.evaluate(
data=eval_dataset, # Use the MLflow-managed dataset
predict_fn=my_app,
scorers=[
correctness_scorer,
relevance_scorer,
response_schema_scorer,
],
)
# Access the run ID
print(f"✓ Evaluation completed")
print(f"Evaluation run ID: {results.run_id}")
# Display evaluation results summary
if hasattr(results, "metrics") and results.metrics:
print("\n📊 Evaluation Results Summary:")
for metric_name, metric_value in results.metrics.items():
if isinstance(metric_value, (int, float)):
print(f" • {metric_name}: {metric_value:.3f}")
else:
print(f" • {metric_name}: {metric_value}")
else:
print("✓ Evaluation completed - view detailed results in the evaluation experiment")
# Display link to the evaluation dataset
print(f"\n📊 Evaluation Dataset: {uc_schema}.{evaluation_dataset_table_name}")
print(f"🔗 View dataset in Unity Catalog Data Explorer")
モデルサービングへのデプロイ
MLflow 3 トレースに必要な環境変数を使用して、評価されたエージェントをモデルサービング エンドポイントにデプロイします。 これにより、すべての本番運用リクエストが自動的に追跡され、指定されたMLflowエクスペリメントに記録されるようになります。
import mlflow
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import (
ServedEntityInput,
ServingModelWorkloadType,
EndpointCoreConfigInput,
)
from model import DocumentAnalyzer, AgentInput, Question
workspace = WorkspaceClient()
# Use the model version from the logged model
model_version = model_info.registered_model_version
print(f"Using model version: {model_version}")
new_entity = ServedEntityInput(
entity_name=REGISTERED_MODEL_NAME,
entity_version=model_version,
name=f"{MODEL_NAME}-{model_version}",
workload_size="Small",
workload_type=ServingModelWorkloadType.CPU,
scale_to_zero_enabled=True,
environment_vars={
"DATABRICKS_CLIENT_ID": f"{secrets/{SECRET_SCOPE}/client_id}",
"DATABRICKS_CLIENT_SECRET": f"{secrets/{SECRET_SCOPE}/client_secret}",
"DATABRICKS_HOST": DATABRICKS_HOST,
"MLFLOW_TRACKING_URI": "databricks",
"MONITORING_EXPERIMENT_ID": current_experiment_id,
"MODEL_LOG_LEVEL": "INFO",
"LLM_MODEL": LLM_MODEL,
},
)
# Check if endpoint exists and create or update accordingly
try:
# Try to get the existing endpoint
existing_endpoint = workspace.serving_endpoints.get(ENDPOINT_NAME)
print(
f"Endpoint {ENDPOINT_NAME} exists, updating with model version {model_version}"
)
# Update existing endpoint with new model version
workspace.serving_endpoints.update_config(
name=ENDPOINT_NAME, served_entities=[new_entity]
)
print("Endpoint update initiated, waiting for completion...")
# Wait for update to complete
workspace.serving_endpoints.wait_get_serving_endpoint_not_updating(ENDPOINT_NAME)
print("Endpoint updated successfully and is ready")
except Exception as e:
# Endpoint does not exist or you do not have permissions. Try to create it.
print(f"Endpoint {ENDPOINT_NAME} does not exist or you lack permissions. Trying to create new endpoint...")
workspace.serving_endpoints.create(
name=ENDPOINT_NAME,
config=EndpointCoreConfigInput(name=ENDPOINT_NAME, served_entities=[new_entity]),
)
print("Endpoint creation initiated, waiting for completion...")
# Wait for creation to complete
workspace.serving_endpoints.wait_get_serving_endpoint_not_updating(ENDPOINT_NAME)
print("Endpoint created successfully and is ready")
# Final status check
endpoint_status = workspace.serving_endpoints.get(ENDPOINT_NAME)
print(f"Final endpoint status: {endpoint_status.state}")
print(
f"Endpoint URL: https://{DATABRICKS_HOST.replace('https://', '')}/serving-endpoints/{ENDPOINT_NAME}/invocations"
)
スコアラーを使用して本番運用モニタリングを設定する
MLflow 3 スコアラーを使用して本番運用トラフィックの自動品質評価を構成します。 スコアラーは本番運用リクエストから記録されたトレースを自動的に分析し、継続的な品質モニタリングを提供します。
from mlflow.genai.scorers import (
RelevanceToQuery,
Guidelines,
ScorerSamplingConfig,
list_scorers,
get_scorer,
)
# Set the active experiment for scoring (use the current notebook's experiment)
print(f"Setting experiment to: {current_experiment_id}")
mlflow.set_experiment(experiment_id=current_experiment_id)
# Verify the experiment is set correctly
current_experiment = mlflow.get_experiment(current_experiment_id)
print(
f"Current experiment: {current_experiment.name} (ID: {current_experiment.experiment_id})"
)
# Setup scorers for production monitoring
print("Setting up production monitoring scorers...")
# Relevance scorer - always create new to avoid conflicts
relevance_scorer = RelevanceToQuery().register(name="financial_relevance_check")
relevance_scorer = relevance_scorer.start(
sampling_config=ScorerSamplingConfig(sample_rate=0.5)
)
print("✅ Created relevance scorer (50% sampling)")
# Guidelines scorer for response schema validation
response_schema_scorer = Guidelines(
name="response_schema",
guidelines="The response must be structured JSON with an 'answer' field containing 'Yes' or 'No' and a 'chain_of_thought' field with clear reasoning.",
).register(name="response_schema_check")
response_schema_scorer = response_schema_scorer.start(
sampling_config=ScorerSamplingConfig(sample_rate=0.4)
)
print("✅ Created response schema scorer (40% sampling)")
# List all active scorers
print(f"\nActive Scorers in Experiment {current_experiment_id}:")
scorers = list_scorers()
for scorer in scorers:
print(f"• {scorer.name}: {scorer.sample_rate*100}% sampling")
デプロイしたエージェントをテストする
サンプル入力を使用してデプロイされたエージェントをテストします。各リクエストは完全なリクエスト フローをキャプチャするMLflow 3 トレースを自動的に生成し、本番運用スコアラーは品質モニタリングのためにこれらのトレースを評価します。
from databricks.sdk import WorkspaceClient
# Test the non-conversational agent endpoint using Databricks SDK
workspace = WorkspaceClient()
# Example payload with structured input for the non-conversational agent
test_input = {
"inputs": [
{
"document_text": "Total assets: $2,300,000. Total liabilities: $1,200,000. Shareholder's equity: $1,100,000. Net income for the period was $450,000. Revenues: $1,700,000. Expenses: $1,250,000. Net cash provided by operating activities: $80,000. Cash flows from investing activities: -$20,000",
"questions": [
{"text": "Do the documents contain a balance sheet?"},
{"text": "Do the documents contain an income statement?"},
{"text": "Do the documents contain a cash flow statement?"},
],
}
]
}
# Query the serving endpoint using the workspace client
response = workspace.serving_endpoints.query(
name=ENDPOINT_NAME, inputs=test_input["inputs"]
)
print("Endpoint Response:")
print(response.as_dict())
# Generate MLflow experiment URL
experiment_url = f"{DATABRICKS_HOST}/ml/experiments/{current_experiment_id}"
print(f"\nMLflow Experiment URL: {experiment_url}")
ユーザーフィードバックを記録する
非会話型エージェントの場合でも、継続的な改善にはユーザーからのフィードバックを収集することが重要です。ユーザー向けのフロントエンド アプリケーションでは、エージェントが提供する個々の回答をユーザーが受け入れたり拒否したりすることができます。このフィードバックは、応答に含まれるtrace_idとspan_idを使用して MLflow に記録できます。
非会話型エージェントの一般的なフィードバック シナリオ:
- 正確さのフィードバック :「このはい/いいえの回答は正しかったですか?」
- 関連性フィードバック :「質問に対する推論は適切でしたか?」
- 品質フィードバック :「裏付けとなる証拠は十分でしたか?」
- エラー報告 :「エージェントはドキュメントの内容を誤解しましたか?」
次のセルは、応答で返されるspan_idを使用して、個々の回答に対するフィードバックを記録する方法を示しています。
import mlflow
from mlflow.entities import AssessmentSource
import time
# Wait 1 second to make sure the trace is ready for feedback logging
time.sleep(1)
# Get the response from the previous test (extract span_id from first result)
# In a real application, this would come from the API response
response_dict = response.as_dict()
first_prediction = response_dict["predictions"][0]
first_result = first_prediction["results"][0]
# Assert we have the required IDs for feedback logging
assert (
first_result.get("span_id") is not None
), "span_id is required for feedback logging"
assert (
first_prediction.get("trace_id") is not None
), "trace_id is required for feedback logging"
span_id = first_result["span_id"]
trace_id = first_prediction["trace_id"]
question_text = first_result["question_text"]
answer = first_result["answer"]
print(f"Logging feedback for question: '{question_text}'")
print(f"Agent answer: {answer}")
print(f"Span ID: {span_id}")
print(f"Trace ID: {trace_id}")
try:
# Example: User provides positive feedback on this specific answer
mlflow.log_feedback(
trace_id=trace_id,
span_id=span_id,
name="user_feedback",
value=True, # True for positive, False for negative
source=AssessmentSource(source_type="HUMAN"),
rationale="Answer was accurate and well-reasoned",
)
print("✅ Feedback logged successfully!")
except Exception as e:
print(f"Note: Could not log feedback in this environment: {e}")
リソースをクリーンアップする
import os
from mlflow import MlflowClient
from mlflow.genai.scorers import delete_scorer
from mlflow.deployments import get_deploy_client
if CLEAN_UP_RESOURCES:
try:
# Uncomment the lines below to delete the Model Serving endpoint:
client = get_deploy_client("databricks")
client.delete_endpoint(endpoint=ENDPOINT_NAME)
# Model registered to UC
client = MlflowClient()
model_versions = client.search_model_versions(f"name='{REGISTERED_MODEL_NAME}'")
for mv in model_versions:
client.delete_model_version(name=REGISTERED_MODEL_NAME, version=mv.version)
client.delete_registered_model(name=REGISTERED_MODEL_NAME)
# Evaluation dataset registered to UC
mlflow.genai.datasets.delete_dataset(eval_dataset.name)
# Model file in workspace
os.remove(os.path.join(os.getcwd(), "model.py"))
# Scorers registered to MLflow experiment
delete_scorer(name="financial_relevance_check")
delete_scorer(name="response_schema_check")
except Exception as e:
print(f"Failed to clean up all resources. Error: {e}")
次のステップ
- エージェントにツールを追加して機能を拡張する方法を学びます
- 高度な可観測性機能については、 MLflow 3 トレースのドキュメントを確認してください。
- 本番運用モニタリングのドキュメント