メインコンテンツまでスキップ

AI関数を使用したバッチLLM 推論の実行

備考

プレビュー

この機能は パブリック プレビュー段階です。

この記事では、 AI関数 を使用してバッチ推論を実行する方法と、大規模および本番運用で AI関数を実行する方法の例を提供します。

必要条件

  • 基盤モデルAPIがサポートされているリージョンに存在するワークスペース。
  • Databricks Runtime 15.4 以降をお勧めします。
  • 使用するデータを含む Unity Catalog の Delta テーブルに対するクエリのアクセス許可。
  • テーブルプロパティの pipelines.channel を「preview」に設定して、 ai_query()を使用します。クエリの例については、「 要件 」を参照してください。
  • AI関数を使用するバッチ推論ワークロードの場合、Databricksでは、パフォーマンスを向上させるためにDatabricks Runtime 15.4 ML LTSをお勧めします。

タスク固有のAI関数を使用したバッチLLM推論

タスク固有の AI 関数を使用してバッチ推論を実行できます。タスク固有の関数をパイプラインに組み込む方法のガイダンスについては、「 バッチ推論パイプラインのデプロイAI 」を参照してください。

以下は、タスク固有の AI 関数 ai_translateの使用例です。

SQL
SELECT
writer_summary,
ai_translate(writer_summary, "cn") as cn_translation
from user.batch.news_summaries
limit 500
;

バッチ LLM 推論 ai_query

汎用のAI関数 ai_query を使用して、バッチ推論を実行できます。ai_queryがサポートするモデルタイプと関連モデルを確認します。

このセクションの例では、 ai_query の柔軟性と、バッチ推論パイプラインとワークフローでの使用方法に焦点を当てています。

ai_query と Databricks でホストされる基盤モデル

Databricks でホストされ、事前にプロビジョニングされた基盤モデルをバッチ推論に使用すると、Databricks は、ワークロードに基づいて自動的にスケーリングするプロビジョニングされたスループットエンドポイントをユーザーに代わって構成します。

このメソッドをバッチ推論に使用するには、リクエストで以下を指定します。

  • ai_queryで使用する事前プロビジョニングされた LLM 。サポートされている 事前プロビジョニングされた LLM から選択します。これらの事前にプロビジョニングされた LLM には、制限の緩いライセンスと使用ポリシーが適用されます ( 適用可能なモデル開発者のライセンスと条件を参照してください)。
  • Unity Catalog の入力テーブルと出力テーブル。
  • モデル プロンプトと任意のモデル パラメーター。
SQL
SELECT text, ai_query(
"databricks-meta-llama-3-1-8b-instruct",
"Summarize the given text comprehensively, covering key points and main ideas concisely while retaining relevant details and examples. Ensure clarity and accuracy without unnecessary repetition or omissions: " || text
) AS summary
FROM uc_catalog.schema.table;

ai_queryおよびカスタムまたはファインチューンされた基盤モデル

このセクションのノートブックの例では、カスタムまたはファインチューンされた基盤モデルを使用して複数の入力を処理するバッチ推論ワークロードを示しています。この例では、 プロピジョン済み スループット 基盤モデルAPIを使用する既存のモデルサービング エンドポイントが必要です。

カスタム基盤モデルを使用した LLM バッチ推論

次のノートブック例では、プロビジョニングされたスループットエンドポイントを作成し、Python と Meta Llama 3.1 70B モデルを使用してバッチ LLM 推論を実行します。 また、バッチ推論ワークロードのベンチマークとプロビジョニング スループット モデルサービング エンドポイントの作成に関するガイダンスもあります。

カスタム基盤モデルとプロビジョニングされたスループットエンドポイントノートブックを使用した LLM バッチ推論

Open notebook in new tab

エンベディングモデルを使用した LLM バッチ推論

次のノートブックの例では、プロビジョニングされたスループットエンドポイントを作成し、Python と GTE Large (英語) または BGE Large (英語) 埋め込みモデルのいずれかを選択してバッチ LLM 推論を実行します。

プロビジョニングされたスループットエンドポイントを用いた LLM バッチ推論エンベディングのノートブック

Open notebook in new tab

バッチ推論と構造化データ抽出

次のノートブックの例は、ai_query を使用して基本的な構造化データ抽出を実行し、自動抽出手法を使用して生の非構造化データを整理された使用可能な情報に変換する方法を示しています。このノートブックでは、Mosaic AI Agent Evaluation を活用して、グラウンド トゥルース データを使用して精度を評価する方法も示しています。

バッチ推論と構造化データ抽出ノートブック

Open notebook in new tab

名前付きエンティティ認識のためのBERTを使用したバッチ推論

次のノートブックは、BERT を使用した従来の ML モデルのバッチ推論の例を示しています。

名前付きエンティティ認識ノートブックの BERT を使用したバッチ推論

Open notebook in new tab

バッチ推論パイプラインをデプロイする

このセクションでは、 AI 関数 を他の Databricks データや AI 製品に統合して、完全なバッチ推論パイプラインを構築する方法を示します。 これらのパイプラインは、インジェスト、前処理、推論、後処理などのエンドツーエンドのワークフローを実行できます。パイプラインは、SQL または Python で作成し、次のようにデプロイできます。

  • Delta Live Table パイプライン
  • Databricks ワークフローを使用したスケジュールされたワークフロー
  • 構造化ストリーミングを使用したストリーミング推論ワークフロー

DLT でのインクリメンタル バッチ推論の実行

次の例では、データが継続的に更新される場合に DLT を使用してインクリメンタル バッチ推論を実行します。

ステップ 1: ボリュームから生のニュース データを取り込む

SQL

CREATE OR REFRESH STREAMING TABLE news_raw
COMMENT "Raw news articles ingested from volume."
AS SELECT *
FROM STREAM(read_files(
'/Volumes/databricks_news_summarization_benchmarking_data/v01/csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE',
multiLine => 'true'
));

ステップ 2: LLM 推論を適用してタイトルとカテゴリを抽出する

SQL

CREATE OR REFRESH MATERIALIZED VIEW news_categorized
COMMENT "Extract category and title from news articles using LLM inference."
AS
SELECT
inputs,
ai_query(
"databricks-meta-llama-3-3-70b-instruct",
"Extract the category of the following news article: " || inputs,
responseFormat => '{
"type": "json_schema",
"json_schema": {
"name": "news_extraction",
"schema": {
"type": "object",
"properties": {
"title": { "type": "string" },
"category": {
"type": "string",
"enum": ["Politics", "Sports", "Technology", "Health", "Entertainment", "Business"]
}
}
},
"strict": true
}
}'
) AS meta_data
FROM news_raw
LIMIT 2;

ステップ 3: 要約前に LLM 推論出力を検証する

SQL
CREATE OR REFRESH MATERIALIZED VIEW news_validated (
CONSTRAINT valid_title EXPECT (size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3),
CONSTRAINT valid_category EXPECT (get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business'))
)
COMMENT "Validated news articles ensuring the title has at least 3 words and the category is valid."
AS
SELECT *
FROM news_categorized;

ステップ4:検証済みのデータからニュース記事を要約する

SQL
CREATE OR REFRESH MATERIALIZED VIEW news_summarized
COMMENT "Summarized political news articles after validation."
AS
SELECT
get_json_object(meta_data, '$.category') as category,
get_json_object(meta_data, '$.title') as title,
ai_query(
"databricks-meta-llama-3-3-70b-instruct",
"Summarize the following political news article in 2-3 sentences: " || inputs
) AS summary
FROM news_validated;

Databricks ワークフローを使用したバッチ推論ジョブの自動化

バッチ推論ジョブをスケジュールし、AI パイプラインを自動化します。

SQL
SELECT
*,
ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat("You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.


AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price


Examples below:


DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does..


RESULT
[
{'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
{'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
{'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
{'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
{'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]


DOCUMENT
", REVIEW_TEXT, '\n\nRESULT\n')) as result
FROM catalog.schema.product_reviews
LIMIT 10

構造化ストリーミングを用いたAI関数

ai_query構造化ストリーミングを使用して、リアルタイムまたはマイクロバッチに近いシナリオでAI推論を適用します。

ステップ1.静的 Delta テーブルの読み取り

静的 Delta テーブルをストリームのように読み取ります。

Python

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Spark processes all existing rows exactly once in the first micro-batch.
df = spark.table("enterprise.docs") # Replace with your table name containing enterprise documents
df.repartition(50).write.format("delta").mode("overwrite").saveAsTable("enterprise.docs")
df_stream = spark.readStream.format("delta").option("maxBytesPerTrigger", "50K").table("enterprise.docs")

# Define the prompt outside the SQL expression.
prompt = (
"You are provided with an enterprise document. Summarize the key points in a concise paragraph. "
"Do not include extra commentary or suggestions. Document: "
)

ステップ2. 適用 ai_query

Spark は、新しい行がテーブルに到着しない限り、静的データに対してこれを 1 回だけ処理します。

Python

df_transformed = df_stream.select(
"document_text",
F.expr(f"""
ai_query(
'databricks-meta-llama-3-1-8b-instruct',
CONCAT('{prompt}', document_text)
)
""").alias("summary")
)

ステップ 3: 要約された出力を書き込む

集計された出力を別の Delta テーブルに書き込む

Python

# Time-based triggers apply, but only the first trigger processes all existing static data.
query = df_transformed.writeStream \
.format("delta") \
.option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
.outputMode("append") \
.toTable("enterprise.docs_summary")

query.awaitTermination()