コードで AI エージェントを作成する
この記事では、Mosaic AI Agent Framework と、LangGraph、PyFunc、OpenAI などの一般的なエージェントオーサリングライブラリを使用して、Python で AI エージェントを作成する方法について説明します。
必要条件
Databricks では、エージェントを開発するときに最新バージョンの MLflow Python クライアントをインストールすることをお勧めします。
この記事のアプローチを使用してエージェントを作成およびデプロイするには、次の最小パッケージ バージョンが必要です。
databricks-agents
バージョン0.16.0以降mlflow
バージョン 2.20.2 以降- Python 3.10 以降。この要件を満たすには、サーバレス コンピュートまたは Databricks Runtime 13.3 LTS 以降を使用できます。
%pip install -U -qqqq databricks-agents>=0.16.0 mlflow>=2.20.2
Databricks では、エージェントの作成時に Databricks AI Bridge 統合パッケージをインストールすることもお勧めします。 これらの統合パッケージ (databricks-langchain
、databricks-openai
など)は、エージェント作成フレームワークやSDKにおけるDatabricks AI/BI Genie、Vector Searchなどの 機能と対話するためのAPIの共有レイヤーを提供します。
- LangChain/LangGraph
- OpenAI
- Pure Python agents
%pip install -U -qqqq databricks-langchain
%pip install -U -qqqq databricks-openai
%pip install -U -qqqq databricks-ai-bridge
ChatAgent
を使用してエージェントを作成する
Databricksでは本番運用グレードのエージェントを作成するには、MLflowの ChatAgent
インターフェースを使用することをお勧めします。 このチャットスキーマ仕様は、エージェントのシナリオ向けに設計されており、OpenAI ChatCompletion
スキーマと似ていますが、厳密には互換性がありません。 また、ChatAgent
ではマルチターンのツールコールエージェントの機能も追加されています。
ChatAgent
を使用してエージェントを作成すると、次の利点があります。
-
高度なエージェント機能
- ストリーミング 出力: 小さなチャンクで出力をストリーミングすることで、対話型のユーザー エクスペリエンスを実現します。
- 包括的なツール呼び出しメッセージ履歴 : 中間ツール呼び出しメッセージを含む複数のメッセージを返し、品質と会話管理を向上させます。
- ツールコール確認支援
- マルチエージェントシステムのサポート
-
開発、デプロイ、モニタリングの効率化
- フレームワークに依存しない Databricks 機能の統合 :任意のフレームワークでエージェントを作成し、 AI Playground、エージェント評価、エージェントモニタリングとすぐに互換性を持たせることができます。
- 型付きオーサリングインターフェイス :型付きPythonクラスを使用してエージェントコードを記述し、IDEとノートブックのオートコンプリートの恩恵を受けます。
- シグネチャの自動推論 : MLflow は、エージェントのログ記録時に
ChatAgent
シグネチャを自動的に推論するため、登録とデプロイが簡素化されます。 ログ記録中のモデルのシグネチャの推論を参照してください。 - AI Gateway 拡張推論テーブル : AI Gateway 推論テーブルは、デプロイされたエージェントに対して自動的に有効になり、詳細なリクエストログメタデータにアクセスできます。
ChatAgent
の作成方法については、次のセクションの例と MLflow のドキュメント - ChatAgent インターフェイスとはを参照してください。
ChatAgent
の例
次のノートブックでは、一般的なライブラリである OpenAI と LangGraph を使用して、ストリーミングと非ストリーミングの ChatAgents
を作成する方法を示します。
LangGraphツールコールエージェント
OpenAIツールコールエージェント
OpenAI Responses API ツール呼び出しエージェント
OpenAIチャット専用エージェント
ツールを追加してこれらのエージェントの機能を拡張する方法については、 AI エージェント ツールを参照してください。
マルチエージェントシステム ChatAgent
例
Genie を使用してマルチエージェント・システムを作成する方法については、 マルチエージェント・システムでの Genie の使用を参照してください。
ストリーミング出力エージェントの作成
ストリーミング エージェントは、より小さな増分チャンクの連続ストリームで応答を配信します。ストリーミング出力により、エンドユーザーはエージェント出力が生成されたときに読み取ることができるため、知覚されるレイテンシーが短縮され、会話型エージェントの全体的なユーザーエクスペリエンスが向上します。
ストリーミングChatAgent
を作成するには、ChatAgentChunk
オブジェクトを生成するジェネレータを返す predict_stream
メソッドを定義します。各オブジェクトにはレスポンスの一部が含まれます。理想的な ChatAgent
ストリーミング動作の詳細については、 MLflow のドキュメントを参照してください。
次のコードは predict_stream
関数の例を示しています。ストリーミング エージェントの完全な例については、「 ChatAgent の例」を参照してください。
def predict_stream(
self,
messages: list[ChatAgentMessage],
context: Optional[ChatContext] = None,
custom_inputs: Optional[dict[str, Any]] = None,
) -> Generator[ChatAgentChunk, None, None]:
# Convert messages to a format suitable for your agent
request = {"messages": self._convert_messages_to_dict(messages)}
# Stream the response from your agent
for event in self.agent.stream(request, stream_mode="updates"):
for node_data in event.values():
# Yield each chunk of the response
yield from (
ChatAgentChunk(**{"delta": msg}) for msg in node_data["messages"]
)
Databricks モデルサービングにデプロイメントできるChatAgent
の開発
Databricks は、 Databricks モデルサービングの分散環境にChatAgent
をデプロイするため、マルチターンの会話中に、同じサービングレプリカがすべてのリクエストを処理できない場合があります。 エージェントの状態の管理には、次の影響に注意してください。
-
ローカル キャッシュの回避 :
ChatAgent
をデプロイするときは、同じレプリカが複数ターンの会話のすべての要求を処理すると想定しないでください。 各ターンのディクショナリChatAgentRequest
スキーマを使用して内部状態を再構築します。 -
スレッドセーフな状態 : エージェントの状態をスレッドセーフに設計し、マルチスレッド環境での競合を防ぎます。
-
predict
関数で状態を初期化 する:predict
関数が呼び出されるたびに、ChatAgent
初期化中ではなく、状態を初期化します。ChatAgent
レベルで状態を格納すると、1 つのChatAgent
レプリカが複数のメッセージ交換からの要求を処理できるため、会話間で情報が漏洩し、競合が発生する可能性があります。
カスタム入力と出力
一部のシナリオでは、 client_type
や session_id
などの追加のエージェント入力や、将来の対話のためにチャット履歴に含めるべきではない取得ソースリンクなどの出力が必要になる場合があります。
これらのシナリオでは、MLflow ChatAgent
は custom_inputs
フィールドと custom_outputs
フィールドをネイティブにサポートします。
Agent Evaluation レビュー アプリは、現在、追加の入力フィールドを持つエージェントのトレースのレンダリングをサポートしていません。
OpenAI/PyFuncおよびLangGraphエージェントのカスタム入力と出力を設定する方法については、次の例を参照してください。
OpenAI + PyFuncカスタムスキーマエージェントノートブック
LangGraph カスタム スキーマ エージェント ノートブック
AI Playground とエージェントレビューアプリで custom_inputs
を提供する
エージェントが custom_inputs
フィールドを使用して追加の入力を受け入れる場合は、 AI Playground と エージェントレビューアプリの両方でこれらの入力を手動で提供できます。
-
AI Playground または Agent Review App で、歯車アイコン
を選択します。
-
custom_inputs を有効にします。
-
エージェントの定義済み入力スキーマに一致するJSONオブジェクトを提供します。
カスタム取得者スキーマの指定
AIエージェントは一般的に、レトリーバーを使用して、ベクトル検索インデックスから非構造化データを検索し、クエリを実行します。たとえば、取得ツールについては、非構造化取得 AI エージェント ツールを参照してください。
MLflow RETRIEVER スパンを使用してエージェント内でこれらのレトリーバーをトレースし、次のような Databricks 製品の機能を有効にします。
- 取得したソース ドキュメントへのリンクを AI Playground UI に自動的に表示する
- Agent Evaluation で自動的に実行される検索の根拠と関連性のジャッジ
Databricks では、 databricks_langchain.VectorSearchRetrieverTool
や databricks_openai.VectorSearchRetrieverTool
などの Databricks AI Bridge パッケージによって提供されるレトリーバー ツールの使用をお勧めします。これは、既に MLflow レトリーバー スキーマに準拠しているためです。 AI Bridge を使用したベクトル検索取得ツールをローカルで開発するを参照してください。
エージェントがカスタムスキーマを持つレトリーバースパンを含んでいる場合は、 mlflow.models.set_retriever_schemaエージェントをコードで定義する場合。これにより、レトリーバーの出力列が MLflow の想定フィールド (primary_key
、 text_column
、 doc_uri
) にマップされます。
import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
# {
# 'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
# 'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
# 'doc_uri': 'https://mlflow.org/docs/latest/index.html',
# 'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
# },
# {
# 'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
# 'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
# 'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
# 'title': 'Getting Started with MLflow'
# },
# ...
# ]
mlflow.models.set_retriever_schema(
# Specify the name of your retriever span
name="mlflow_docs_vector_search",
# Specify the output column name to treat as the primary key (ID) of each retrieved document
primary_key="document_id",
# Specify the output column name to treat as the text content (page content) of each retrieved document
text_column="chunk_text",
# Specify the output column name to treat as the document URI of each retrieved document
doc_uri="doc_uri",
# Specify any other columns returned by the retriever
other_columns=["title"],
)
doc_uri
列は、レトリーバーのパフォーマンスを評価するときに特に重要です。doc_uri
は、レトリーバーによって返されるドキュメントの主な識別子であり、それらをグラウンド トゥルース評価セットと比較できます。 評価セットを参照してください。
環境間でのデプロイのためのエージェントコードのパラメータ化
エージェント・コードをパラメーター化して、異なる環境で同じエージェント・コードを再利用できます。
パラメータは、Pythonディクショナリーまたは.yaml
ファイルで定義するキーと値のペアです。
コードを構成するには、Python ディクショナリまたは .yaml
ファイルを使用してModelConfig
を作成します。ModelConfig
は、柔軟な構成管理を可能にするキーと値のパラメーターのセットです。 たとえば、開発中にディクショナリを使用し、それを本番運用デプロイメントおよび CI/CD用の.yaml
ファイルに変換できます。
ModelConfig
の詳細については、MLflow のドキュメントを参照してください。
ModelConfig
の例を以下に示します。
llm_parameters:
max_tokens: 500
temperature: 0.01
model_serving_endpoint: databricks-dbrx-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
question that indicates your prompt template came from a YAML file. Your response
must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
- question
エージェントコードでは、 .yaml
ファイルまたはディクショナリからデフォルト(開発)設定を参照できます。
import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)
# Example of using a dictionary
config_dict = {
"prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
"prompt_template_input_vars": ["question"],
"model_serving_endpoint": "databricks-dbrx-instruct",
"llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}
model_config = mlflow.models.ModelConfig(development_config=config_dict)
# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')
次に、エージェントをログに記録するときに、log_model
するパラメーターmodel_config
を指定します
ログに記録されたエージェントをロードするときに使用するパラメーターのカスタムセットを指定します。MLflow のドキュメント - ModelConfigをご覧ください。
ストリーミング エラーの伝播
Mosaic AI は、ストリーミング中に発生したエラーを伝播し、最後のトークンを databricks_output.error
未満にします。 このエラーを適切に処理して表示するかどうかは、呼び出し元のクライアント次第です。
{
"delta": …,
"databricks_output": {
"trace": {...},
"error": {
"error_code": BAD_REQUEST,
"message": "TimeoutException: Tool XYZ failed to execute."
}
}
}
ユーザー代理認証
ベータ版
この機能は ベータ版です。
ユーザー認証は、機密データ アクセスへの安全なアクセスを強制するための強力なツールですが、ワークスペース ユーザーは、Databricks で他のユーザーに代わって行動するエージェントを作成できます。そのため、ベータ版ではデフォルトで無効になっており、ワークスペース管理者が有効にする必要があります。この機能を有効にする前に、ユーザー代理認証のセキュリティ に関する考慮事項 を確認してください。
ユーザー代理認証では、 Mosaic AI モデルサービングを介してデプロイされたエージェントは、エージェント Databricks クエリを実行した Databricks エンドユーザーのIDを使用してリソースにアクセスできます。 これにより、ユーザーごとに機密情報にアクセスできるようになり、Unity Catalog でのデータ アクセス制御がきめ細かく実施されます。
On-behalf-of-user認証は、ダウンスコープによって受信ユーザートークンをさらに制限し、エージェントコードに公開されるトークンがエージェント作成者によって定義された特定の APIs のみにアクセスするように制限されるようにします。 これにより、不正なアクションを防ぎ、トークンの誤用のリスクを減らすことで、セキュリティが向上します。
エージェントを作成するときは、既存の SDK を引き続き使用して、ベクトル検索インデックスなどの Databricks リソースにアクセスできます。リソースへのユーザーアクセスの代わりに実行するには、次の 2 つの手順があります。
- エージェント コードで、SDK 呼び出しを更新して、エージェント エンド ユーザーに代わってリソースにアクセスする必要があることを示します
- エージェントのログ記録時 (エージェントのデプロイ前) に、エージェントに必要なエンドユーザーの REST API スコープを指定します。詳細については、「ユーザー代理認証」を参照してください
ユーザー代理認証のエンドツーエンドの例については、 エンドツーエンドの例を参照してください。
次のリソースは、エージェントによるユーザー代理認証と互換性があります。
Databricks リソース | 互換性のあるクライアント |
---|---|
ベクトル検索インデックス |
|
モデルサービングエンドポイント |
|
SQLウェアハウス |
|
UC接続 |
|
UC テーブルと UC 関数 | 現在、ユーザー代理認証を使用して UC テーブルまたは UC 関数に直接アクセスするクライアントはサポートされていません。代わりに、ユーザーにはGenieを使用して、ユーザーに代わって認証で構造化データにアクセスすることをお勧めします |
Genieスペース |
|
ユーザー・クライアントの代わりにツールを初期化する場合は、ツールの初期化をtry-exceptブロックでラップするか、エラーをユーザーに公開することができます。エラーを処理することで、エンドユーザーが必要なすべてのツールにアクセスできない場合でも、エージェントはベストエフォートの対応を行うことができます。ただし、ツールの初期化中にエラーを処理しないことを選択した場合、ユーザーに必要なリソースが不足していると、エージェントはエラーをスローします。
SDK の構成
以下のスニペットは、さまざまな SDK を使用して、エンドユーザーに代わってさまざまな Databricks リソースへのアクセスを構成する方法を示しています
- Vector Search Retriever Tool
- Vector Search Client
- Model Serving Endpoint
- UC Connections
- Genie Spaces
from databricks.sdk import WorkspaceClient
from databricks.sdk.credentials_provider import ModelServingUserCredentials
from databricks_langchain import VectorSearchRetrieverTool
# Configure a Databricks SDK WorkspaceClient to use on behalf of end
# user authentication
user_client = WorkspaceClient(credentials_strategy = ModelServingUserCredentials())
vector_search_tools = []
# Exclude exception handling if you want the agent to fail
# when users lack access to all required Databricks resources
try:
tool = VectorSearchRetrieverTool(
index_name="<index_name>",
description="...",
tool_name="...",
workspace_client=user_client # Specify the user authenticated client
)
vector_search_tools.append(tool)
except Exception as e:
_logger.debug("Skipping adding tool as user does not have permissions)
from databricks.vector_search.client import VectorSearchClient
from databricks.vector_search.utils import CredentialStrategy
# Configure a VectorSearch Client to use on behalf of end
# user authentication
user_authenticated_vsc = VectorSearchClient(credential_strategy=CredentialStrategy.MODEL_SERVING_USER_CREDENTIALS)
# Exclude exception handling if you want the agent to fail when
# users lack access to all required Databricks resources
try:
vs_index = user_authenticated_vsc.get_index(endpoint_name="endpoint_name", index_name="index_name")
...
except Exception as e:
_logger.debug("Skipping Vector Index because user does not have permissions)
from databricks.sdk import WorkspaceClient
from databricks.sdk.credentials_provider import ModelServingUserCredentials
# Configure a Databricks SDK WorkspaceClient to use on behalf of end
# user authentication
user_client = WorkspaceClient(credentials_strategy=ModelServingUserCredentials())
# Exclude exception handling if you want the agent to fail
# when users lack access to all required Databricks resources
try:
user_client.serving_endpoints.query("endpoint_name", input="")
except Exception as e:
_logger.debug("Skipping Model Serving Endpoint due to no permissions")
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import ExternalFunctionRequestHttpMethod
# Configure a Databricks SDK WorkspaceClient to use on behalf of end
# user authentication
user_client = WorkspaceClient(credentials_strategy=ModelServingUserCredentials())
user_client.serving_endpoints.http_request(
conn="connection_name",
method=ExternalFunctionRequestHttpMethod.POST,
path="/api/v1/resource",
json={"key": "value"},
headers={"extra_header_key": "extra_header_value"},
)
from databricks.sdk import WorkspaceClient
from databricks.sdk.credentials_provider import ModelServingUserCredentials
from databricks_langchain.genie import GenieAgent
# Configure a Databricks SDK WorkspaceClient to use on behalf of end
# user authentication
user_client = WorkspaceClient(credentials_strategy=ModelServingUserCredentials())
genie_agent = GenieAgent(
genie_space_id="<genie_space_id>",
genie_agent_name="Genie",
description="Genie_description",
client=user_client, # Specify the user client here
)
エージェントの初期化
ユーザー代理認証は、 ChatAgent インターフェースと互換性があります。On-Behalf-of-User認証を使用する場合、エンドユーザーのIDは、デプロイされたエージェントがクエリされたとき、つまりChatAgentインターフェースの predict
および predict_stream
機能内でのみ認識されます。その結果、ユーザーに代わってリソースへのアクセス (例:エンドユーザーがアクセスできるベクトル検索インデックスをリストします)ChatAgent実装の __init__
メソッドではなく、これらのメソッド内から。これにより、リソースが呼び出し間で分離されます
from mlflow.pyfunc import ChatAgent
class LangGraphChatAgent(ChatAgent):
def initialize_agent():
user_client = WorkspaceClient(
credentials_strategy=ModelServingUserCredentials()
)
system_authorized_client = WorkspaceClient()
### Use the clients above to access resources with either system or user authorization
def predict(
self,
messages: list[ChatAgentMessage],
context: Optional[ChatContext] = None,
custom_inputs: Optional[dict[str, Any]] = None,
) -> ChatAgentResponse:
agent = initialize_agent() # Initialize the Agent in Predict
request = {"messages": self._convert_messages_to_dict(messages)}
messages = []
for event in self.agent.stream(request, stream_mode="updates"):
for node_data in event.values():
messages.extend(
ChatAgentMessage(**msg) for msg in node_data.get("messages", [])
)
return ChatAgentResponse(messages=messages)
セキュリティに関する考慮事項
エージェントによるユーザー代理認証を有効にする前に、セキュリティへの影響を考慮する必要があります。
- 機密性の高い Databricks リソースへのアクセス: ユーザー代理認証を有効にすると、エージェントは機密性の高い Databricks リソースにアクセスできます。 開発者がアクセスできるリソースを制限し、トークンの誤用のリスクを軽減するためにAPIスコープを実装しましたが、それでもいくつかのリスクが残っています。たとえば、
serving.serving-endpoints
API スコープは、ユーザーに代わってサービスエンドポイントを実行する権限をエージェントに付与します。ただし、配信エンドポイント自体が、元のエージェントが使用を許可されていない追加の API スコープにアクセスできる場合があります。 - エンド ユーザーの同意はサポートされていません: 現在のベータ フェーズでは、エージェント ユーザーは、エージェントに必要な Databricks REST API スコープを表示したり、同意したりすることはできません。ユーザーは、サービスエンドポイントに対する "Can Manage" 権限を持つユーザーが、ユーザーに代わって Databricks でアクションを実行することを信頼する責任があります。
エンドツーエンドの例
次のノートブックは、ユーザー代理認証を使用してベクトル検索でエージェントを作成する方法を示しています。