Pular para o conteúdo principal

Caso de uso do RAG downstream

info

Beta

O conector do SharePoint está em versão Beta. Os administradores do espaço de trabalho podem controlar o acesso a este recurso na página de Pré-visualizações . Veja as prévias do Gerenciador Databricks.

Agora que criou seu pipeline do SharePoint, o senhor pode analisar os documentos brutos para texto, dividindo os dados analisados em pedaços, criando embeddings a partir dos pedaços e muito mais. Em seguida, o senhor pode usar readStream na tabela de saída diretamente no pipeline downstream.

Analisar documentos não estruturados

Muitas cargas de trabalho subsequentes de RAG (Registro de Agregação de Dados) e de compreensão de documentos exigem a conversão de arquivos brutos não estruturados (como PDFs, PPTX, documentos do Word e imagens) em representações estruturadas e consultáveis. Databricks fornece ai_parse_document, uma função integrada que extrai automaticamente texto, tabelas, informações de disposição, metadados e outros sinais estruturados do conteúdo de arquivos binários.

Você pode aplicar ai_parse_document diretamente à coluna inline_content produzida pelo pipeline de ingestão do SharePoint. Essa é a abordagem recomendada para a maioria dos casos de uso subsequentes não estruturados, incluindo geração aumentada por recuperação (RAG), classificação, extração de entidades e construção de agentes centrados em documentos.

Para mais informações, consulte ai_parse_document.

Exemplo: Transformar arquivos do SharePoint

Você pode transformar incrementalmente seus arquivos do SharePoint ingeridos em saídas estruturadas e analisadas usando o pipeline declarativo LakeFlow Spark (por exemplo, visualização materializada ou tabelas de transmissão). O exemplo a seguir mostra como criar uma view materializada que analisa cada documento recém-chegado:

SQL
CREATE OR REFRESH MATERIALIZED VIEW documents_parsed
AS
SELECT
*,
ai_parse_document(content.inline_content) AS parsed
FROM <your_catalog>.<your_schema>.<your_destination_table>;

Essa view mantém suas representações de documentos analisados atualizadas à medida que novos arquivos chegam pelo pipeline de ingestão do SharePoint. A coluna parsed pode então ser usada para seus casos de uso subsequentes.

Acesse o conteúdo de arquivos individuais.

Se você preferir trabalhar diretamente com arquivos, por exemplo, ao integrar com bibliotecas ou ferramentas personalizadas, Databricks fornece UDFs adicionais de acesso a arquivos que podem ser executadas na tabela de saída do pipeline de ingestão.

Nome

Descrição

read_blob_as_file(coluna blob, coluna de nome de arquivo)

faz o download do arquivo para o disco local e retorna o caminho do arquivo.

read_blob_as_bytes(coluna blob)

baixa o arquivo para o disco local e retorna os dados como uma matriz de bytes.

Configurar UDFs de acesso a arquivos

Para configurar UDFs de acesso a arquivos, adicione a seguinte célula ao seu pipeline downstream:

Python
# DO NOT MODIFY this cell.

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import BinaryType

# Copy to local disk and get file path.

def copy_to_disk(blob, filename) -> str:
fname = "/local_disk0/tmp/" + filename
with open(fname, "wb") as f:
f.write(blob.inline_content)
return fname

read_blob_as_file = udf(copy_to_disk)

# Get bytes directly.

def get_bytes(blob) -> bytes:
return blob.inline_content

read_blob_as_bytes = udf(get_bytes, BinaryType())

Exemplos de acesso a arquivos

Para retornar o caminho do arquivo:

Python
# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.

def file_bytes_to_text(fname):
with open(fname, "rb") as f:
return f.read().decode("utf-8")
file_bytes_to_text_udf = udf(file_bytes_to_text)

# Chain your UDF with the file access UDF for the file path.

df.withColumn("text_content",
file_bytes_to_text_udf(read_blob_as_file("content",
"file_metadata.name"))).collect()

Para retornar os dados como uma matriz de bytes:

Python
# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.

def bytes_to_text(bytes_data):
return bytes_data.decode("utf-8")
bytes_to_text_udf = udf(bytes_to_text)

# Chain your UDF with the file access UDF for the byte array.

df.withColumn("text_content",
bytes_to_text_udf(read_blob_as_bytes("content"))).collect()
nota

Os UDFs de acesso a arquivos não conseguem lidar com o conteúdo de arquivos maiores que 100 MB. Você deve filtrar essas linhas antes de usar as UDFs de acesso ao arquivo.

Como o caminho do arquivo UDF grava no disco local, ele só funciona em clustering de usuário único. Se o senhor quiser executar o downstream pipeline no clustering clássico ou serverless compute , poderá atualizar o UDF para gravar em um volume Unity Catalog em vez de no disco local. No entanto, isso reduzirá o desempenho.

Para gravar em um volume:

Python
# Update the volume_path in the function below.
from pyspark.sql.functions import udf, struct


# copy to volume_path and get file path
def copy_to_disk(blob, filename) -> str:
# UPDATE THIS VALUE
volume_path = "/Volumes/<my_catalog>/<my schema>/<my volume name>/"


fname = volume_path + filename
with open(fname, "wb") as f:
f.write(blob.inline_content)
return fname


read_blob_as_file = udf(copy_to_disk)