o passo 6 (oleoduto). Implementar correções no pipeline de dados
![pipeline de dados](../../_images/data-pipeline.png)
Siga estes passos para modificar seu pipeline de dados e executá-lo para:
Criar um novo índice vetorial.
Crie uma execução do MLflow com os metadados do pipeline de dados.
A execução resultante do MLflow é referenciada pelo `B_quality_iteration/02_evaluate_fixes` Notebook.
Há duas abordagens para modificar o pipeline de dados:
Implementar uma única correção de cada vez Nessa abordagem, o senhor configura e executa uma única pipeline de dados de uma só vez. Esse modo é melhor se o senhor quiser experimentar um único modelo de incorporação e testar um único analisador novo. Databricks O senhor sugere começar aqui para se familiarizar com o Notebook.
Implementar várias correções de uma vez Nessa abordagem, também chamada de sweep, o senhor executa, em paralelo, vários pipelines de dados, cada um com uma configuração diferente. Esse modo é melhor se o senhor quiser "varrer" muitas estratégias diferentes, por exemplo, avaliar três analisadores de PDF ou avaliar muitos tamanhos diferentes de pedaços.
Consulte o repositório do GitHub para obter o código de amostra nesta seção.
Abordagem 1: implementar uma única correção de cada vez
Abra o arquivo B_quality_iteration/data_pipeline_fixes/single_fix/00_config Notebook
Siga as instruções em uma das opções abaixo:
Siga as instruções para implementar uma nova configuração fornecidas por este livro de receitas.
Siga os passos para implementar um código personalizado para análise ou chunking.
execução do pipeline, por qualquer um dos senhores:
Abrir e executar a execução Notebook.
Seguindo os passos para executar cada passo do pipeline manualmente.
Adicione o nome da execução MLflow resultante que é enviado para a variável
DATA_PIPELINE_FIXES_RUN_NAMES
em B_quality_iteration/02_evaluate_fixes Notebook
Observação
A preparação de dados pipeline emprega a Spark transmissão estruturada para carregar e processar arquivos de forma incremental. Isso significa que os arquivos já carregados e preparados são rastreados em pontos de verificação e não serão reprocessados. Somente os arquivos recém-adicionados serão carregados, preparados e anexados às tabelas correspondentes.
Portanto, se o senhor quiser executar novamente todo o pipeline do zero e reprocessar todos os documentos, precisará excluir os pontos de verificação e as tabelas. O senhor pode fazer isso usando o Reset Notebook.
Abordagem 2: Implementar várias correções de uma só vez
Abra o arquivo B_quality_iteration/data_pipeline_fixes/multiple_fixes/execução Notebook.
Siga as instruções em Notebook para adicionar duas ou mais configurações do pipeline de dados para execução.
execução o Notebook para executar esse pipeline.
Adicione os nomes da execução resultante MLflow que são enviados para a variável
DATA_PIPELINE_FIXES_RUN_NAMES
em B_quality_iteration/02_evaluate_fixes Notebook.
Apêndice
Observação
É possível encontrar o Notebook referenciado abaixo nos diretórios single_fix e multiple_fixes, dependendo se o senhor está implementando uma única correção ou várias correções ao mesmo tempo.
Aprofundamento das definições de configuração
As várias opções de configuração pré-implementadas para o pipeline de dados estão listadas abaixo. Como alternativa, o senhor pode implementar um analisador/chunker personalizado.
vectorsearch_config
: Especifique o endpoint de pesquisa de vetores (deve estar em funcionamento) e o nome do índice a ser criado. Além disso, defina o tipo de sincronização entre a tabela de origem e o índice (default éTRIGGERED
).embedding_config
: Especifique o modelo de incorporação a ser usado, juntamente com o tokenizador. Para obter uma lista completa de opções, consulte `supporting_configs/embedding_models` Notebook. O modelo de incorporação deve ser implantado em um modelo de serviço em execução endpoint. Dependendo da estratégia de divisão em blocos, o tokenizador também é usado durante a divisão para garantir que os blocos não excedam o limite de tokens do modelo de incorporação. Os tokenizadores são usados aqui para contar o número de tokens nos blocos de texto para garantir que eles não excedam o comprimento máximo do contexto do modelo de incorporação selecionado.
A seguir, o senhor vê um tokenizador do HuggingFace:
"embedding_tokenizer": {
"tokenizer_model_name": "BAAI/bge-large-en-v1.5",
"tokenizer_source": "hugging_face",
}
A seguir, é mostrado um tokenizador do TikToken:
"embedding_tokenizer": {
"tokenizer_model_name": "text-embedding-small",
"tokenizer_source": "tiktoken",
}
pipeline_config
: Define o analisador de arquivos, o fragmentador e o caminho para o campo de fontes. Parsers e chunkers são definidos noparser_library
echunker_library
Notebook, respectivamente. Eles podem ser encontrados nos diretórios single_fix e multiple_fixes. Para obter uma lista completa de opções, consulte osupporting_configs/parser_chunker_strategies
Notebook que está novamente disponível nos diretórios de correção única e múltipla. Diferentes analisadores ou chunkers podem exigir parâmetros de configuração diferentes, em que<param x>
representam os possíveis parâmetros necessários para um chunker específico. Os analisadores também podem receber valores de configuração usando o mesmo formato.
"chunker": {
"name": <chunker-name>,
"config": {
"<param 1>": "...",
"<param 2>": "...",
...
}
}
Implementação de um analisador/chunker personalizado
Esse projeto está estruturado para facilitar a adição de analisadores ou chunkers personalizados ao pipeline de preparação de dados.
Adicionar um novo analisador
Suponha que o senhor queira incorporar um novo analisador usando a biblioteca PyMuPDF para transformar o texto analisado no formato Markdown. Siga estes passos:
Instale as dependências necessárias adicionando o seguinte código ao endereço
parser_library
Notebook no diretóriosingle_fix
oumultiple_fix
:# Dependencies for PyMuPdf %pip install pymupdf pymupdf4llm
No site
parser_library
Notebook no diretóriosingle_fix
oumultiple_fix
, adicione uma nova seção para o analisadorPyMuPdfMarkdown
e implemente a função de análise. Certifique-se de que a saída da função esteja em conformidade com a classeParserReturnValue
definida no início do site Notebook. Isso garante a compatibilidade com os UDFs do Spark. O blocotry
ouexcept
impede que o site Spark falhe em toda a análise Job devido a erros em documentos individuais ao aplicar o analisador como UDF em02_parse_docs
Notebook no diretóriosingle_fix
oumultiple_fix
. Este Notebook verificará se houve falha na análise de algum documento, colocará em quarentena as linhas correspondentes e emitirá um aviso.import fitz import pymupdf4llm def parse_bytes_pymupdfmarkdown( raw_doc_contents_bytes: bytes, ) -> ParserReturnValue: try: pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf") md_text = pymupdf4llm.to_markdown(pdf_doc) output = { "num_pages": str(pdf_doc.page_count), "parsed_content": md_text.strip(), } return { OUTPUT_FIELD_NAME: output, STATUS_FIELD_NAME: "SUCCESS", } except Exception as e: warnings.warn(f"Exception {e} has been thrown during parsing") return { OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""}, STATUS_FIELD_NAME: f"ERROR: {e}", }
Adicione sua nova função de análise ao
parser_factory
noparser_library
Notebook no diretóriosingle_fix
oumultiple_fix
para torná-la configurável nopipeline_config
do00_config
Notebook.No
02_parse_docs
Notebook, as funções do analisador são transformadas em Spark Python UDFs(otimizadas por seta para Databricks Runtime 14.0 ou acima) e aplicadas ao dataframe que contém os novos arquivos PDF binários. Para testes e desenvolvimento, adicione uma função de teste simples à parser_library Notebook que carrega o arquivo test-document.pdf e afirma que a análise foi bem-sucedida:with open("./test_data/test-document.pdf", "rb") as file: file_bytes = file.read() test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes) assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
Adicionar um novo chunker
O processo para adicionar um novo chunker segue os passos semelhantes aos explicados acima para um novo analisador.
Adicione as dependências necessárias na chunker_library Notebook.
Adicione uma nova seção para o seu chunker e implemente uma função, por exemplo,
chunk_parsed_content_newchunkername
. A saída da nova função chunker deve ser um dicionário Python que esteja em conformidade com a classeChunkerReturnValue
definida no início da chunker_library Notebook. A função deve aceitar pelo menos uma cadeia de caracteres do texto analisado a ser dividido em pedaços. Se o seu chunker exigir parâmetros adicionais, o senhor poderá adicioná-los como parâmetros de função.Adicione seu novo chunker à função
chunker_factory
definida na chunker_library Notebook. Se a sua função aceitar parâmetros adicionais, use o partial do functools para pré-configurá-los. Isso é necessário porque os UDFs aceitam apenas um parâmetro de entrada, que será o texto analisado em nosso caso. Ochunker_factory
permite que o senhor configure diferentes métodos de chunker no pipeline e retorna um Spark Python UDF (otimizado para Databricks Runtime 14.0 e acima).Adicione uma seção de teste simples para sua nova função de fragmentação. Essa seção deve conter um texto predefinido fornecido como uma cadeia de caracteres.
Ajuste de desempenho
O Spark utiliza partições para paralelizar o processamento. Os dados são divididos em blocos de linhas e cada partição é processada por um único núcleo pelo site default. No entanto, quando os dados são lidos inicialmente pelo site Apache Spark, ele pode não criar partições otimizadas para o cálculo desejado, especialmente para nossos UDFs que executam tarefas de análise e chunking. É fundamental encontrar um equilíbrio entre a criação de partições que sejam pequenas o suficiente para uma paralelização eficiente e não tão pequenas que a sobrecarga de gerenciá-las supere os benefícios.
O senhor pode ajustar o número de partições usando df.repartitions(<number of partitions>)
. Ao aplicar UDFs, procure usar um múltiplo do número de núcleos disponíveis nos nós do site worker. Por exemplo, em 02_parse_docs Notebook, o senhor poderia incluir df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)
para criar duas vezes mais partições do que o número de núcleos worker disponíveis. Normalmente, um múltiplo entre 1 e 3 deve produzir um desempenho satisfatório.
Executar o pipeline manualmente
Como alternativa, o senhor pode executar cada Notebook individual passo a passo:
Carregue os arquivos raw usando o
01_load_files
Notebook. Isso salva cada binário do documento como um registro em uma tabela de bronze (raw_files_table_name
) definida nodestination_tables_config
. Os arquivos são carregados de forma incremental, processando apenas os novos documentos desde a última execução.Analisar os documentos com
02_parse_docs
Notebook. Esse Notebook executa oparser_library
Notebook (certifique-se de executá-lo como a primeira célula para reiniciar o Python), disponibilizando diferentes analisadores e utilitários relacionados. Em seguida, ele usa o analisador especificado nopipeline_config
para analisar cada documento em texto simples. Por exemplo, metadados relevantes, como o número de páginas do PDF original junto com o texto analisado, são capturados. Os documentos analisados com sucesso são armazenados em uma tabela prata (parsed_docs_table_name
), enquanto os documentos não analisados são colocados em quarentena em uma tabela correspondente.Separe os documentos analisados usando o
03_chunk_docs
Notebook. Semelhante à análise, este Notebook executa ochunker_library
Notebook (novamente, execução como a primeira célula). Ele divide cada documento analisado em pedaços menores usando o chunker especificado dopipeline_config
. Cada bloco recebe um ID exclusivo usando um hash MD5, necessário para a sincronização com o índice de pesquisa de vetores. Os blocos finais são carregados em uma tabela ouro (chunked_docs_table_name
).Criar/sincronizar o índice de pesquisa vetorial com o
04_vector_index
. Este Notebook verifica a prontidão da pesquisa vetorial especificada endpoint novectorsearch_config
. Se o índice configurado já existir, ele inicia a sincronização com a tabela ouro; caso contrário, ele cria o índice e aciona a sincronização. Espera-se que isso leve algum tempo se o endpoint e o índice do Vector Search ainda não tiverem sido criados.
Próximo passo
Continue com o passo 7. implantado & monitor.