Downstream RAG use case
The Microsoft SharePoint connector is in Beta.
Now that you've created your SharePoint pipeline, you can parse the raw documents to text, chunk the parsed data, create embeddings from the chunks, and more. You can then use `readStream` on the output table directly in your downstream pipeline.
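For example, here is a minimal sketch of streaming from the ingestion pipeline's output table. The table name below is a placeholder; substitute the table your pipeline writes to.

```python
# Placeholder three-level name; replace with your ingestion pipeline's output table.
df = spark.readStream.table("<my_catalog>.<my_schema>.<my_output_table>")
```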
To access the file data, use the following file access UDFs, which you can run on the output table from the ingestion pipeline.
| Name | Description |
|---|---|
| `read_blob_as_file` | Downloads the file to the local disk and returns the file path. |
| `read_blob_as_bytes` | Returns the file data as an array of bytes. |
Set up file access UDFs
Add the following cell to your downstream pipeline:
```python
# DO NOT MODIFY this cell.
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import BinaryType

# Copy to local disk and get the file path.
def copy_to_disk(blob, filename) -> str:
    fname = "/local_disk0/tmp/" + filename
    with open(fname, "wb") as f:
        f.write(blob.inline_content)
    return fname

read_blob_as_file = udf(copy_to_disk)

# Get bytes directly.
def get_bytes(blob) -> bytes:
    return blob.inline_content

read_blob_as_bytes = udf(get_bytes, BinaryType())
```
Examples
To return the file path:
```python
# Suppose you have a simple UDF that reads a file from its path and decodes the contents as a UTF-8 string.
def file_bytes_to_text(fname):
    with open(fname, "rb") as f:
        return f.read().decode("utf-8")

file_bytes_to_text_udf = udf(file_bytes_to_text)

# Chain your UDF with the file access UDF for the file path.
df.withColumn("text_content", file_bytes_to_text_udf(read_blob_as_file("content", "file_metadata.name"))).collect()
```
To return the data as an array of bytes:
```python
# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.
def bytes_to_text(bytes_data):
    return bytes_data.decode("utf-8")

bytes_to_text_udf = udf(bytes_to_text)

# Chain your UDF with the file access UDF for the byte array.
df.withColumn("text_content", bytes_to_text_udf(read_blob_as_bytes("content"))).collect()
```
The file access UDFs can't handle files larger than 100 MB. You must filter out these rows before using the file access UDFs.
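For example, assuming the output table exposes the file size in a `file_metadata.size` column (a hypothetical field name; check your table's schema), you could filter before applying the UDFs:

```python
from pyspark.sql.functions import col

# "file_metadata.size" is an assumed column name; use the size field from your output table's schema.
df_small_files = df.filter(col("file_metadata.size") <= 100 * 1024 * 1024)  # keep files <= 100 MB
```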
Because the file path UDF writes to the local disk, it only works on single-user clusters. To run the downstream pipeline on classic clusters or serverless compute instead, update the UDF to write to a Unity Catalog volume rather than local disk. However, this slows performance.
To write to a volume:
```python
# Update the volume_path in the function below.
from pyspark.sql.functions import udf, struct

# Copy to volume_path and get the file path.
def copy_to_disk(blob, filename) -> str:
    # UPDATE THIS VALUE
    volume_path = "/Volumes/<my_catalog>/<my_schema>/<my_volume_name>/"
    fname = volume_path + filename
    with open(fname, "wb") as f:
        f.write(blob.inline_content)
    return fname

read_blob_as_file = udf(copy_to_disk)
```