o passo 6 (oleoduto). Implementar correções no pipeline de dados

pipeline de dados

Siga estes passos para modificar seu pipeline de dados e executá-lo para:

  1. Criar um novo índice vetorial.

  2. Crie uma execução do MLflow com os metadados do pipeline de dados.

A execução resultante do MLflow é referenciada pelo `B_quality_iteration/02_evaluate_fixes` Notebook.

Há duas abordagens para modificar o pipeline de dados:

  • Implementar uma única correção de cada vez Nessa abordagem, o senhor configura e executa uma única pipeline de dados de uma só vez. Esse modo é melhor se o senhor quiser experimentar um único modelo de incorporação e testar um único analisador novo. Databricks O senhor sugere começar aqui para se familiarizar com o Notebook.

  • Implementar várias correções de uma vez Nessa abordagem, também chamada de sweep, o senhor executa, em paralelo, vários pipelines de dados, cada um com uma configuração diferente. Esse modo é melhor se o senhor quiser "varrer" muitas estratégias diferentes, por exemplo, avaliar três analisadores de PDF ou avaliar muitos tamanhos diferentes de pedaços.

Consulte o repositório do GitHub para obter o código de amostra nesta seção.

Abordagem 1: implementar uma única correção de cada vez

  1. Abra o arquivo B_quality_iteration/data_pipeline_fixes/single_fix/00_config Notebook

  2. Siga as instruções em uma das opções abaixo:

  3. execução do pipeline, por qualquer um dos senhores:

  4. Adicione o nome da execução MLflow resultante que é enviado para a variável DATA_PIPELINE_FIXES_RUN_NAMES em B_quality_iteration/02_evaluate_fixes Notebook

Observação

A preparação de dados pipeline emprega a Spark transmissão estruturada para carregar e processar arquivos de forma incremental. Isso significa que os arquivos já carregados e preparados são rastreados em pontos de verificação e não serão reprocessados. Somente os arquivos recém-adicionados serão carregados, preparados e anexados às tabelas correspondentes.

Portanto, se o senhor quiser executar novamente todo o pipeline do zero e reprocessar todos os documentos, precisará excluir os pontos de verificação e as tabelas. O senhor pode fazer isso usando o Reset Notebook.

Abordagem 2: Implementar várias correções de uma só vez

  1. Abra o arquivo B_quality_iteration/data_pipeline_fixes/multiple_fixes/execução Notebook.

  2. Siga as instruções em Notebook para adicionar duas ou mais configurações do pipeline de dados para execução.

  3. execução o Notebook para executar esse pipeline.

  4. Adicione os nomes da execução resultante MLflow que são enviados para a variável DATA_PIPELINE_FIXES_RUN_NAMES em B_quality_iteration/02_evaluate_fixes Notebook.

Apêndice

Observação

É possível encontrar o Notebook referenciado abaixo nos diretórios single_fix e multiple_fixes, dependendo se o senhor está implementando uma única correção ou várias correções ao mesmo tempo.

Aprofundamento das definições de configuração

As várias opções de configuração pré-implementadas para o pipeline de dados estão listadas abaixo. Como alternativa, o senhor pode implementar um analisador/chunker personalizado.

  • vectorsearch_config: Especifique o endpoint de pesquisa de vetores (deve estar em funcionamento) e o nome do índice a ser criado. Além disso, defina o tipo de sincronização entre a tabela de origem e o índice (default é TRIGGERED).

  • embedding_config: Especifique o modelo de incorporação a ser usado, juntamente com o tokenizador. Para obter uma lista completa de opções, consulte `supporting_configs/embedding_models` Notebook. O modelo de incorporação deve ser implantado em um modelo de serviço em execução endpoint. Dependendo da estratégia de divisão em blocos, o tokenizador também é usado durante a divisão para garantir que os blocos não excedam o limite de tokens do modelo de incorporação. Os tokenizadores são usados aqui para contar o número de tokens nos blocos de texto para garantir que eles não excedam o comprimento máximo do contexto do modelo de incorporação selecionado.

A seguir, o senhor vê um tokenizador do HuggingFace:

    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }

A seguir, é mostrado um tokenizador do TikToken:

"embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }
  • pipeline_config: Define o analisador de arquivos, o fragmentador e o caminho para o campo de fontes. Parsers e chunkers são definidos no parser_library e chunker_library Notebook, respectivamente. Eles podem ser encontrados nos diretórios single_fix e multiple_fixes. Para obter uma lista completa de opções, consulte o supporting_configs/parser_chunker_strategies Notebook que está novamente disponível nos diretórios de correção única e múltipla. Diferentes analisadores ou chunkers podem exigir parâmetros de configuração diferentes, em que <param x> representam os possíveis parâmetros necessários para um chunker específico. Os analisadores também podem receber valores de configuração usando o mesmo formato.

    "chunker": {
        "name": <chunker-name>,
        "config": {
            "<param 1>": "...",
            "<param 2>": "...",
            ...
        }
    }

Implementação de um analisador/chunker personalizado

Esse projeto está estruturado para facilitar a adição de analisadores ou chunkers personalizados ao pipeline de preparação de dados.

Adicionar um novo analisador

Suponha que o senhor queira incorporar um novo analisador usando a biblioteca PyMuPDF para transformar o texto analisado no formato Markdown. Siga estes passos:

  1. Instale as dependências necessárias adicionando o seguinte código ao endereço parser_library Notebook no diretório single_fix ou multiple_fix:

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. No site parser_library Notebook no diretório single_fix ou multiple_fix, adicione uma nova seção para o analisador PyMuPdfMarkdown e implemente a função de análise. Certifique-se de que a saída da função esteja em conformidade com a classe ParserReturnValue definida no início do site Notebook. Isso garante a compatibilidade com os UDFs do Spark. O bloco try ou except impede que o site Spark falhe em toda a análise Job devido a erros em documentos individuais ao aplicar o analisador como UDF em 02_parse_docs Notebook no diretório single_fix ou multiple_fix. Este Notebook verificará se houve falha na análise de algum documento, colocará em quarentena as linhas correspondentes e emitirá um aviso.

    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    
  3. Adicione sua nova função de análise ao parser_factory no parser_library Notebook no diretório single_fix ou multiple_fix para torná-la configurável no pipeline_config do 00_config Notebook.

  4. No 02_parse_docs Notebook, as funções do analisador são transformadas em Spark Python UDFs(otimizadas por seta para Databricks Runtime 14.0 ou acima) e aplicadas ao dataframe que contém os novos arquivos PDF binários. Para testes e desenvolvimento, adicione uma função de teste simples à parser_library Notebook que carrega o arquivo test-document.pdf e afirma que a análise foi bem-sucedida:

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
    

Adicionar um novo chunker

O processo para adicionar um novo chunker segue os passos semelhantes aos explicados acima para um novo analisador.

  1. Adicione as dependências necessárias na chunker_library Notebook.

  2. Adicione uma nova seção para o seu chunker e implemente uma função, por exemplo, chunk_parsed_content_newchunkername. A saída da nova função chunker deve ser um dicionário Python que esteja em conformidade com a classe ChunkerReturnValue definida no início da chunker_library Notebook. A função deve aceitar pelo menos uma cadeia de caracteres do texto analisado a ser dividido em pedaços. Se o seu chunker exigir parâmetros adicionais, o senhor poderá adicioná-los como parâmetros de função.

  3. Adicione seu novo chunker à função chunker_factory definida na chunker_library Notebook. Se a sua função aceitar parâmetros adicionais, use o partial do functools para pré-configurá-los. Isso é necessário porque os UDFs aceitam apenas um parâmetro de entrada, que será o texto analisado em nosso caso. O chunker_factory permite que o senhor configure diferentes métodos de chunker no pipeline e retorna um Spark Python UDF (otimizado para Databricks Runtime 14.0 e acima).

  4. Adicione uma seção de teste simples para sua nova função de fragmentação. Essa seção deve conter um texto predefinido fornecido como uma cadeia de caracteres.

Ajuste de desempenho

O Spark utiliza partições para paralelizar o processamento. Os dados são divididos em blocos de linhas e cada partição é processada por um único núcleo pelo site default. No entanto, quando os dados são lidos inicialmente pelo site Apache Spark, ele pode não criar partições otimizadas para o cálculo desejado, especialmente para nossos UDFs que executam tarefas de análise e chunking. É fundamental encontrar um equilíbrio entre a criação de partições que sejam pequenas o suficiente para uma paralelização eficiente e não tão pequenas que a sobrecarga de gerenciá-las supere os benefícios.

O senhor pode ajustar o número de partições usando df.repartitions(<number of partitions>). Ao aplicar UDFs, procure usar um múltiplo do número de núcleos disponíveis nos nós do site worker. Por exemplo, em 02_parse_docs Notebook, o senhor poderia incluir df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism) para criar duas vezes mais partições do que o número de núcleos worker disponíveis. Normalmente, um múltiplo entre 1 e 3 deve produzir um desempenho satisfatório.

Executar o pipeline manualmente

Como alternativa, o senhor pode executar cada Notebook individual passo a passo:

  1. Carregue os arquivos raw usando o 01_load_files Notebook. Isso salva cada binário do documento como um registro em uma tabela de bronze (raw_files_table_name) definida no destination_tables_config. Os arquivos são carregados de forma incremental, processando apenas os novos documentos desde a última execução.

  2. Analisar os documentos com 02_parse_docs Notebook. Esse Notebook executa o parser_library Notebook (certifique-se de executá-lo como a primeira célula para reiniciar o Python), disponibilizando diferentes analisadores e utilitários relacionados. Em seguida, ele usa o analisador especificado no pipeline_config para analisar cada documento em texto simples. Por exemplo, metadados relevantes, como o número de páginas do PDF original junto com o texto analisado, são capturados. Os documentos analisados com sucesso são armazenados em uma tabela prata (parsed_docs_table_name), enquanto os documentos não analisados são colocados em quarentena em uma tabela correspondente.

  3. Separe os documentos analisados usando o 03_chunk_docs Notebook. Semelhante à análise, este Notebook executa o chunker_library Notebook (novamente, execução como a primeira célula). Ele divide cada documento analisado em pedaços menores usando o chunker especificado do pipeline_config. Cada bloco recebe um ID exclusivo usando um hash MD5, necessário para a sincronização com o índice de pesquisa de vetores. Os blocos finais são carregados em uma tabela ouro (chunked_docs_table_name).

  4. Criar/sincronizar o índice de pesquisa vetorial com o 04_vector_index. Este Notebook verifica a prontidão da pesquisa vetorial especificada endpoint no vectorsearch_config. Se o índice configurado já existir, ele inicia a sincronização com a tabela ouro; caso contrário, ele cria o índice e aciona a sincronização. Espera-se que isso leve algum tempo se o endpoint e o índice do Vector Search ainda não tiverem sido criados.

Próximo passo

Continue com o passo 7. implantado & monitor.