Realize a inferência de lotes LLM usando AI Functions
Visualização
Essa funcionalidade está na Pré-visualização Pública e é suportada em us-east1
e us-central1
.
Este artigo descreve como realizar a inferência de lotes usando AI Functions em escala. Os exemplos deste artigo são recomendados para cenários de produção, como o pipeline de inferência de lotes implantado como fluxo de trabalho programado e o uso do site ai_query
e de um modelo de fundação hospedado em Databrickspara transmissão estruturada.
Para começar a usar o AI Functions, a Databricks recomenda o uso de uma das seguintes opções:
Requisitos
- Um workspace em uma região com suporte do Foundation Model APIs.
- Databricks Runtime 15.4 ou superior é recomendado.
- Permissão de consulta na tabela Delta no Unity Catalog que contém os dados que o senhor deseja usar.
- Defina o
pipelines.channel
nas propriedades da tabela como 'visualização' para usarai_query()
. Consulte Requisitos para ver um exemplo de consulta. - Para cargas de trabalho de inferência de lotes usando AI Functions, Databricks recomenda Databricks Runtime 15.4 ML LTS para melhorar o desempenho.
lotes LLM inference using tarefa-specific AI Functions
O senhor pode executar lotes de inferência usando funções específicas da tarefa AI. Consulte o pipeline de inferência de lotes implantados para obter orientação sobre como incorporar a função AI específica de sua tarefa em um pipeline.
A seguir, um exemplo de uso da função AI específica da tarefa, ai_translate
:
SELECT
writer_summary,
ai_translate(writer_summary, "cn") as cn_translation
from user.batch.news_summaries
limit 500
;
lotes LLM inference using ai_query
O senhor pode usar a função de propósito geral AI, ai_query
para realizar a inferência de lotes. Veja quais tipos de modelo e os modelos associados são compatíveis com o ai_query
.
Os exemplos desta seção concentram-se na flexibilidade do ai_query
e em como usá-lo no pipeline de inferência de lotes e no fluxo de trabalho.
ai_query
e modelos de base hospedados pelo Databricks
Quando o senhor usa um modelo de fundação hospedado e pré-provisionado pelo Databrickspara inferência de lotes, o Databricks configura um provisionamento Taxa de transferência endpoint em seu nome que escala automaticamente com base na carga de trabalho.
Para usar esse método para inferência de lotes, especifique o seguinte em sua solicitação:
- O pré-provisionamento LLM que o senhor deseja usar em
ai_query
. Selecione entre os LLMs de pré-provisionamento compatíveis. Esses LLMs de pré-provisionamento estão sujeitos a licenças permissivas e políticas de uso, consulte Licenças e termos de desenvolvedor de modelos aplicáveis. - A tabela de entrada e a tabela de saída do Unity Catalog.
- O prompt do modelo e quaisquer parâmetros do modelo.
SELECT text, ai_query(
"databricks-meta-llama-3-1-8b-instruct",
"Summarize the given text comprehensively, covering key points and main ideas concisely while retaining relevant details and examples. Ensure clarity and accuracy without unnecessary repetition or omissions: " || text
) AS summary
FROM uc_catalog.schema.table;
ai_query
e modelos de fundação personalizados ou ajustados
Os exemplos do Notebook nesta seção demonstram cargas de trabalho de inferência de lotes que usam modelos de base personalizados ou ajustados para processar várias entradas. Os exemplos exigem um modelo de serviço existente endpoint que use o modelo da Fundação APIs provisionamento Taxa de transferência.
LLM inferência de lotes usando um modelo de fundação personalizado
O exemplo de notebook a seguir cria um provisionamento Taxa de transferência endpoint e execução lotes LLM inferência usando Python e o modelo Meta Llama 3.1 70B. Ele também oferece orientação sobre o benchmarking de sua carga de trabalho de inferência de lotes e a criação de um provisionamento Taxa de transferência servindo o modelo endpoint.
LLM inferência de lotes com um modelo de fundação personalizado e um provisionamento Taxa de transferência endpoint Notebook
LLM inferência de lotes usando um modelo de embeddings
O exemplo de Notebook a seguir cria um provisionamento Taxa de transferência endpoint e execução lotes LLM inferência usando Python e o modelo de embeddings GTE Large (inglês) ou BGE Large (inglês) que o senhor escolher.
LLM lotes inference embeddings with a provisionamento Taxa de transferência endpoint Notebook
inferência de lotes usando BERT para reconhecimento de entidades nomeadas
O Notebook a seguir mostra um exemplo tradicional de inferência de lotes do modelo ML usando BERT.
inferência de lotes usando BERT para reconhecimento de entidades nomeadas Notebook
implantado lotes inference pipeline
Esta seção mostra como o senhor pode integrar o AI Functions a outros dados do Databricks e ao produto AI para criar um pipeline completo de inferência de lotes. Esse pipeline pode executar um fluxo de trabalho de ponta a ponta que inclui ingestão, pré-processamento, inferência e pós-processamento. O pipeline pode ser criado em SQL ou Python e implantado como:
- Pipelines DLT
- Fluxo de trabalho programado usando Databricks fluxo de trabalho
- transmissão inference fluxo de trabalho usando transmissão estructurada
Realizar inferência incremental de lotes em DLT
O exemplo a seguir executa a inferência de lotes incremental usando DLT para quando os dados são atualizados continuamente.
Etapa 1: ingerir dados brutos de notícias de um volume
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE news_raw
COMMENT "Raw news articles ingested from volume."
AS SELECT *
FROM STREAM(read_files(
'/Volumes/databricks_news_summarization_benchmarking_data/v01/csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE',
multiLine => 'true'
));
Importe o pacote e defina o esquema JSON da resposta LLM como uma variável Python
import dlt
from pyspark.sql.functions import expr, get_json_object, concat
news_extraction_schema = (
'{"type": "json_schema", "json_schema": {"name": "news_extraction", '
'"schema": {"type": "object", "properties": {"title": {"type": "string"}, '
'"category": {"type": "string", "enum": ["Politics", "Sports", "Technology", '
'"Health", "Entertainment", "Business"], "strict": true}}'
)
Faça a ingestão de seus dados a partir de um volume do Unity Catalog.
@dlt.table(
comment="Raw news articles ingested from volume."
)
def news_raw():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.option("mode", "PERMISSIVE")
.option("multiLine", "true")
.load("/Volumes/databricks_news_summarization_benchmarking_data/v01/csv")
)
Etapa 2: Aplicar a inferência LLM para extrair o título e a categoria
- SQL
- Python
CREATE OR REFRESH MATERIALIZED VIEW news_categorized
COMMENT "Extract category and title from news articles using LLM inference."
AS
SELECT
inputs,
ai_query(
"databricks-meta-llama-3-3-70b-instruct",
"Extract the category of the following news article: " || inputs,
responseFormat => '{
"type": "json_schema",
"json_schema": {
"name": "news_extraction",
"schema": {
"type": "object",
"properties": {
"title": { "type": "string" },
"category": {
"type": "string",
"enum": ["Politics", "Sports", "Technology", "Health", "Entertainment", "Business"]
}
}
},
"strict": true
}
}'
) AS meta_data
FROM news_raw
LIMIT 2;
@dlt.table(
comment="Extract category and title from news articles using LLM inference."
)
def news_categorized():
# Limit the number of rows to 2 as in the SQL version
df_raw = spark.read.table("news_raw").limit(2)
# Inject the JSON schema variable into the ai_query call using an f-string.
return df_raw.withColumn(
"meta_data",
expr(
f"ai_query('databricks-meta-llama-3-3-70b-instruct', "
f"concat('Extract the category of the following news article: ', inputs), "
f"responseFormat => '{news_extraction_schema}')"
)
)
Etapa 3: validar o resultado da inferência LLM antes da compactação
- SQL
- Python
CREATE OR REFRESH MATERIALIZED VIEW news_validated (
CONSTRAINT valid_title EXPECT (size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3),
CONSTRAINT valid_category EXPECT (get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business'))
)
COMMENT "Validated news articles ensuring the title has at least 3 words and the category is valid."
AS
SELECT *
FROM news_categorized;
@dlt.table(
comment="Validated news articles ensuring the title has at least 3 words and the category is valid."
)
@dlt.expect("valid_title", "size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3")
@dlt.expect_or_fail("valid_category", "get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business')")
def news_validated():
return spark.read.table("news_categorized")
Etapa 4: Resumir artigos de notícias dos dados validados
- SQL
- Python
CREATE OR REFRESH MATERIALIZED VIEW news_summarized
COMMENT "Summarized political news articles after validation."
AS
SELECT
get_json_object(meta_data, '$.category') as category,
get_json_object(meta_data, '$.title') as title,
ai_query(
"databricks-meta-llama-3-3-70b-instruct",
"Summarize the following political news article in 2-3 sentences: " || inputs
) AS summary
FROM news_validated;
@dlt.table(
comment="Summarized political news articles after validation."
)
def news_summarized():
df = spark.read.table("news_validated")
return df.select(
get_json_object("meta_data", "$.category").alias("category"),
get_json_object("meta_data", "$.title").alias("title"),
expr(
"ai_query('databricks-meta-llama-3-3-70b-instruct', "
"concat('Summarize the following political news article in 2-3 sentences: ', inputs))"
).alias("summary")
)
Automatize o trabalho de inferência de lotes usando Databricks fluxo de trabalho
Programar lotes de inferência Job e automatizar o pipeline AI.
- SQL
- Python
SELECT
*,
ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat("You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does..
RESULT
[
{'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
{'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
{'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
{'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
{'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
", REVIEW_TEXT, '\n\nRESULT\n')) as result
FROM catalog.schema.product_reviews
LIMIT 10
import json
from pyspark.sql.functions import expr
# Define the opinion mining prompt as a multi-line string.
opinion_prompt = """You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup.The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that.It made three or four large servings of soup.It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound.The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does.
RESULT
[
{'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
{'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
{'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
{'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
{'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
"""
# Escape the prompt so it can be safely embedded in the SQL expression.
escaped_prompt = json.dumps(opinion_prompt)
# Read the source table and limit to 10 rows.
df = spark.table("catalog.schema.product_reviews").limit(10)
# Apply the LLM inference to each row, concatenating the prompt, the review text, and the tail string.
result_df = df.withColumn(
"result",
expr(f"ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat({escaped_prompt}, REVIEW_TEXT, '\\n\\nRESULT\\n'))")
)
# Display the result DataFrame.
display(result_df)
AI Functions usando transmissão estruturada
Aplique a inferência AI em cenários near tempo real ou micro-lotes usando ai_query
e transmissão estruturada.
Etapa 1. Leia sua tabela Delta estática
Leia sua tabela estática Delta como se fosse uma transmissão.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()
# Spark processes all existing rows exactly once in the first micro-batch.
df = spark.table("enterprise.docs") # Replace with your table name containing enterprise documents
df.repartition(50).write.format("delta").mode("overwrite").saveAsTable("enterprise.docs")
df_stream = spark.readStream.format("delta").option("maxBytesPerTrigger", "50K").table("enterprise.docs")
# Define the prompt outside the SQL expression.
prompt = (
"You are provided with an enterprise document. Summarize the key points in a concise paragraph. "
"Do not include extra commentary or suggestions. Document: "
)
Etapa 2. Aplique ai_query
O Spark processa isso apenas uma vez para dados estáticos, a menos que novas linhas cheguem à tabela.
df_transformed = df_stream.select(
"document_text",
F.expr(f"""
ai_query(
'databricks-meta-llama-3-1-8b-instruct',
CONCAT('{prompt}', document_text)
)
""").alias("summary")
)
Etapa 3: escrever a saída resumida
Escreva a saída resumida em outra tabela Delta
# Time-based triggers apply, but only the first trigger processes all existing static data.
query = df_transformed.writeStream \
.format("delta") \
.option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
.outputMode("append") \
.toTable("enterprise.docs_summary")
query.awaitTermination()