Step 6 (pipelines). Implement data pipeline fixes
Follow these steps to modify your data pipeline and run it to:
Create a new vector index.
Create an MLflow run with the data pipeline’s metadata.
The resulting MLflow run is referenced by the `B_quality_iteration/02_evaluate_fixes` notebook.
There are two approaches to modifying the data pipeline:
Implement a single fix at a time In this approach, you configure and run a single data pipeline at once. This mode is best if you want to try a single embedding model and test out a single new parser. Databricks suggests starting here to get familiar with these notebooks.
Implement multiple fix at once In this approach, also called a sweep, you, in parallel, run multiple data pipelines that each have a different configuration. This mode is best if you want to “sweep” across many different strategies, for example, evaluate three PDF parsers or evaluate many different chunk sizes.
See the GitHub repository for the sample code in this section.
Approach 1: Implement a single fix at a time
Open the B_quality_iteration/data_pipeline_fixes/single_fix/00_config notebook
Follow the instructions in one of the below:
Follow the instructions to implement a new configuration provided by this cookbook.
Follow the steps to implement custom code for a parsing or chunking.
Run the pipeline, by either:
Opening & running the 00_Run_Entire_Pipeline notebook.
Following the steps to run each step of the pipeline manually.
Add the name of the resulting MLflow Run that is outputted to the
DATA_PIPELINE_FIXES_RUN_NAMES
variable in B_quality_iteration/02_evaluate_fixes notebook
Note
The data preparation pipeline employs Spark Structured Streaming to incrementally load and process files. This entails that files already loaded and prepared are tracked in checkpoints and won’t be reprocessed. Only newly added files will be loaded, prepared, and appended to the corresponding tables.
Therefore, if you wish to rerun the entire pipeline from scratch and reprocess all documents, you need to delete the checkpoints and tables. You can accomplish this by using the reset_tables_and_checkpoints notebook.
Approach 2: Implement multiple fix at once
Open the B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines notebook.
Follow the instructions in the notebook to add two or more configurations of the data pipeline to run.
Run the notebook to execute these pipelines.
Add the names of the resulting MLflow runs that are outputted to the
DATA_PIPELINE_FIXES_RUN_NAMES
variable in B_quality_iteration/02_evaluate_fixes notebook.
Appendix
Note
You can find the notebooks referenced below in the single_fix and multiple_fixes directories depending on whether you are implementing a single fix or multiple fixes at a time.
Configuration settings deep dive
The various pre-implemented configuration options for the data pipeline are listed below. Alternatively, you can implement a custom parser/chunker.
vectorsearch_config
: Specify the vector search endpoint (must be up and running) and the name of the index to be created. Additionally, define the synchronization type between the source table and the index (default isTRIGGERED
).embedding_config
: Specify the embedding model to be used, along with the tokenizer. For a complete list of options see the `supporting_configs/embedding_models` notebook. The embedding model has to be deployed to a running model serving endpoint. Depending on chunking strategy, the tokenizer is also during splitting to make sure the chunks do not exceed the token limit of the embedding model. Tokenizers are used here to count the number of tokens in the text chunks to ensure that they don’t exceed the maximum context length of the selected embedding model.
The following shows a tokenizer from HuggingFace:
"embedding_tokenizer": {
"tokenizer_model_name": "BAAI/bge-large-en-v1.5",
"tokenizer_source": "hugging_face",
}
The following shows a tokenizer from TikToken:
"embedding_tokenizer": {
"tokenizer_model_name": "text-embedding-small",
"tokenizer_source": "tiktoken",
}
pipeline_config
: Defines the file parser, chunker and path to the sources field. Parsers and chunkers are defined in theparser_library
andchunker_library
notebooks, respectively. These can be found in the single_fix and multiple_fixes directories. For a complete list of options see thesupporting_configs/parser_chunker_strategies
notebook, which is again available in both the single and multiple fix directories. Different parsers or chunkers may require different configuration parameters where<param x>
represent the potential parameters required for a specific chunker. Parsers can also be passed configuration values using the same format.
"chunker": {
"name": <chunker-name>,
"config": {
"<param 1>": "...",
"<param 2>": "...",
...
}
}
Implementing a custom parser/chunker
This project is structured to facilitate the addition of custom parsers or chunkers to the data preparation pipeline.
Add a new parser
Suppose you want to incorporate a new parser using the PyMuPDF library to transform parsed text into Markdown format. Follow these steps:
Install the required dependencies by adding the following code to the
parser_library
notebook in thesingle_fix
ormultiple_fix
directory:# Dependencies for PyMuPdf %pip install pymupdf pymupdf4llm
In the
parser_library
notebook in thesingle_fix
ormultiple_fix
directory, add a new section for thePyMuPdfMarkdown
parser and implement the parsing function. Ensure the output of the function complies with theParserReturnValue
class defined at the beginning of the notebook. This ensures compatibility with Spark UDFs. Thetry
orexcept
block prevents Spark from failing the entire parsing job due to errors in individual documents when applying the parser as a UDF in02_parse_docs
notebook in thesingle_fix
ormultiple_fix
directory. This notebook will check if parsing failed for any document, quarantine the corresponding rows and raise a warning.import fitz import pymupdf4llm def parse_bytes_pymupdfmarkdown( raw_doc_contents_bytes: bytes, ) -> ParserReturnValue: try: pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf") md_text = pymupdf4llm.to_markdown(pdf_doc) output = { "num_pages": str(pdf_doc.page_count), "parsed_content": md_text.strip(), } return { OUTPUT_FIELD_NAME: output, STATUS_FIELD_NAME: "SUCCESS", } except Exception as e: warnings.warn(f"Exception {e} has been thrown during parsing") return { OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""}, STATUS_FIELD_NAME: f"ERROR: {e}", }
Add your new parsing function to the
parser_factory
in theparser_library
notebook in thesingle_fix
ormultiple_fix
directory to make it configurable in thepipeline_config
of the00_config
notebook.In the
02_parse_docs
notebook, parser functions are turned into Spark Python UDFs (arrow-optimized for Databricks Runtime 14.0 or above) and applied to the dataframe containing the new binary PDF files. For testing and development, add a simple testing function to the parser_library notebook that loads the test-document.pdf file and asserts successful parsing:with open("./test_data/test-document.pdf", "rb") as file: file_bytes = file.read() test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes) assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
Add a new chunker
The process for adding a new chunker follows similar steps to those explained above for a new parser.
Add the required dependencies in the chunker_library notebook.
Add a new section for your chunker and implement a function, e.g.,
chunk_parsed_content_newchunkername
. The output of the new chunker function must be a Python dictionary that complies with theChunkerReturnValue
class defined at the beginning of the chunker_library notebook. The function should accept at least a string of the parsed text to be chunked. If your chunker requires additional parameters, you can add them as function parameters.Add your new chunker to the
chunker_factory
function defined in the chunker_library notebook. If your function accepts additional parameters, use functools’ partial to pre-configure them. This is necessary because UDFs only accept one input parameter, which will be the parsed text in our case. Thechunker_factory
enables you to configure different chunker methods in the pipeline_config and returns a Spark Python UDF (optimized for Databricks Runtime 14.0 and above).Add a simple testing section for your new chunking function. This section should chunk a predefined text provided as a string.
Performance tuning
Spark utilizes partitions to parallelize processing. Data is divided into chunks of rows, and each partition is processed by a single core by default. However, when data is initially read by Apache Spark, it may not create partitions optimized for the desired computation, particularly for our UDFs performing parsing and chunking tasks. It’s crucial to strike a balance between creating partitions that are small enough for efficient parallelization and not so small that the overhead of managing them outweighs the benefits.
You can adjust the number of partitions using df.repartitions(<number of partitions>)
. When applying UDFs, aim for a multiple of the number of cores available on the worker nodes. For instance, in the 02_parse_docs notebook, you could include df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)
to create twice as many partitions as the number of available worker cores. Typically, a multiple between 1 and 3 should yield satisfactory performance.
Running the pipeline manually
Alternatively, you can run each individual Notebook step-by-step:
Load the raw files using the
01_load_files
notebook. This saves each document binary as one record in a bronze table (raw_files_table_name
) defined in thedestination_tables_config
. Files are loaded incrementally, processing only new documents since the last run.Parse the documents with the
02_parse_docs
notebook. This notebook executes theparser_library
notebook (ensure to run this as the first cell to restart Python), making different parsers and related utilities available. It then uses the specified parser in thepipeline_config
to parse each document into plain text. As an example, relevant metadata like the number of pages of the original PDF alongside the parsed text is captured. Successfully parsed documents are stored in a silver table (parsed_docs_table_name
), while any unparsed documents are quarantined into a corresponding table.Chunk the parsed documents using the
03_chunk_docs
notebook. Similar to parsing, this notebook executes thechunker_library
notebook (again, run as the first cell). It splits each parsed document into smaller chunks using the specified chunker from thepipeline_config
. Each chunk is assigned a unique ID using an MD5 hash, necessary for synchronization with the vector search index. The final chunks are loaded into a gold table (chunked_docs_table_name
).Create/Sync the vector search index with the
04_vector_index
. This notebook verifies the readiness of the specified vector search endpoint in thevectorsearch_config
. If the configured index already exists, it initiates synchronization with the gold table; otherwise, it creates the index and triggers synchronization. This is expected to take some time if the Vector Search endpoint and index have not yet been created.
Next step
Continue with Step 7. Deploy & monitor.