メインコンテンツまでスキップ

バッチ推論パイプラインをデプロイする

備考

プレビュー

この機能はパブリック プレビュー段階です。リージョンごとの可用性については、「AI および機械学習機能の可用性」を参照してください。

このページではAI Functions他のDatabricksデータおよびAI製品に統合して、完全なバッチ推論パイプラインを構築する方法を示します。 これらのパイプラインは、取り込み、前処理、推論、後処理を含むエンドツーエンドのワークフローを実行できます。パイプラインは SQL または Python で作成し、次のようにデプロイできます。

  • Lakeflow 宣言型パイプライン
  • Databricks ワークフローを使用したスケジュールされたワークフロー
  • 構造化ストリーミングを使用したストリーミング推論ワークフロー

要件

  • 基盤モデルAPIがサポートされているリージョンに存在するワークスペース。
  • AI Functionsを使用したバッチ推論ワークロードには、 Databricks Runtime 15.4 LTS以降が必要です。
  • 使用するデータを含む Unity Catalog の Delta テーブルに対するクエリのアクセス許可。
  • ai_query()を使用するには、テーブル プロパティのpipelines.channel 「プレビュー」に設定します。クエリの例については、 「要件」を参照してください。

宣言型パイプラインでインクリメンタル バッチ推論を実行するLakeflow

次の例では、データが継続的に更新される場合に Lakeflow 宣言型パイプラインを使用してインクリメンタル バッチ推論を実行します。

ステップ 1: ボリュームから生のニュース データを取り込む

SQL

CREATE OR REFRESH STREAMING TABLE news_raw
COMMENT "Raw news articles ingested from volume."
AS SELECT *
FROM STREAM(read_files(
'/Volumes/databricks_news_summarization_benchmarking_data/v01/csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE',
multiLine => 'true'
));

ステップ 2: LLM推論を適用してタイトルとカテゴリを抽出します

SQL

CREATE OR REFRESH MATERIALIZED VIEW news_categorized
COMMENT "Extract category and title from news articles using LLM inference."
AS
SELECT
inputs,
ai_query(
"databricks-meta-llama-3-3-70b-instruct",
"Extract the category of the following news article: " || inputs,
responseFormat => '{
"type": "json_schema",
"json_schema": {
"name": "news_extraction",
"schema": {
"type": "object",
"properties": {
"title": { "type": "string" },
"category": {
"type": "string",
"enum": ["Politics", "Sports", "Technology", "Health", "Entertainment", "Business"]
}
}
},
"strict": true
}
}'
) AS meta_data
FROM news_raw
LIMIT 2;

ステップ 3: 要約する前にLLM推論出力を検証する

SQL
CREATE OR REFRESH MATERIALIZED VIEW news_validated (
CONSTRAINT valid_title EXPECT (size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3),
CONSTRAINT valid_category EXPECT (get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business'))
)
COMMENT "Validated news articles ensuring the title has at least 3 words and the category is valid."
AS
SELECT *
FROM news_categorized;

ステップ 4: 検証されたデータからニュース記事を要約する

SQL
CREATE OR REFRESH MATERIALIZED VIEW news_summarized
COMMENT "Summarized political news articles after validation."
AS
SELECT
get_json_object(meta_data, '$.category') as category,
get_json_object(meta_data, '$.title') as title,
ai_query(
"databricks-meta-llama-3-3-70b-instruct",
"Summarize the following political news article in 2-3 sentences: " || inputs
) AS summary
FROM news_validated;

Databricks ワークフローを使用してバッチ推論ジョブを自動化する

バッチ推論ジョブをスケジュールし、AI パイプラインを自動化します。

SQL
SELECT
*,
ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat("You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does..
RESULT
[
{'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
{'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
{'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
{'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
{'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
", REVIEW_TEXT, '\n\nRESULT\n')) as result
FROM catalog.schema.product_reviews
LIMIT 10

構造化ストリーミングを用いたAI関数

ai_query構造化ストリーミングを使用して、リアルタイムまたはマイクロバッチに近いシナリオでAI推論を適用します。

ステップ 1. 静的Deltaテーブルを読み取ります

静的 Delta テーブルをストリームのように読み取ります。

Python

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Spark processes all existing rows exactly once in the first micro-batch.
df = spark.table("enterprise.docs") # Replace with your table name containing enterprise documents
df.repartition(50).write.format("delta").mode("overwrite").saveAsTable("enterprise.docs")
df_stream = spark.readStream.format("delta").option("maxBytesPerTrigger", "50K").table("enterprise.docs")

# Define the prompt outside the SQL expression.
prompt = (
"You are provided with an enterprise document. Summarize the key points in a concise paragraph. "
"Do not include extra commentary or suggestions. Document: "
)

ステップ2. 適用 ai_query

Spark は、テーブルに新しい行が到着しない限り、静的データに対してこれを 1 回だけ処理します。

Python

df_transformed = df_stream.select(
"document_text",
F.expr(f"""
ai_query(
'databricks-meta-llama-3-1-8b-instruct',
CONCAT('{prompt}', document_text)
)
""").alias("summary")
)

ステップ 3: 要約された出力を書き出す

要約された出力を別のDeltaテーブルに書き込む

Python

# Time-based triggers apply, but only the first trigger processes all existing static data.
query = df_transformed.writeStream \
.format("delta") \
.option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
.outputMode("append") \
.toTable("enterprise.docs_summary")

query.awaitTermination()