Step 6 (pipelines). Implement data pipeline fixes

Data pipeline

Follow these steps to modify your data pipeline and run it to:

  1. Create a new vector index.

  2. Create an MLflow run with the data pipeline’s metadata.

The resulting MLflow run is referenced by the `B_quality_iteration/02_evaluate_fixes` notebook.

There are two approaches to modifying the data pipeline:

  • Implement a single fix at a time: In this approach, you configure and run one data pipeline at a time. This mode is best if you want to try a single embedding model and test out a single new parser. Databricks suggests starting here to get familiar with these notebooks.

  • Implement multiple fixes at once: In this approach, also called a sweep, you run multiple data pipelines in parallel, each with a different configuration. This mode is best if you want to “sweep” across many different strategies, for example, to evaluate three PDF parsers or many different chunk sizes.

See the GitHub repository for the sample code in this section.

Approach 1: Implement a single fix at a time

  1. Open the B_quality_iteration/data_pipeline_fixes/single_fix/00_config notebook.

  2. Follow the instructions in one of the below:

  3. Run the pipeline, by either:

  4. Add the name of the resulting MLflow run to the DATA_PIPELINE_FIXES_RUN_NAMES variable in the B_quality_iteration/02_evaluate_fixes notebook.

Note

The data preparation pipeline uses Spark Structured Streaming to incrementally load and process files. This means that files already loaded and prepared are tracked in checkpoints and won’t be reprocessed. Only newly added files are loaded, prepared, and appended to the corresponding tables.

Therefore, if you wish to rerun the entire pipeline from scratch and reprocess all documents, you need to delete the checkpoints and tables. You can accomplish this by using the reset_tables_and_checkpoints notebook.

Approach 2: Implement multiple fixes at once

  1. Open the B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines notebook.

  2. Follow the instructions in the notebook to add two or more configurations of the data pipeline to run.

  3. Run the notebook to execute these pipelines.

  4. Add the names of the resulting MLflow runs to the DATA_PIPELINE_FIXES_RUN_NAMES variable in the B_quality_iteration/02_evaluate_fixes notebook.

Appendix

Note

You can find the notebooks referenced below in the single_fix and multiple_fixes directories depending on whether you are implementing a single fix or multiple fixes at a time.

Configuration settings deep dive

The various pre-implemented configuration options for the data pipeline are listed below. Alternatively, you can implement a custom parser/chunker.

  • vectorsearch_config: Specify the vector search endpoint (must be up and running) and the name of the index to be created. Additionally, define the synchronization type between the source table and the index (default is TRIGGERED).

  • embedding_config: Specify the embedding model to be used, along with the tokenizer. For a complete list of options, see the `supporting_configs/embedding_models` notebook. The embedding model must be deployed to a running model serving endpoint. Depending on the chunking strategy, the tokenizer is also used during splitting to count the tokens in each text chunk, so that no chunk exceeds the maximum context length of the selected embedding model.

The following shows a tokenizer from HuggingFace:

    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }

The following shows a tokenizer from TikToken:

    "embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }

  • pipeline_config: Defines the file parser, the chunker, and the path to the sources field. Parsers and chunkers are defined in the parser_library and chunker_library notebooks, respectively, which can be found in the single_fix and multiple_fixes directories. For a complete list of options, see the supporting_configs/parser_chunker_strategies notebook, which is also available in both directories. Different parsers or chunkers may require different configuration parameters; in the example below, <param x> represents a parameter required by a specific chunker. Parsers can also be passed configuration values using the same format.

    "chunker": {
        "name": <chunker-name>,
        "config": {
            "<param 1>": "...",
            "<param 2>": "...",
            ...
        }
    }
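
To illustrate the token-limit check described under embedding_config, the following is a minimal sketch rather than the notebooks' actual implementation. The function names and the whitespace-based `whitespace_token_count` are assumptions made for illustration; in a real pipeline the count would come from the tokenizer named in `embedding_tokenizer`.

```python
from typing import Callable, List


def whitespace_token_count(text: str) -> int:
    """Stand-in tokenizer (assumption): counts whitespace-separated words.

    A real pipeline would count tokens with the tokenizer configured in
    embedding_config instead.
    """
    return len(text.split())


def find_oversized_chunks(
    chunks: List[str],
    max_tokens: int,
    count_tokens: Callable[[str], int] = whitespace_token_count,
) -> List[int]:
    """Return the indices of chunks whose token count exceeds max_tokens."""
    return [i for i, chunk in enumerate(chunks) if count_tokens(chunk) > max_tokens]


# Hypothetical chunks and limit, chosen only to show the check.
chunks = ["a short chunk", "this chunk has quite a few more words in it"]
oversized = find_oversized_chunks(chunks, max_tokens=5)
```

Passing the counter in as a callable keeps the check independent of where the tokenizer comes from, so the same function could be reused with either tokenizer source.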

Implementing a custom parser/chunker

This project is structured to facilitate the addition of custom parsers or chunkers to the data preparation pipeline.

Add a new parser

Suppose you want to incorporate a new parser using the PyMuPDF library to transform parsed text into Markdown format. Follow these steps:

  1. Install the required dependencies by adding the following code to the parser_library notebook in the single_fix or multiple_fixes directory:

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. In the parser_library notebook in the single_fix or multiple_fixes directory, add a new section for the PyMuPdfMarkdown parser and implement the parsing function. Ensure the output of the function complies with the ParserReturnValue class defined at the beginning of the notebook. This ensures compatibility with Spark UDFs. The try/except block prevents Spark from failing the entire parsing job because of errors in individual documents when the parser is applied as a UDF in the 02_parse_docs notebook in the single_fix or multiple_fixes directory. That notebook checks whether parsing failed for any document, quarantines the corresponding rows, and raises a warning.

    import warnings
    
    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    
  3. Add your new parsing function to the parser_factory in the parser_library notebook in the single_fix or multiple_fixes directory to make it configurable in the pipeline_config of the 00_config notebook.

  4. In the 02_parse_docs notebook, parser functions are turned into Spark Python UDFs (arrow-optimized for Databricks Runtime 14.0 or above) and applied to the dataframe containing the new binary PDF files. For testing and development, add a simple testing function to the parser_library notebook that loads the test-document.pdf file and asserts successful parsing:

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
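
As a rough illustration of step 3, a factory can be as simple as a dictionary that maps the name used in pipeline_config to a parsing function. The sketch below is an assumption about the general shape, not the code in the parser_library notebook: `PARSERS`, `get_parser`, the toy `parse_bytes_example` parser, and the constant values are all hypothetical.

```python
from typing import Callable, Dict

# Hypothetical stand-ins for the constants defined in parser_library.
OUTPUT_FIELD_NAME = "output"
STATUS_FIELD_NAME = "status"


def parse_bytes_example(raw_doc_contents_bytes: bytes) -> dict:
    """Toy parser (assumption): decodes the bytes as UTF-8 text."""
    try:
        text = raw_doc_contents_bytes.decode("utf-8")
        return {
            OUTPUT_FIELD_NAME: {"num_pages": "1", "parsed_content": text.strip()},
            STATUS_FIELD_NAME: "SUCCESS",
        }
    except Exception as e:
        # Report the failure in the return value instead of raising, so one
        # bad document does not fail the whole job.
        return {
            OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
            STATUS_FIELD_NAME: f"ERROR: {e}",
        }


# Registry mapping a configurable name to a parsing function.
PARSERS: Dict[str, Callable[[bytes], dict]] = {
    "example_text": parse_bytes_example,
}


def get_parser(name: str) -> Callable[[bytes], dict]:
    """Look up a parser by name, failing early on an unknown name."""
    if name not in PARSERS:
        raise ValueError(f"Unknown parser '{name}'. Available: {sorted(PARSERS)}")
    return PARSERS[name]


ok = get_parser("example_text")(b"  hello  ")
bad = get_parser("example_text")(b"\xff\xfe")  # invalid UTF-8, returns an ERROR status
```

Failing early on an unknown name surfaces configuration typos before any documents are processed, while per-document errors are still captured in the status field.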
    

Add a new chunker

The process for adding a new chunker follows similar steps to those explained above for a new parser.

  1. Add the required dependencies in the chunker_library notebook.

  2. Add a new section for your chunker and implement a function, e.g., chunk_parsed_content_newchunkername. The output of the new chunker function must be a Python dictionary that complies with the ChunkerReturnValue class defined at the beginning of the chunker_library notebook. The function should accept at least a string of the parsed text to be chunked. If your chunker requires additional parameters, you can add them as function parameters.

  3. Add your new chunker to the chunker_factory function defined in the chunker_library notebook. If your function accepts additional parameters, use functools’ partial to pre-configure them. This is necessary because UDFs only accept one input parameter, which will be the parsed text in our case. The chunker_factory enables you to configure different chunker methods in the pipeline_config and returns a Spark Python UDF (optimized for Databricks Runtime 14.0 and above).

  4. Add a simple testing section for your new chunking function. This section should chunk a predefined text provided as a string.
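
The steps above can be sketched as follows. This is a minimal example under stated assumptions, not the chunker_library implementation: the field names stand in for whatever ChunkerReturnValue actually defines, and `chunk_parsed_content_fixedwords` and `make_chunker` are hypothetical names. It shows how functools.partial turns a multi-parameter chunker into a function that takes only the parsed text.

```python
from functools import partial
from typing import Callable

# Hypothetical field names; the real ones come from ChunkerReturnValue.
OUTPUT_FIELD_NAME = "chunked_text"
STATUS_FIELD_NAME = "status"


def chunk_parsed_content_fixedwords(parsed_content: str, chunk_size_words: int = 100) -> dict:
    """Split text into chunks of at most chunk_size_words whitespace-separated words."""
    try:
        if chunk_size_words <= 0:
            raise ValueError("chunk_size_words must be positive")
        words = parsed_content.split()
        chunks = [
            " ".join(words[start : start + chunk_size_words])
            for start in range(0, len(words), chunk_size_words)
        ]
        return {OUTPUT_FIELD_NAME: chunks, STATUS_FIELD_NAME: "SUCCESS"}
    except Exception as e:
        return {OUTPUT_FIELD_NAME: [], STATUS_FIELD_NAME: f"ERROR: {e}"}


def make_chunker(name: str, config: dict) -> Callable[[str], dict]:
    """Pre-configure a chunker so that it accepts only the parsed text."""
    chunkers = {"fixed_words": chunk_parsed_content_fixedwords}
    return partial(chunkers[name], **config)


# Simple test section: chunk a predefined string.
chunker = make_chunker("fixed_words", {"chunk_size_words": 3})
result = chunker("one two three four five six seven")
```

In the notebooks, the pre-configured function would then be wrapped as a Spark Python UDF; that step is omitted here so the sketch runs without Spark.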

Performance tuning

Spark utilizes partitions to parallelize processing. Data is divided into chunks of rows, and each partition is processed by a single core by default. However, when data is initially read by Apache Spark, it may not create partitions optimized for the desired computation, particularly for our UDFs performing parsing and chunking tasks. It’s crucial to strike a balance between creating partitions that are small enough for efficient parallelization and not so small that the overhead of managing them outweighs the benefits.

You can adjust the number of partitions using df.repartition(<number of partitions>). When applying UDFs, aim for a multiple of the number of cores available on the worker nodes. For instance, in the 02_parse_docs notebook, you could include df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism) to create twice as many partitions as the number of available worker cores. Typically, a multiple between 1 and 3 should yield satisfactory performance.
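
The repartitioning guidance can be wrapped in a small helper. The sketch below is illustrative only: `target_partition_count` and `repartition_for_udf` are hypothetical names, and the helper assumes that `df` is a Spark DataFrame (only its `repartition(n)` method is used) and that the core count is passed in, for example from sc.defaultParallelism.

```python
def target_partition_count(available_cores: int, multiple: int = 2) -> int:
    """Return a partition count that is a multiple of the available cores."""
    if available_cores < 1 or multiple < 1:
        raise ValueError("available_cores and multiple must be at least 1")
    return available_cores * multiple


def repartition_for_udf(df, available_cores: int, multiple: int = 2):
    """Repartition a DataFrame before applying a parsing or chunking UDF.

    df is expected to be a pyspark.sql.DataFrame; this helper only calls
    df.repartition(n), so it does not import pyspark itself.
    """
    return df.repartition(target_partition_count(available_cores, multiple))
```

In a notebook, this could be called as `df_raw_bronze = repartition_for_udf(df_raw_bronze, sc.defaultParallelism)`, adjusting `multiple` within the suggested range of 1 to 3.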

Running the pipeline manually

Alternatively, you can run each individual notebook step by step:

  1. Load the raw files using the 01_load_files notebook. This saves each document binary as one record in a bronze table (raw_files_table_name) defined in the destination_tables_config. Files are loaded incrementally, processing only new documents since the last run.

  2. Parse the documents with the 02_parse_docs notebook. This notebook executes the parser_library notebook (make sure this runs as the first cell, because it restarts Python), making the parsers and related utilities available. It then uses the parser specified in the pipeline_config to parse each document into plain text. Relevant metadata, such as the number of pages of the original PDF, is captured alongside the parsed text. Successfully parsed documents are stored in a silver table (parsed_docs_table_name), while any documents that fail to parse are quarantined into a corresponding table.

  3. Chunk the parsed documents using the 03_chunk_docs notebook. Similar to parsing, this notebook executes the chunker_library notebook (again, run as the first cell). It splits each parsed document into smaller chunks using the specified chunker from the pipeline_config. Each chunk is assigned a unique ID using an MD5 hash, necessary for synchronization with the vector search index. The final chunks are loaded into a gold table (chunked_docs_table_name).

  4. Create or sync the vector search index with the 04_vector_index notebook. This notebook verifies the readiness of the vector search endpoint specified in the vectorsearch_config. If the configured index already exists, it initiates synchronization with the gold table; otherwise, it creates the index and triggers synchronization. This is expected to take some time if the Vector Search endpoint and index have not yet been created.
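
Step 3 mentions that each chunk receives a unique ID derived from an MD5 hash. The source does not say which fields are hashed, so the following sketch makes an assumption: it hashes the document path together with the chunk text. The function name `chunk_id` is hypothetical.

```python
import hashlib


def chunk_id(doc_path: str, chunk_text: str) -> str:
    """Derive a deterministic ID from the source path and the chunk text.

    Which inputs the pipeline actually hashes is an assumption here; the
    null byte separates the two fields so that different (path, text) pairs
    cannot produce the same payload.
    """
    payload = f"{doc_path}\x00{chunk_text}".encode("utf-8")
    return hashlib.md5(payload).hexdigest()


# Hypothetical path and text, used only to show the call.
example_id = chunk_id("/docs/manual.pdf", "First chunk of text.")
```

Because the ID depends only on its inputs, rerunning the pipeline on unchanged documents produces the same IDs, which is what lets an index sync match existing rows instead of duplicating them.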