AI関数を使用したバッチLLM 推論の実行
プレビュー
この機能は パブリック プレビュー段階です。
この記事では、 AI関数 を使用してバッチ推論を実行する方法と、大規模および本番運用で AI関数を実行する方法の例を提供します。
必要条件
- 基盤モデルAPIがサポートされているリージョンに存在するワークスペース。
- Databricks Runtime 15.4 以降をお勧めします。
- 使用するデータを含む Unity Catalog の Delta テーブルに対するクエリのアクセス許可。
- テーブルプロパティの
pipelines.channel
を「preview」に設定して、ai_query()
を使用します。クエリの例については、「 要件 」を参照してください。 - AI関数を使用するバッチ推論ワークロードの場合、Databricksでは、パフォーマンスを向上させるためにDatabricks Runtime 15.4 ML LTSをお勧めします。
タスク固有のAI関数を使用したバッチLLM推論
タスク固有の AI 関数を使用してバッチ推論を実行できます。タスク固有の関数をパイプラインに組み込む方法のガイダンスについては、「 バッチ推論パイプラインのデプロイAI 」を参照してください。
以下は、タスク固有の AI 関数 ai_translate
の使用例です。
SELECT
writer_summary,
ai_translate(writer_summary, "cn") as cn_translation
from user.batch.news_summaries
limit 500
;
バッチ LLM 推論 ai_query
汎用のAI関数 ai_query
を使用して、バッチ推論を実行できます。ai_query
がサポートするモデルタイプと関連モデルを確認します。
このセクションの例では、 ai_query
の柔軟性と、バッチ推論パイプラインとワークフローでの使用方法に焦点を当てています。
ai_query
と Databricks でホストされる基盤モデル
Databricks でホストされ、事前にプロビジョニングされた基盤モデルをバッチ推論に使用すると、Databricks は、ワークロードに基づいて自動的にスケーリングするプロビジョニングされたスループットエンドポイントをユーザーに代わって構成します。
このメソッドをバッチ推論に使用するには、リクエストで以下を指定します。
ai_query
で使用する事前プロビジョニングされた LLM 。サポートされている 事前プロビジョニングされた LLM から選択します。これらの事前にプロビジョニングされた LLM には、制限の緩いライセンスと使用ポリシーが適用されます ( 適用可能なモデル開発者のライセンスと条件を参照してください)。- Unity Catalog の入力テーブルと出力テーブル。
- モデル プロンプトと任意のモデル パラメーター。
SELECT text, ai_query(
"databricks-meta-llama-3-1-8b-instruct",
"Summarize the given text comprehensively, covering key points and main ideas concisely while retaining relevant details and examples. Ensure clarity and accuracy without unnecessary repetition or omissions: " || text
) AS summary
FROM uc_catalog.schema.table;
ai_query
およびカスタムまたはファインチューンされた基盤モデル
このセクションのノートブックの例では、カスタムまたはファインチューンされた基盤モデルを使用して複数の入力を処理するバッチ推論ワークロードを示しています。この例では、 プロピジョン済み スループット 基盤モデルAPIを使用する既存のモデルサービング エンドポイントが必要です。
カスタム基盤モデルを使用した LLM バッチ推論
次のノートブック例では、プロビジョニングされたスループットエンドポイントを作成し、Python と Meta Llama 3.1 70B モデルを使用してバッチ LLM 推論を実行します。 また、バッチ推論ワークロードのベンチマークとプロビジョニング スループット モデルサービング エンドポイントの作成に関するガイダンスもあります。
カスタム基盤モデルとプロビジョニングされたスループットエンドポイントノートブックを使用した LLM バッチ推論
エンベディングモデルを使用した LLM バッチ推論
次のノートブックの例では、プロビジョニングされたスループットエンドポイントを作成し、Python と GTE Large (英語) または BGE Large (英語) 埋め込みモデルのいずれかを選択してバッチ LLM 推論を実行します。
プロビジョニングされたスループットエンドポイントを用いた LLM バッチ推論エンベディングのノートブック
バッチ推論と構造化データ抽出
次のノートブックの例は、ai_query
を使用して基本的な構造化データ抽出を実行し、自動抽出手法を使用して生の非構造化データを整理された使用可能な情報に変換する方法を示しています。このノートブックでは、Mosaic AI Agent Evaluation を活用して、グラウンド トゥルース データを使用して精度を評価する方法も示しています。
バッチ推論と構造化データ抽出ノートブック
名前付きエンティティ認識のためのBERTを使用したバッチ推論
次のノートブックは、BERT を使用した従来の ML モデルのバッチ推論の例を示しています。
名前付きエンティティ認識ノートブックの BERT を使用したバッチ推論
バッチ推論パイプラインをデプロイする
このセクションでは、 AI 関数 を他の Databricks データや AI 製品に統合して、完全なバッチ推論パイプラインを構築する方法を示します。 これらのパイプラインは、インジェスト、前処理、推論、後処理などのエンドツーエンドのワークフローを実行できます。パイプラインは、SQL または Python で作成し、次のようにデプロイできます。
- Delta Live Table パイプライン
- Databricks ワークフローを使用したスケジュールされたワークフロー
- 構造化ストリーミングを使用したストリーミング推論ワークフロー
DLT でのインクリメンタル バッチ推論の実行
次の例では、データが継続的に更新される場合に DLT を使用してインクリメンタル バッチ推論を実行します。
ステップ 1: ボリュームから生のニュース データを取り込む
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE news_raw
COMMENT "Raw news articles ingested from volume."
AS SELECT *
FROM STREAM(read_files(
'/Volumes/databricks_news_summarization_benchmarking_data/v01/csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE',
multiLine => 'true'
));
パッケージをインポートし、LLM 応答の JSON スキーマを Python 変数として定義します
import dlt
from pyspark.sql.functions import expr, get_json_object, concat
news_extraction_schema = (
'{"type": "json_schema", "json_schema": {"name": "news_extraction", '
'"schema": {"type": "object", "properties": {"title": {"type": "string"}, '
'"category": {"type": "string", "enum": ["Politics", "Sports", "Technology", '
'"Health", "Entertainment", "Business"], "strict": true}}'
)
Unity Catalog ボリュームからデータを取り込みます。
@dlt.table(
comment="Raw news articles ingested from volume."
)
def news_raw():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.option("mode", "PERMISSIVE")
.option("multiLine", "true")
.load("/Volumes/databricks_news_summarization_benchmarking_data/v01/csv")
)
ステップ 2: LLM 推論を適用してタイトルとカテゴリを抽出する
- SQL
- Python
CREATE OR REFRESH MATERIALIZED VIEW news_categorized
COMMENT "Extract category and title from news articles using LLM inference."
AS
SELECT
inputs,
ai_query(
"databricks-meta-llama-3-3-70b-instruct",
"Extract the category of the following news article: " || inputs,
responseFormat => '{
"type": "json_schema",
"json_schema": {
"name": "news_extraction",
"schema": {
"type": "object",
"properties": {
"title": { "type": "string" },
"category": {
"type": "string",
"enum": ["Politics", "Sports", "Technology", "Health", "Entertainment", "Business"]
}
}
},
"strict": true
}
}'
) AS meta_data
FROM news_raw
LIMIT 2;
@dlt.table(
comment="Extract category and title from news articles using LLM inference."
)
def news_categorized():
# Limit the number of rows to 2 as in the SQL version
df_raw = spark.read.table("news_raw").limit(2)
# Inject the JSON schema variable into the ai_query call using an f-string.
return df_raw.withColumn(
"meta_data",
expr(
f"ai_query('databricks-meta-llama-3-3-70b-instruct', "
f"concat('Extract the category of the following news article: ', inputs), "
f"responseFormat => '{news_extraction_schema}')"
)
)
ステップ 3: 要約前に LLM 推論出力を検証する
- SQL
- Python
CREATE OR REFRESH MATERIALIZED VIEW news_validated (
CONSTRAINT valid_title EXPECT (size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3),
CONSTRAINT valid_category EXPECT (get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business'))
)
COMMENT "Validated news articles ensuring the title has at least 3 words and the category is valid."
AS
SELECT *
FROM news_categorized;
@dlt.table(
comment="Validated news articles ensuring the title has at least 3 words and the category is valid."
)
@dlt.expect("valid_title", "size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3")
@dlt.expect_or_fail("valid_category", "get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business')")
def news_validated():
return spark.read.table("news_categorized")
ステップ4:検証済みのデータからニュース記事を要約する
- SQL
- Python
CREATE OR REFRESH MATERIALIZED VIEW news_summarized
COMMENT "Summarized political news articles after validation."
AS
SELECT
get_json_object(meta_data, '$.category') as category,
get_json_object(meta_data, '$.title') as title,
ai_query(
"databricks-meta-llama-3-3-70b-instruct",
"Summarize the following political news article in 2-3 sentences: " || inputs
) AS summary
FROM news_validated;
@dlt.table(
comment="Summarized political news articles after validation."
)
def news_summarized():
df = spark.read.table("news_validated")
return df.select(
get_json_object("meta_data", "$.category").alias("category"),
get_json_object("meta_data", "$.title").alias("title"),
expr(
"ai_query('databricks-meta-llama-3-3-70b-instruct', "
"concat('Summarize the following political news article in 2-3 sentences: ', inputs))"
).alias("summary")
)
Databricks ワークフローを使用したバッチ推論ジョブの自動化
バッチ推論ジョブをスケジュールし、AI パイプラインを自動化します。
- SQL
- Python
SELECT
*,
ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat("You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does..
RESULT
[
{'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
{'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
{'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
{'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
{'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
", REVIEW_TEXT, '\n\nRESULT\n')) as result
FROM catalog.schema.product_reviews
LIMIT 10
import json
from pyspark.sql.functions import expr
# Define the opinion mining prompt as a multi-line string.
opinion_prompt = """You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup.The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that.It made three or four large servings of soup.It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound.The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does.
RESULT
[
{'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
{'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
{'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
{'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
{'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
"""
# Escape the prompt so it can be safely embedded in the SQL expression.
escaped_prompt = json.dumps(opinion_prompt)
# Read the source table and limit to 10 rows.
df = spark.table("catalog.schema.product_reviews").limit(10)
# Apply the LLM inference to each row, concatenating the prompt, the review text, and the tail string.
result_df = df.withColumn(
"result",
expr(f"ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat({escaped_prompt}, REVIEW_TEXT, '\\n\\nRESULT\\n'))")
)
# Display the result DataFrame.
display(result_df)
構造化ストリーミングを用いたAI関数
ai_query
と構造化ストリーミングを使用して、リアルタイムまたはマイクロバッチに近いシナリオでAI推論を適用します。
ステップ1.静的 Delta テーブルの読み取り
静的 Delta テーブルをストリームのように読み取ります。
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()
# Spark processes all existing rows exactly once in the first micro-batch.
df = spark.table("enterprise.docs") # Replace with your table name containing enterprise documents
df.repartition(50).write.format("delta").mode("overwrite").saveAsTable("enterprise.docs")
df_stream = spark.readStream.format("delta").option("maxBytesPerTrigger", "50K").table("enterprise.docs")
# Define the prompt outside the SQL expression.
prompt = (
"You are provided with an enterprise document. Summarize the key points in a concise paragraph. "
"Do not include extra commentary or suggestions. Document: "
)
ステップ2. 適用 ai_query
Spark は、新しい行がテーブルに到着しない限り、静的データに対してこれを 1 回だけ処理します。
df_transformed = df_stream.select(
"document_text",
F.expr(f"""
ai_query(
'databricks-meta-llama-3-1-8b-instruct',
CONCAT('{prompt}', document_text)
)
""").alias("summary")
)
ステップ 3: 要約された出力を書き込む
集計された出力を別の Delta テーブルに書き込む
# Time-based triggers apply, but only the first trigger processes all existing static data.
query = df_transformed.writeStream \
.format("delta") \
.option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
.outputMode("append") \
.toTable("enterprise.docs_summary")
query.awaitTermination()