AI関数を使用したバッチLLM 推論の実行
プレビュー
この機能は パブリック プレビュー段階です。
この記事では、AI 関数を使用してバッチ推論を実行する方法について説明します。
バッチ推論は、 タスク固有の AI 関数 または汎用関数 ai_query
を使用して実行できます。この記事の例では、 ai_query
の柔軟性と、バッチ推論パイプラインとワークフローでの使用方法に焦点を当てています。
バッチ推論に ai_query
を使用するには、主に 2 つの方法があります。
ai_query
と Databricksがホストする基盤モデル を使用したバッチ推論: この方法を使用すると、Databricks はワークロードに基づいて自動的にスケーリングするモデルサービング エンドポイントを構成します。サポートされている 事前プロビジョニングされた LLM を確認します。ai_query
と自分で構成したモデルサービング エンドポイント を使用したバッチ推論: この方法は、Databricksモデル、ファインチューンされた基盤モデル、または従来のMLモデルの外部でホストされている基盤モデルを使用するバッチ推論ワークフローに必要です。デプロイ後、エンドポイントはai_query
で直接使用できます。「カスタムモデルを使用したバッチ推論」または「ファインチューンされた基盤モデル」を参照してください。
必要条件
- 基盤モデルAPIがサポートされているリージョンに存在するワークスペース。
- 使用するデータを含む Unity Catalog の Delta テーブルに対するクエリのアクセス許可。
- テーブルプロパティの
pipelines.channel
を「preview」に設定して、ai_query()
を使用します。クエリの例については、「 要件 」を参照してください。
ai_query
と Databricks でホストされる基盤モデルを使用したバッチ LLM 推論
Databricks でホストされ、事前にプロビジョニングされた基盤モデルをバッチ推論に使用すると、Databricks は、ワークロードに基づいて自動的にスケーリングするプロビジョニングされたスループットエンドポイントをユーザーに代わって構成します。
このメソッドをバッチ推論に使用するには、リクエストで以下を指定します。
ai_query
で使用する事前プロビジョニングされた LLM 。サポートされている 事前プロビジョニングされた LLM から選択します。- Unity Catalog の入力テーブルと出力テーブル。
- モデル プロンプトと任意のモデル パラメーター。
SELECT text, ai_query(
"databricks-meta-llama-3-1-8b-instruct",
"Summarize the given text comprehensively, covering key points and main ideas concisely while retaining relevant details and examples. Ensure clarity and accuracy without unnecessary repetition or omissions: " || text
) AS summary
FROM uc_catalog.schema.table;
バッチ推論パイプラインをデプロイする
このセクションでは、 AI 関数 を他の Databricks データや AI 製品に統合して、完全なバッチ推論パイプラインを構築する方法を示します。 これらのパイプラインは、インジェスト、前処理、推論、後処理などのエンドツーエンドのワークフローを実行できます。パイプラインは、SQL または Python で作成し、次のようにデプロイできます。
- Databricks ワークフローを使用したスケジュールされたワークフロー
- 構造化ストリーミングを使用したストリーミング推論ワークフロー
Databricks ワークフローを使用したバッチ推論ジョブの自動化
バッチ推論ジョブをスケジュールし、AI パイプラインを自動化します。
- SQL
- Python
SELECT
*,
ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat("You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does..
RESULT
[
{'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
{'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
{'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
{'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
{'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
", REVIEW_TEXT, '\n\nRESULT\n')) as result
FROM catalog.schema.product_reviews
LIMIT 10
import json
from pyspark.sql.functions import expr
# Define the opinion mining prompt as a multi-line string.
opinion_prompt = """You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup.The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that.It made three or four large servings of soup.It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound.The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does.
RESULT
[
{'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
{'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
{'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
{'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
{'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
"""
# Escape the prompt so it can be safely embedded in the SQL expression.
escaped_prompt = json.dumps(opinion_prompt)
# Read the source table and limit to 10 rows.
df = spark.table("catalog.schema.product_reviews").limit(10)
# Apply the LLM inference to each row, concatenating the prompt, the review text, and the tail string.
result_df = df.withColumn(
"result",
expr(f"ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat({escaped_prompt}, REVIEW_TEXT, '\\n\\nRESULT\\n'))")
)
# Display the result DataFrame.
display(result_df)
構造化ストリーミングを用いたAI関数
ai_query
と構造化ストリーミングを使用して、リアルタイムまたはマイクロバッチに近いシナリオでAI推論を適用します。
ステップ1.静的 Delta テーブルの読み取り
静的 Delta テーブルをストリームのように読み取ります
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()
# Spark processes all existing rows exactly once in the first micro-batch.
df = spark.table("enterprise.docs") # Replace with your table name containing enterprise documents
df.repartition(50).write.format("delta").mode("overwrite").saveAsTable("enterprise.docs")
df_stream = spark.readStream.format("delta").option("maxBytesPerTrigger", "50K").table("enterprise.docs")
# Define the prompt outside the SQL expression.
prompt = (
"You are provided with an enterprise document. Summarize the key points in a concise paragraph. "
"Do not include extra commentary or suggestions. Document: "
)
ステップ2. 適用 ai_query
Spark は、新しい行がテーブルに到着しない限り、静的データに対してこれを 1 回だけ処理します。
df_transformed = df_stream.select(
"document_text",
F.expr(f"""
ai_query(
'llama_3_8b',
CONCAT('{prompt}', document_text)
)
""").alias("summary")
)
ステップ 3: 要約された出力を書き込む
集計された出力を別の Delta テーブルに書き込む
# Time-based triggers apply, but only the first trigger processes all existing static data.
query = df_transformed.writeStream \
.format("delta") \
.option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
.outputMode("append") \
.toTable("enterprise.docs_summary")
query.awaitTermination()
カスタムモデルまたはファインチューンされた基盤モデルを使用したバッチ推論
このセクションのノートブックの例では、カスタムまたはファインチューンされた基盤モデルを使用して複数の入力を処理するバッチ推論ワークロードを示しています。この例では、 プロピジョン済み スループット 基盤モデルAPIを使用する既存のモデルサービング エンドポイントが必要です。
カスタム基盤モデルを使用した LLM バッチ推論
次のノートブック例では、プロビジョニングされたスループットエンドポイントを作成し、Python と Meta Llama 3.1 70B モデルを使用してバッチ LLM 推論を実行します。 また、バッチ推論ワークロードのベンチマークとプロビジョニング スループット モデルサービング エンドポイントの作成に関するガイダンスもあります。
カスタム基盤モデルとプロビジョニングされたスループットエンドポイントノートブックを使用した LLM バッチ推論
エンベディングモデルを使用した LLM バッチ推論
次のノートブックの例では、プロビジョニングされたスループットエンドポイントを作成し、Python と GTE Large (英語) または BGE Large (英語) 埋め込みモデルのいずれかを選択してバッチ LLM 推論を実行します。
プロビジョニングされたスループットエンドポイントノートブックを使用した LLM バッチ推論の埋め込み
バッチ推論と構造化データ抽出
次のノートブックの例は、ai_query
を使用して基本的な構造化データ抽出を実行し、自動抽出手法を使用して生の非構造化データを整理された使用可能な情報に変換する方法を示しています。このノートブックでは、Mosaic AI Agent Evaluation を活用して、グラウンド トゥルース データを使用して精度を評価する方法も示しています。
バッチ推論と構造化データ抽出ノートブック
名前付きエンティティ認識のためのBERTを使用したバッチ推論
次のノートブックは、BERT を使用した従来の ML モデルのバッチ推論の例を示しています。