Downstream RAG use case
The managed SharePoint connector is in Beta. Workspace admins can control access to this feature from the Previews page. See Manage Databricks previews.
Now that you've created your SharePoint pipeline, you can parse the raw documents to text, chunk the parsed data, create embeddings from the chunks, and more. You can then use readStream on the output table directly in your downstream pipeline.
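For example, the following is a minimal sketch of incrementally reading the ingestion output table as a stream; the three-level table name is a placeholder for your own destination table.

```python
# Minimal sketch: incrementally read the SharePoint ingestion output table.
# Replace the three-level name with your own catalog, schema, and destination table.
raw_docs = (
    spark.readStream
    .table("<your_catalog>.<your_schema>.<your_destination_table>")
)

# Parsing, chunking, and embedding steps can be chained onto raw_docs
# before writing the results to a downstream streaming table.
```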
Parse unstructured documents
Many downstream RAG and document-understanding workloads require converting raw unstructured files (such as PDFs, PPTX, Word documents, and images) into structured, queryable representations. Databricks provides ai_parse_document, a built-in function that automatically extracts text, tables, layout information, metadata, and other structured signals from binary file content.
You can apply ai_parse_document directly to the inline_content column produced by the SharePoint ingestion pipeline. This is the recommended approach for most unstructured downstream use cases, including retrieval-augmented generation (RAG), classification, entity extraction, and building document-centric agents.
For more information, see ai_parse_document.
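If your downstream pipeline is written in Python rather than SQL, the following is a minimal sketch of the same call through the DataFrame API. The table name is a placeholder, the parsed column name is arbitrary, and the function is invoked as a SQL expression.

```python
from pyspark.sql.functions import expr

# Read the ingestion output table; replace the placeholder with your own destination table.
df = spark.read.table("<your_catalog>.<your_schema>.<your_destination_table>")

# Apply ai_parse_document to the inline file content produced by the SharePoint pipeline.
parsed_df = df.withColumn("parsed", expr("ai_parse_document(content.inline_content)"))
```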
Example: Transform SharePoint files
You can incrementally transform your ingested SharePoint files into parsed, structured outputs using Lakeflow Spark Declarative Pipelines (for example, materialized views or streaming tables). The following example shows how to create a materialized view that parses each newly arrived document:
```sql
CREATE OR REFRESH MATERIALIZED VIEW documents_parsed
AS
SELECT
  *,
  ai_parse_document(content.inline_content) AS parsed
FROM <your_catalog>.<your_schema>.<your_destination_table>;
```
This view keeps your parsed document representations up to date as new files arrive through the SharePoint ingestion pipeline. The parsed column can then be used for your downstream use cases.
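Before wiring up chunking or embedding logic, it can help to inspect a few parsed rows. The following is a sketch only: it assumes the documents_parsed view above lives under your own catalog and schema, and it reuses the file_metadata.name field shown later on this page; to_json is used purely to make the nested struct readable.

```python
from pyspark.sql.functions import col, to_json

# Inspect a few parsed documents to understand the structure returned by ai_parse_document.
# Replace the three-level name with the location of your documents_parsed view.
(
    spark.read.table("<your_catalog>.<your_schema>.documents_parsed")
    .select(
        col("file_metadata.name").alias("file_name"),
        to_json(col("parsed")).alias("parsed_json"),
    )
    .limit(10)
    .show(truncate=False)
)
```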
Access individual file content
If you prefer to work with files directly (for example, when integrating with custom libraries or tools), Databricks provides additional file access UDFs that you can run on the output table from the ingestion pipeline.
| Name | Description |
|---|---|
| read_blob_as_file | Downloads the file to the local disk and returns the file path. |
| read_blob_as_bytes | Returns the file content as an array of bytes. |
Set up file access UDFs
To set up file-access UDFs, add the following cell to your downstream pipeline:
```python
# DO NOT MODIFY this cell.
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import BinaryType

# Copy to local disk and get the file path.
def copy_to_disk(blob, filename) -> str:
    fname = "/local_disk0/tmp/" + filename
    with open(fname, "wb") as f:
        f.write(blob.inline_content)
    return fname

read_blob_as_file = udf(copy_to_disk)

# Get bytes directly.
def get_bytes(blob) -> bytes:
    return blob.inline_content

read_blob_as_bytes = udf(get_bytes, BinaryType())
```
File access examples
To return the file path:
```python
# Suppose you have a simple UDF that reads a file from its path and decodes the contents as a UTF-8 string.
def file_bytes_to_text(fname):
    with open(fname, "rb") as f:
        return f.read().decode("utf-8")

file_bytes_to_text_udf = udf(file_bytes_to_text)

# Chain your UDF with the file access UDF for the file path.
df.withColumn(
    "text_content",
    file_bytes_to_text_udf(read_blob_as_file("content", "file_metadata.name")),
).collect()
```
To return the data as an array of bytes:
```python
# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.
def bytes_to_text(bytes_data):
    return bytes_data.decode("utf-8")

bytes_to_text_udf = udf(bytes_to_text)

# Chain your UDF with the file access UDF for the byte array.
df.withColumn(
    "text_content",
    bytes_to_text_udf(read_blob_as_bytes("content")),
).collect()
```
The file access UDFs can't handle file content for files that are larger than 100 MB. You must filter out these rows before using the file access UDFs.
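The following is a minimal sketch of such a filter, assuming the output table records the file size in bytes under a file_metadata.size field; verify the actual field name against your table's schema.

```python
from pyspark.sql.functions import col

# Keep only files at or below 100 MB before applying the file access UDFs.
# The file_metadata.size field name is an assumption; check your output table's schema.
MAX_BYTES = 100 * 1024 * 1024

small_files_df = df.filter(col("file_metadata.size") <= MAX_BYTES)
```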
Because the file path UDF writes to the local disk, it only works on single-user clusters. To run the downstream pipeline on classic clusters or serverless compute instead, update the UDF to write to a Unity Catalog volume rather than to local disk. However, writing to a volume slows performance.
To write to a volume:
```python
# Update the volume_path in the function below.
from pyspark.sql.functions import udf, struct

# Copy to volume_path and get the file path.
def copy_to_disk(blob, filename) -> str:
    # UPDATE THIS VALUE
    volume_path = "/Volumes/<my_catalog>/<my_schema>/<my_volume_name>/"
    fname = volume_path + filename
    with open(fname, "wb") as f:
        f.write(blob.inline_content)
    return fname

read_blob_as_file = udf(copy_to_disk)
```