コードで AI エージェントを作成する
このページでは、Mosaic AI Agent Framework と、LangGraph や OpenAI などの一般的なエージェント オーサリング ライブラリを使用して、Python で AI エージェントを作成する方法を示します。
必要条件
Databricks では、エージェントを開発するときに最新バージョンの MLflow Python クライアントをインストールすることをお勧めします。
このページのアプローチを使用してエージェントを作成およびデプロイするには、次のものをインストールします。
databricks-agents1.2.0以上mlflow3.1.3以上- Python 3.10 以降。
- この要件を満たすには、サーバレス コンピュートまたは Databricks Runtime 13.3 LTS 以上を使用してください。
%pip install -U -qqqq databricks-agents mlflow
Databricks では、エージェントを作成するために Databricks AI Bridge 統合パッケージをインストールすることもお勧めします。これらの統合パッケージは、エージェントオーサリングフレームワークとSDK間で、Databricks AI/BI Genieやベクトル検索などのDatabricks AI機能と対話するAPIsの共有レイヤーを提供します。
- OpenAI
- LangChain/LangGraph
- DSPy
- Pure Python agents
%pip install -U -qqqq databricks-openai
%pip install -U -qqqq databricks-langchain
%pip install -U -qqqq databricks-dspy
%pip install -U -qqqq databricks-ai-bridge
ResponsesAgentを使用してエージェントを作成する
Databricksは、本番運用グレードのエージェントを作成するためにResponsesAgentMLflowインターフェイスを推奨しています。ResponsesAgent 、任意のサードパーティフレームワークを使用してエージェントを構築し、それを Databricks AI 機能と統合して、堅牢なログ記録、トレース、評価、デプロイ、およびモニタリング機能を実現できます。
ResponsesAgentスキーマは、OpenAI Responsesスキーマと互換性があります。OpenAI Responsesの詳細については、「 OpenAI: 応答と ChatCompletion」を参照してください。
古い ChatAgent インターフェイスは、Databricks で引き続きサポートされています。ただし Databricks 、新しいエージェントの場合は、最新バージョンの MLflow と ResponsesAgent インターフェイスを使用することをお勧めします。
「 レガシー入出力エージェントスキーマ」を参照してください。
ResponsesAgent 次の利点があります。
-
高度なエージェント機能
- マルチエージェントのサポート
- ストリーミング出力 : 出力を小さなチャンクでストリームします。
- 包括的なツール呼び出しメッセージ履歴 : 中間ツール呼び出しメッセージを含む複数のメッセージを返し、品質と会話管理を向上させます。
- ツールコール確認支援
- 長期実行ツールのサポート
-
開発、デプロイ、モニタリングの効率化
- 任意のフレームワークを使用してエージェントを作成する :
ResponsesAgentインターフェイスを使用して既存のエージェントをラップし、 AI Playground、エージェント評価、エージェントモニタリングとのすぐに使用できる互換性を実現します。 - 型付きオーサリングインターフェイス :型付きPythonクラスを使用してエージェントコードを記述し、IDEとノートブックのオートコンプリートの恩恵を受けます。
- 自動署名推論 : MLflow は、エージェントのログ記録時に
ResponsesAgent署名を自動的に推測し、登録とデプロイを簡素化します。「ログ記録中のモデル署名の推論」を参照してください。 - 自動トレース : MLflow は、
predict関数とpredict_stream関数を自動的にトレースし、ストリーム応答を集約して評価と表示を容易にします。 - AI Gateway 拡張推論テーブル : AI Gateway 推論テーブルは、デプロイされたエージェントに対して自動的に有効になり、詳細なリクエストログメタデータにアクセスできます。
- 任意のフレームワークを使用してエージェントを作成する :
ResponsesAgentの作成方法については、次のセクションの例と、MLflowドキュメント - ResponsesAgent for モデルサービングを参照してください。
ResponsesAgent 例
次のノートブックは、一般的なライブラリを使用してストリーミングおよび非ストリーミング ResponsesAgent を作成する方法を示しています。これらのエージェントの機能を拡張する方法については、「 AI エージェント ツール」を参照してください。
- OpenAI
- LangGraph
- DSPy
Databricks でホストされるモデルを使用した OpenAI のシンプルなチャット エージェント
Databricks でホストされるモデルを使用した OpenAI ツール呼び出しエージェント
OpenAI がホストするモデルを使用した OpenAI ツール呼び出しエージェント
LangGraphツールコールエージェント
DSPy シングルターンツール呼び出しエージェント
マルチエージェントの例
マルチエージェントシステムを作成する方法については、 マルチエージェントシステムでの Genie の使用を参照してください。
ステートフルエージェントの例
Lakebase をメモリ ストアとして使用して、短期メモリと長期メモリを備えたステートフル エージェントを作成する方法については、 「AI エージェント メモリ」を参照してください。
非会話エージェントの例
複数ターンの対話を管理する会話エージェントとは異なり、非会話エージェントは明確に定義されたタスクを効率的に実行することに重点を置いています。この合理化されたアーキテクチャにより、独立したリクエストのスループットが向上します。
非会話型エージェントを作成する方法については、 「MLflow を使用した非会話型 AI エージェント」を参照してください。
すでにエージェントがいる場合はどうなりますか?
LangChain、LangGraph、または同様のフレームワークで構築されたエージェントを既にお持ちの場合は、Databricks で使用するためにエージェントを書き直す必要はありません。代わりに、既存のエージェントを MLflow ResponsesAgent インターフェイスでラップするだけです。
-
Python
mlflow.pyfunc.ResponsesAgent.ラッパークラス内で、既存のエージェントを属性
self.agent = your_existing_agentとして参照します。 -
ResponsesAgentpredictクラスでは、非ストリーミング要求を処理するためにResponsesAgentResponseを返す メソッドを実装する必要があります。ResponsesAgentResponsesスキーマの例を次に示します。Pythonimport uuid
# input as a dict
{"input": [{"role": "user", "content": "What did the data scientist say when their Spark job finally completed?"}]}
# output example
ResponsesAgentResponse(
output=[
{
"type": "message",
"id": str(uuid.uuid4()),
"content": [{"type": "output_text", "text": "Well, that really sparked joy!"}],
"role": "assistant",
},
]
) -
predict関数で、ResponsesAgentRequestからの受信メッセージをエージェントが期待する形式に変換します。エージェントが応答を生成したら、その出力をResponsesAgentResponseオブジェクトに変換します。
既存のエージェントを ResponsesAgentに変換する方法については、次のコード例を参照してください。
- Basic conversion
- Streaming with code re-use
- Migrate from ChatCompletions
非ストリーミングエージェントの場合は、 predict 関数で入力と出力を変換します。
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
)
class MyWrappedAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Call your existing agent (non-streaming)
agent_response = self.agent.invoke(messages)
# Convert your agent's output to ResponsesAgent format, assuming agent_response is a str
output_item = (self.create_text_output_item(text=agent_response, id=str(uuid4())),)
# Return the response
return ResponsesAgentResponse(output=[output_item])
ストリーミングエージェントの場合は、メッセージを変換するコードの重複を避けるために、賢くロジックを再利用できます。
from typing import Generator
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages, included in full examples linked below
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Stream from your existing agent
item_id = str(uuid4())
aggregated_stream = ""
for chunk in self.agent.stream(messages):
# Convert each chunk to ResponsesAgent format
yield self.create_text_delta(delta=chunk, item_id=item_id)
aggregated_stream += chunk
# Emit an aggregated output_item for all the text deltas with id=item_id
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(text=aggregated_stream, id=item_id),
)
既存のエージェントが OpenAI ChatCompletions API を使用している場合は、コア ロジックを書き直すことなく ResponsesAgent に移行できます。次のラッパーを追加します。
- 受信
ResponsesAgentRequestメッセージをエージェントが期待するChatCompletions形式に変換します。 ChatCompletions出力をResponsesAgentResponseスキーマに変換します。- オプションで、
ChatCompletionsからResponsesAgentStreamEventオブジェクトに増分デルタをマッピングすることで、ストリーミングをサポートします。
from typing import Generator
from uuid import uuid4
from databricks.sdk import WorkspaceClient
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
# Legacy agent that outputs ChatCompletions objects
class LegacyAgent:
def __init__(self):
self.w = WorkspaceClient()
self.OpenAI = self.w.serving_endpoints.get_open_ai_client()
def stream(self, messages):
for chunk in self.OpenAI.chat.completions.create(
model="databricks-claude-sonnet-4-5",
messages=messages,
stream=True,
):
yield chunk.to_dict()
# Wrapper that converts the legacy agent to a ResponsesAgent
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# `agent` is your existing ChatCompletions agent
self.agent = agent
def prep_msgs_for_llm(self, messages):
# dummy example of prep_msgs_for_llm
# real example of prep_msgs_for_llm included in full examples linked below
return [{"role": "user", "content": "Hello, how are you?"}]
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# process the ChatCompletion output stream
agent_content = ""
tool_calls = []
msg_id = None
for chunk in self.agent.stream(messages): # call the underlying agent's stream method
delta = chunk["choices"][0]["delta"]
msg_id = chunk.get("id", None)
content = delta.get("content", None)
if tc := delta.get("tool_calls"):
if not tool_calls: # only accommodate for single tool call right now
tool_calls = tc
else:
tool_calls[0]["function"]["arguments"] += tc[0]["function"]["arguments"]
elif content is not None:
agent_content += content
yield ResponsesAgentStreamEvent(**self.create_text_delta(content, item_id=msg_id))
# aggregate the streamed text content
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(agent_content, msg_id),
)
for tool_call in tool_calls:
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_function_call_item(
str(uuid4()),
tool_call["id"],
tool_call["function"]["name"],
tool_call["function"]["arguments"],
),
)
agent = MyWrappedStreamingAgent(LegacyAgent())
for chunk in agent.predict_stream(
ResponsesAgentRequest(input=[{"role": "user", "content": "Hello, how are you?"}])
):
print(chunk)
完全な例については、「 ResponsesAgent 例」を参照してください。
応答のストリーミング
ストリーミングを使用すると、エージェントは完全な応答を待つ代わりに、リアルタイムのチャンクで応答を送信できます。ResponsesAgentを使用してストリーミングを実装するには、一連のデルタイベントとそれに続く最終完了イベントを発行します。
- デルタイベントを出力する : 同じ
item_idで複数のoutput_text.deltaイベントを送信して、 ストリームテキストチャンク リアルタイム。 - 完了イベントで終了 : 完全な最終出力テキストを含むデルタ イベントと同じ
item_idで最終response.output_item.doneイベントを送信します。
各デルタ イベントは、テキストのチャンクをクライアントにストリームします。 最終的な done イベントには完全な応答テキストが含まれ、Databricks に次の操作を行うように通知します。
- MLflow トレースを使用してエージェントの出力をトレースする
- AI Gateway推論テーブルでのストリームレスポンスの集計
- AI Playground UI で完全な出力を表示する
ストリーミング エラーの伝播
Mosaic AI は、ストリーミング中に発生したエラーを databricks_output.errorの下の最後のトークンで伝播します。このエラーを適切に処理して表示するのは、呼び出し元のクライアント次第です。
{
"delta": …,
"databricks_output": {
"trace": {...},
"error": {
"error_code": BAD_REQUEST,
"message": "TimeoutException: Tool XYZ failed to execute."
}
}
}
高度な機能
カスタム入力と出力
一部のシナリオでは、 client_type や session_idなどの追加のエージェント入力や、将来の対話のためにチャット履歴に含めるべきではない取得ソースリンクなどの出力が必要になる場合があります。
これらのシナリオでは、MLflow ResponsesAgent は custom_inputs フィールドと custom_outputsフィールドをネイティブにサポートします。カスタム入力には、上記の ResponsesAgent の例でリンクされているすべての例のrequest.custom_inputsを介してアクセスできます。
エージェント評価レビューアプリは、追加の入力フィールドを持つエージェントのレンダリングトレースをサポートしていません。
カスタムの入力と出力を設定する方法については、次のノートブックを参照してください。
AI Playground アプリで custom_inputs を提供し、レビューする
エージェントが custom_inputs フィールドを使用して追加の入力を受け入れる場合は、 AI Playground と レビューアプリの両方でこれらの入力を手動で提供できます。
-
AI Playground または Agent Review App で、歯車アイコン
を選択します。
-
custom_inputs を有効にします。
-
エージェントの定義済み入力スキーマに一致するJSONオブジェクトを提供します。

カスタム取得者スキーマの指定
AIエージェントは一般的に、レトリーバーを使用して、ベクトル検索インデックスから非構造化データを検索し、クエリを実行します。取得ツールの例については、「 非構造化データの取得ツールの構築とトレース」を参照してください。
MLflow RETRIEVER スパンを使用してエージェント内でこれらのレトリーバーをトレースし、次のような Databricks 製品の機能を有効にします。
- 取得したソース ドキュメントへのリンクを AI Playground UI に自動的に表示する
- Agent Evaluation で自動的に実行される検索の根拠と関連性のジャッジ
Databricks では、 databricks_langchain.VectorSearchRetrieverTool や databricks_openai.VectorSearchRetrieverTool などの Databricks AI Bridge パッケージによって提供されるレトリーバー ツールの使用をお勧めします。これは、既に MLflow レトリーバー スキーマに準拠しているためです。 AI Bridge を使用したベクトル検索取得ツールをローカルで開発するを参照してください。
エージェントにカスタムスキーマのレトリーバースパンが含まれている場合は、コードでエージェントを定義するときに mlflow.models.set_retriever_schema を呼び出します。これにより、レトリーバーの出力列が MLflow の予期されるフィールド (primary_key、 text_column、 doc_uri) にマップされます。
import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
# {
# 'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
# 'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
# 'doc_uri': 'https://mlflow.org/docs/latest/index.html',
# 'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
# },
# {
# 'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
# 'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
# 'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
# 'title': 'Getting Started with MLflow'
# },
# ...
# ]
mlflow.models.set_retriever_schema(
# Specify the name of your retriever span
name="mlflow_docs_vector_search",
# Specify the output column name to treat as the primary key (ID) of each retrieved document
primary_key="document_id",
# Specify the output column name to treat as the text content (page content) of each retrieved document
text_column="chunk_text",
# Specify the output column name to treat as the document URI of each retrieved document
doc_uri="doc_uri",
# Specify any other columns returned by the retriever
other_columns=["title"],
)
doc_uri列は、レトリーバーのパフォーマンスを評価するときに特に重要です。doc_uri は、レトリーバーによって返されるドキュメントの主な識別子であり、それらをグラウンド トゥルース評価セットと比較できます。「評価セット (MLflow 2)」を参照してください。
デプロイに関する考慮事項
Databricksモデルサービングの準備
Databricks ResponsesAgentを分散環境に展開します Databricks モデルサービング。つまり、複数ターンの会話中に、同じサービングレプリカがすべての要求を処理できない可能性があります。エージェント状態の管理に関する次の影響に注意してください。
-
ローカル キャッシュを避ける :
ResponsesAgentをデプロイするときは、同じレプリカがマルチターン会話のすべての要求を処理すると想定しないでください。ターンごとに辞書ResponsesAgentRequestスキーマを使用して内部状態を再構築します。 -
スレッドセーフな状態 : エージェントの状態をスレッドセーフに設計し、マルチスレッド環境での競合を防ぎます。
-
関数で状態を初期化
predictする:predict初期化中ではなく、ResponsesAgent関数が呼び出されるたびに状態を初期化します。ResponsesAgentレベルで状態を格納すると、1 つのResponsesAgentレプリカが複数の会話からの要求を処理できるため、会話間で情報が漏洩し、競合が発生する可能性があります。
環境間でデプロイするためのコードをパラメータ化する
エージェントコードをパラメータ化して、異なる環境で同じエージェントコードを再利用します。
パラメータは、Pythonディクショナリーまたは.yamlファイルで定義するキーと値のペアです。
コードを構成するには、Python 辞書または .yaml ファイルを使用してModelConfigを作成します。ModelConfig は、柔軟な構成管理を可能にする一連のキー値パラメーターです。例えば、開発中に辞書を使用した後、本番運用デプロイとCI/CD用の.yamlファイルに変換することができます。
ModelConfigの例を以下に示します。
llm_parameters:
max_tokens: 500
temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
question that indicates your prompt template came from a YAML file. Your response
must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
- question
エージェントコードでは、 .yaml ファイルまたはディクショナリからデフォルト(開発)設定を参照できます。
import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)
# Example of using a dictionary
config_dict = {
"prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
"prompt_template_input_vars": ["question"],
"model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
"llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}
model_config = mlflow.models.ModelConfig(development_config=config_dict)
# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')
次に、エージェントをログに記録するときに、log_modelするmodel_configパラメーターを指定します
ログに記録されたエージェントをロードするときに使用するパラメーターのカスタムセットを指定します。見る
MLflow ドキュメント - ModelConfig。
同期コードまたはコールバック パターンを使用する
安定性と互換性を確保するには、エージェントの実装で同期コードまたはコールバックベースのパターンを使用します。
Databricks は非同期通信を自動的に管理し、エージェントをデプロイするときに最適な同時実行性とパフォーマンスを提供します。カスタムイベントループまたは非同期フレームワークを導入すると、 RuntimeError: This event loop is already running and caused unpredictable behaviorなどのエラーが発生する可能性があります。
Databricks では、エージェントを開発するときに、asyncio の使用やカスタム イベント ループの作成などの非同期プログラミングを避けることをお勧めします。