
Downstream RAG use case

Preview

The Microsoft SharePoint connector is in Beta.

Now that you've created your SharePoint pipeline, you can parse the raw documents into text, chunk the parsed data, create embeddings from the chunks, and more. You can use readStream on the output table directly in your downstream pipeline.
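
For example, a downstream pipeline can read the ingestion output as a stream. The following is a minimal sketch; the table name my_catalog.my_schema.sharepoint_files is a placeholder for your pipeline's actual output table.

Python
# Minimal sketch: stream the SharePoint ingestion output into a downstream pipeline.
# The table name below is a placeholder; substitute your pipeline's output table.
df = spark.readStream.table("my_catalog.my_schema.sharepoint_files")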

To access the file data, use the following file access UDFs. You can run these UDFs on the output table from the ingestion pipeline.

Name | Description
read_blob_as_file(blob column, file name column) | Downloads the file to the local disk and returns the file path.
read_blob_as_bytes(blob column) | Returns the file data as an array of bytes.

Set up file access UDFs

Add the following cell to your downstream pipeline:

Python
# DO NOT MODIFY this cell.

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import BinaryType

# Copy to local disk and get file path.
def copy_to_disk(blob, filename) -> str:
    fname = "/local_disk0/tmp/" + filename
    with open(fname, "wb") as f:
        f.write(blob.inline_content)
    return fname

read_blob_as_file = udf(copy_to_disk)

# Get bytes directly.
def get_bytes(blob) -> bytes:
    return blob.inline_content

read_blob_as_bytes = udf(get_bytes, BinaryType())

Examples

To return the file path:

Python
# Suppose you have a simple UDF that reads a file and decodes its contents as a UTF-8 string.
def file_bytes_to_text(fname):
    with open(fname, "rb") as f:
        return f.read().decode("utf-8")

file_bytes_to_text_udf = udf(file_bytes_to_text)

# Chain your UDF with the file access UDF for the file path.
df.withColumn("text_content", file_bytes_to_text_udf(read_blob_as_file("content", "file_metadata.name"))).collect()

To return the data as an array of bytes:

Python
# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.
def bytes_to_text(bytes_data):
    return bytes_data.decode("utf-8")

bytes_to_text_udf = udf(bytes_to_text)

# Chain your UDF with the file access UDF for the byte array.
df.withColumn("text_content", bytes_to_text_udf(read_blob_as_bytes("content"))).collect()

note

The file access UDFs can't handle file content for files that are larger than 100 MB. You must filter out these rows before using the file access UDFs.
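
For example, if your output table exposes the file size in its metadata, you can filter on it before applying the UDFs. The sketch below assumes a file_metadata.size column measured in bytes; adjust the column name to match your table's schema.

Python
from pyspark.sql.functions import col

# Hedged sketch: keep only rows whose files are 100 MB or smaller.
# file_metadata.size is an assumed column name; use your table's actual size field.
MAX_FILE_SIZE_BYTES = 100 * 1024 * 1024
df = df.filter(col("file_metadata.size") <= MAX_FILE_SIZE_BYTES)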

Because the file path UDF writes to the local disk, it only works on single-user clusters. If you want to run the downstream pipeline on classic clusters or serverless compute, you can update the UDF to write to a Unity Catalog volume instead of the local disk. However, this slows performance.

To write to a volume:

Python
# Update the volume_path in the function below.
from pyspark.sql.functions import udf, struct

# Copy to volume_path and get the file path.
def copy_to_disk(blob, filename) -> str:
    # UPDATE THIS VALUE
    volume_path = "/Volumes/<my_catalog>/<my schema>/<my volume name>/"

    fname = volume_path + filename
    with open(fname, "wb") as f:
        f.write(blob.inline_content)
    return fname

read_blob_as_file = udf(copy_to_disk)