Pular para o conteúdo principal

pipeline de inferência de lotes implantado

info

Visualização

Este recurso está em Visualização Pública.

Esta página mostra como você pode integrar AI Functions em outros dados e produtos de AI Databricks para criar um pipeline completo de inferência de lotes. Esses pipelines podem executar fluxos de trabalho de ponta a ponta, que incluem ingestão, pré-processamento, inferência e pós-processamento. pipeline pode ser criado em SQL ou Python e distribuído como:

  • Pipelines Declarativos do LakeFlow
  • Fluxo de trabalho agendado usando Databricks fluxo de trabalho
  • inferência de transmissão fluxo de trabalho usando transmissão estruturada

Requisitos

  • Um workspace em uma região com suporte APIs do Foundation Model.
  • Databricks Runtime 15.4 LTS ou superior é necessário para cargas de trabalho de inferência de lotes usando AI Functions.
  • Permissão de consulta na tabela Delta no Unity Catalog que contém os dados que você deseja usar.
  • Defina pipelines.channel nas propriedades da tabela como 'preview' para usar ai_query(). Veja Requisitos para um exemplo de consulta.

Executar inferência de lotes incrementais no pipeline declarativo LakeFlow

O exemplo a seguir executa inferência de lotes incrementais usando o pipeline declarativo LakeFlow para quando os dados são atualizados continuamente.

o passo 1: Ingerir dados de notícias brutos de um volume

SQL
CREATE OR REFRESH STREAMING TABLE news_raw
COMMENT "Raw news articles ingested from volume."
AS SELECT *
FROM STREAM(read_files(
'/Volumes/databricks_news_summarization_benchmarking_data/v01/csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE',
multiLine => 'true'
));

o passo 2: Aplicar inferência LLM para extrair título e categoria

SQL

CREATE OR REFRESH MATERIALIZED VIEW news_categorized
COMMENT "Extract category and title from news articles using LLM inference."
AS
SELECT
inputs,
ai_query(
"databricks-meta-llama-3-3-70b-instruct",
"Extract the category of the following news article: " || inputs,
responseFormat => '{
"type": "json_schema",
"json_schema": {
"name": "news_extraction",
"schema": {
"type": "object",
"properties": {
"title": { "type": "string" },
"category": {
"type": "string",
"enum": ["Politics", "Sports", "Technology", "Health", "Entertainment", "Business"]
}
}
},
"strict": true
}
}'
) AS meta_data
FROM news_raw
LIMIT 2;

o passo 3: Validar a saída da inferência LLM antes da sumarização

SQL
CREATE OR REFRESH MATERIALIZED VIEW news_validated (
CONSTRAINT valid_title EXPECT (size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3),
CONSTRAINT valid_category EXPECT (get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business'))
)
COMMENT "Validated news articles ensuring the title has at least 3 words and the category is valid."
AS
SELECT *
FROM news_categorized;

o passo 4: Resuma as notícias a partir dos dados validados

SQL
CREATE OR REFRESH MATERIALIZED VIEW news_summarized
COMMENT "Summarized political news articles after validation."
AS
SELECT
get_json_object(meta_data, '$.category') as category,
get_json_object(meta_data, '$.title') as title,
ai_query(
"databricks-meta-llama-3-3-70b-instruct",
"Summarize the following political news article in 2-3 sentences: " || inputs
) AS summary
FROM news_validated;

Automatize o trabalho de inferência de lotes usando Databricks fluxo de trabalho

programar lotes de inferência Job e automatizar pipeline AI .

SQL
SELECT
*,
ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat("You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does..
RESULT
[
{'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
{'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
{'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
{'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
{'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
", REVIEW_TEXT, '\n\nRESULT\n')) as result
FROM catalog.schema.product_reviews
LIMIT 10

AI Functions usando transmissão estruturada

Aplique inferência AI em cenários quase tempo real ou microlotes usando ai_query e transmissão estruturada.

o passo 1. Leia sua tabela Delta estática

Leia sua tabela Delta estática como se fosse uma transmissão.

Python

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Spark processes all existing rows exactly once in the first micro-batch.
df = spark.table("enterprise.docs") # Replace with your table name containing enterprise documents
df.repartition(50).write.format("delta").mode("overwrite").saveAsTable("enterprise.docs")
df_stream = spark.readStream.format("delta").option("maxBytesPerTrigger", "50K").table("enterprise.docs")

# Define the prompt outside the SQL expression.
prompt = (
"You are provided with an enterprise document. Summarize the key points in a concise paragraph. "
"Do not include extra commentary or suggestions. Document: "
)

o passo 2. Aplicar ai_query

O Spark processa isso apenas uma vez para dados estáticos, a menos que novas linhas cheguem à tabela.

Python

df_transformed = df_stream.select(
"document_text",
F.expr(f"""
ai_query(
'databricks-meta-llama-3-1-8b-instruct',
CONCAT('{prompt}', document_text)
)
""").alias("summary")
)

o passo 3: Escreva a saída resumida

Escreva a saída resumida em outra tabela Delta

Python

# Time-based triggers apply, but only the first trigger processes all existing static data.
query = df_transformed.writeStream \
.format("delta") \
.option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
.outputMode("append") \
.toTable("enterprise.docs_summary")

query.awaitTermination()