o passo 6 (oleoduto). Implementar correções no pipeline de dados
Siga estes passos para modificar seu pipeline de dados e executá-lo para:
Crie um novo índice vetorial.
Crie uma execução do MLflow com os metadados do pipeline de dados.
A MLflow execução resultante é referenciada pelo notebook `B_quality_iteration/02_evaluate_fixes`.
Há duas abordagens para modificar o pipeline de dados:
Implementar uma única correção de cada vez Nessa abordagem, o senhor configura e executa um único pipeline de dados de uma só vez. Esse modo é melhor se você quiser experimentar um único modelo de incorporação e testar um único novo analisador. Databricks O senhor sugere começar aqui para se familiarizar com o Notebook.
Implementar várias correções de uma vez Nessa abordagem, também chamada de sweep, o senhor executa, em paralelo, vários pipelines de dados, cada um com uma configuração diferente. Esse modo é melhor se você quiser “varrer” várias estratégias diferentes, por exemplo, avaliar três analisadores de PDF ou avaliar vários tamanhos de blocos diferentes.
Consulte o repositório do GitHub para obter o código de amostra nesta seção.
Abordagem 1: Implemente uma única correção por vez
Abra o Notebook B_quality_iteration/data_pipeline_fixes/single_fix/00_config
Siga as instruções em uma das opções abaixo:
Siga as instruções para implementar uma nova configuração fornecida por este livro de receitas.
Siga os passos para implementar um código personalizado para análise ou chunking.
execução do pipeline, por qualquer um dos senhores:
Abrir o site & e executar o Notebook.
Seguindo os passos para executar cada passo do pipeline manualmente.
Adicione o nome da execução MLflow resultante que é enviado para a variável
DATA_PIPELINE_FIXES_RUN_NAMES
no Notebook B_quality_iteration/02_evaluate_fixes
Observação
A preparação de dados pipeline emprega a Spark transmissão estruturada para carregar e processar arquivos de forma incremental. Isso significa que os arquivos já carregados e preparados sejam rastreados em pontos de verificação e não sejam reprocessados. Somente os arquivos recém-adicionados serão carregados, preparados e anexados às tabelas correspondentes.
Portanto, se o senhor quiser executar novamente todo o pipeline do zero e reprocessar todos os documentos, precisará excluir os pontos de verificação e as tabelas. O senhor pode fazer isso usando o Reset Notebook.
Abordagem 2: Implemente várias correções de uma só vez
Abra o Notebook B_quality_iteration/data_pipeline_fixes/multiple_fixes/execução.
Siga as instruções no Notebook para adicionar duas ou mais configurações do pipeline de dados para execução.
execução o Notebook para executar esses pipelines.
Adicione os nomes da MLflow execução resultante que são enviados para a variável
DATA_PIPELINE_FIXES_RUN_NAMES
no B_quality_iteration/02_evaluate_fixes Notebook.
Apêndice
Observação
É possível encontrar o Notebook referenciado abaixo nos diretórios single_fix e multiple_fixes, dependendo se o senhor está implementando uma única correção ou várias correções ao mesmo tempo.
Aprofundamento das configurações
As várias opções de configuração pré-implementadas para o pipeline de dados estão listadas abaixo. Como alternativa, você pode implementar um analisador/fragmento personalizado.
vectorsearch_config
: Especifique o endpoint de pesquisa de vetores (deve estar em funcionamento) e o nome do índice a ser criado. Além disso, defina o tipo de sincronização entre a tabela de origem e o índice (default éTRIGGERED
).embedding_config
: especifique o modelo de incorporação a ser usado, junto com o tokenizador. Para obter uma lista completa de opções, consulte o `supporting_configs/embedding_models` Notebook. O modelo de incorporação deve ser implantado em um modelo de serviço em execução endpoint. Dependendo da estratégia de divisão em blocos, o tokenizador também é usado durante a divisão para garantir que os blocos não excedam o limite de tokens do modelo de incorporação. Os tokenizadores são usados aqui para contar o número de tokens nos blocos de texto para garantir que eles não excedam o comprimento máximo do contexto do modelo de incorporação selecionado.
O seguinte mostra um tokenizador do HuggingFace:
"embedding_tokenizer": {
"tokenizer_model_name": "BAAI/bge-large-en-v1.5",
"tokenizer_source": "hugging_face",
}
O seguinte mostra um tokenizador do TikToken:
"embedding_tokenizer": {
"tokenizer_model_name": "text-embedding-small",
"tokenizer_source": "tiktoken",
}
pipeline_config
: define o analisador de arquivos, o fragmento e o caminho para o campo de fontes. Parsers e chunkers são definidos noparser_library
echunker_library
Notebook, respectivamente. Eles podem ser encontrados nos diretórios single_fix e multiple_fixes . Para obter uma lista completa de opções, consulte osupporting_configs/parser_chunker_strategies
Notebook, que está novamente disponível nos diretórios de correção única e múltipla. Analisadores ou fragmentos diferentes podem exigir parâmetros de configuração diferentes, onde<param x>
representam os parâmetros potenciais necessários para um fragmento específico. Os analisadores também podem receber valores de configuração usando o mesmo formato.
"chunker": {
"name": <chunker-name>,
"config": {
"<param 1>": "...",
"<param 2>": "...",
...
}
}
Implementando um analisador/fragmento personalizado
Esse projeto está estruturado para facilitar a adição de analisadores ou chunkers personalizados ao pipeline de preparação de dados.
Adicionar um novo analisador
Suponha que o senhor queira incorporar um novo analisador usando a biblioteca PyMuPDF para transformar o texto analisado no formato Markdown. Siga estes passos:
Instale as dependências necessárias adicionando o seguinte código ao Notebook
parser_library
no diretóriosingle_fix
oumultiple_fix
:# Dependencies for PyMuPdf %pip install pymupdf pymupdf4llm
No
parser_library
Notebook, no diretóriosingle_fix
oumultiple_fix
, adicione uma nova seção para o analisadorPyMuPdfMarkdown
e implemente a função de análise. Certifique-se de que a saída da função esteja em conformidade com a classeParserReturnValue
definida no início do Notebook. Isso garante a compatibilidade com os UDFs do Spark. O blocotry
ouexcept
impede que o site Spark falhe em todo o trabalho de análise devido a erros em documentos individuais ao aplicar o analisador como um UDF no02_parse_docs
Notebook no diretóriosingle_fix
oumultiple_fix
. Esse Notebook verificará se houve falha na análise de algum documento, colocará em quarentena as linhas correspondentes e emitirá um aviso.import fitz import pymupdf4llm def parse_bytes_pymupdfmarkdown( raw_doc_contents_bytes: bytes, ) -> ParserReturnValue: try: pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf") md_text = pymupdf4llm.to_markdown(pdf_doc) output = { "num_pages": str(pdf_doc.page_count), "parsed_content": md_text.strip(), } return { OUTPUT_FIELD_NAME: output, STATUS_FIELD_NAME: "SUCCESS", } except Exception as e: warnings.warn(f"Exception {e} has been thrown during parsing") return { OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""}, STATUS_FIELD_NAME: f"ERROR: {e}", }
Adicione sua nova função de análise ao
parser_factory
noparser_library
Notebook no diretóriosingle_fix
oumultiple_fix
para torná-la configurável nopipeline_config
do00_config
Notebook. Notebook.No
02_parse_docs
Notebook, as funções do analisador são transformadas em Spark Python UDFs(otimizadas por seta para Databricks Runtime 14.0 ou acima) e aplicadas ao dataframe que contém os novos arquivos PDF binários. Para testes e desenvolvimento, adicione uma função de teste simples ao Notebook parser_library que carrega o arquivo test-document.pdf e afirma que a análise foi bem-sucedida:with open("./test_data/test-document.pdf", "rb") as file: file_bytes = file.read() test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes) assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
Adicionar um novo fragmento
O processo para adicionar um novo chunker segue os passos semelhantes aos explicados acima para um novo analisador.
Adicione as dependências necessárias no Notebook chunker_library.
Adicione uma nova seção para seu chunker e implemente uma função, por exemplo,
chunk_parsed_content_newchunkername
. A saída da nova função chunker deve ser um dicionário Python que esteja em conformidade com a classeChunkerReturnValue
definida no início do Notebook chunker_library. A função deve aceitar pelo menos uma cadeia de caracteres do texto analisado a ser dividido em pedaços. Se seu chunker exigir parâmetros adicionais, você poderá adicioná-los como parâmetros de função.Adicione seu novo chunker à função
chunker_factory
definida no Notebook chunker_library. Se sua função aceitar parâmetros adicionais, use functools' partial para pré-configurá-los. Isso é necessário porque os UDFs aceitam apenas um parâmetro de entrada, que será o texto analisado em nosso caso. Ochunker_factory
permite que o senhor configure diferentes métodos de chunker no pipeline e retorna um Spark Python UDF (otimizado para Databricks Runtime 14.0 e acima).Adicione uma seção de teste simples para sua nova função de fragmentação. Essa seção deve conter um texto predefinido fornecido como uma cadeia de caracteres.
Ajuste de desempenho
O Spark utiliza partições para paralelizar o processamento. Os dados são divididos em blocos de linhas e cada partição é processada por um único núcleo pelo site default. No entanto, quando os dados são lidos inicialmente pelo site Apache Spark, ele pode não criar partições otimizadas para o cálculo desejado, especialmente para nossos UDFs que executam tarefas de análise e chunking. É crucial encontrar um equilíbrio entre criar partições que sejam pequenas o suficiente para uma paralelização eficiente e não tão pequenas que a sobrecarga de gerenciamento supere os benefícios.
Você pode ajustar o número de partições usando df.repartitions(<number of partitions>)
. Ao aplicar UDFs, procure usar um múltiplo do número de núcleos disponíveis nos nós do site worker. Por exemplo, no Notebook 02_parse_docs, o senhor poderia incluir df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)
para criar duas vezes mais partições do que o número de núcleos disponíveis no site worker. Normalmente, um múltiplo entre 1 e 3 deve produzir um desempenho satisfatório.
Executar o pipeline manualmente
Como alternativa, o senhor pode executar cada Notebook individual passo a passo:
Carregue os arquivos brutos usando o
01_load_files
Notebook. Isso salva cada documento binário como um registro em uma tabela de bronze (raw_files_table_name
) definida nodestination_tables_config
. Os arquivos são carregados de forma incremental, processando apenas os novos documentos desde a última execução.Analisar os documentos com o
02_parse_docs
Notebook. Este Notebook executa oparser_library
Notebook(certifique-se de executá-lo como a primeira célula para reiniciar Python), disponibilizando diferentes analisadores e utilitários relacionados. Em seguida, ele usa o analisador especificado nopipeline_config
para analisar cada documento em texto simples. Como exemplo, metadados relevantes, como o número de páginas do PDF original junto com o texto analisado, são capturados. Os documentos analisados com sucesso são armazenados em uma tabela prateada (parsed_docs_table_name
), enquanto os documentos não analisados são colocados em quarentena em uma tabela correspondente.Separe os documentos analisados usando o
03_chunk_docs
Notebook. Semelhante à análise, esse Notebook executa ochunker_library
Notebook(novamente, execução como a primeira célula). Ele divide cada documento analisado em partes menores usando o fragmento especificado dopipeline_config
. Cada bloco recebe um ID exclusivo usando um hash MD5, necessário para sincronização com o índice de pesquisa vetorial. Os blocos finais são carregados em uma tabela ouro (chunked_docs_table_name
).Crie/sincronize o índice de pesquisa vetorial com o
04_vector_index
. Este Notebook verifica a prontidão da pesquisa vetorial especificada endpoint novectorsearch_config
. Se o índice configurado já existir, ele inicia a sincronização com a tabela ouro; caso contrário, ele cria o índice e aciona a sincronização. Espera-se que isso leve algum tempo se o endpoint e o índice do Vector Search ainda não tiverem sido criados.
Próximo passo
Continue com o passo 7. implantado & monitor.