Caso de uso do RAG downstream
Visualização
O conector do Microsoft SharePoint está em versão beta.
Agora que criou seu pipeline do SharePoint, o senhor pode analisar os documentos brutos para texto, dividindo os dados analisados em pedaços, criando embeddings a partir dos pedaços e muito mais. Em seguida, o senhor pode usar readStream
na tabela de saída diretamente no pipeline downstream.
Para acessar os dados do arquivo, estamos fornecendo as seguintes UDFs de acesso ao arquivo. O senhor pode executar esses UDFs na tabela de saída a partir da ingestão pipeline.
Nome | Descrição |
---|---|
| faz o download do arquivo para o disco local e retorna o caminho do arquivo. |
| baixa o arquivo para o disco local e retorna os dados como uma matriz de bytes. |
Configurar UDFs de acesso a arquivos
Adicione a seguinte célula ao seu pipeline de downstream:
# DO NOT MODIFY this cell.
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import BinaryType
# Copy to local disk and get file path.
def copy_to_disk(blob, filename) -> str:
fname = "/local_disk0/tmp/" + filename
with open(fname, "wb") as f:
f.write(blob.inline_content)
return fname
read_blob_as_file = udf(copy_to_disk)
# Get bytes directly.
def get_bytes(blob) -> bytes:
return blob.inline_content
read_blob_as_bytes = udf(get_bytes, BinaryType())
Exemplos
Para retornar o caminho do arquivo:
# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.
def file_bytes_to_text(fname):
with open(fname, "rb") as f:
return f.read().decode("utf-8")
file_bytes_to_text_udf = udf(file_bytes_to_text)
# Chain your UDF with the file access UDF for the file path.
df.withColumn("text_content", file_bytes_to_text_udf(read_blob_as_file("content", "file_metadata.name"))).collect()
Para retornar os dados como uma matriz de bytes:
# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.
def bytes_to_text(bytes_data):
return bytes_data.decode("utf-8")
bytes_to_text_udf = udf(bytes_to_text)
# Chain your UDF with the file access UDF for the byte array.
df.withColumn("text_content", bytes_to_text_udf(read_blob_as_bytes("content"))).collect()
Os UDFs de acesso a arquivos não conseguem lidar com o conteúdo de arquivos maiores que 100 MB. Você deve filtrar essas linhas antes de usar as UDFs de acesso ao arquivo.
Como o caminho do arquivo UDF grava no disco local, ele só funciona em clustering de usuário único. Se o senhor quiser executar o downstream pipeline no clustering clássico ou serverless compute , poderá atualizar o UDF para gravar em um volume Unity Catalog em vez de no disco local. No entanto, isso reduzirá o desempenho.
Para gravar em um volume:
# Update the volume_path in the function below.
from pyspark.sql.functions import udf, struct
# copy to volume_path and get file path
def copy_to_disk(blob, filename) -> str:
# UPDATE THIS VALUE
volume_path = "/Volumes/<my_catalog>/<my schema>/<my volume name>/"
fname = volume_path + filename
with open(fname, "wb") as f:
f.write(blob.inline_content)
return fname
read_blob_as_file = udf(copy_to_disk)