Etapa 6 (gasodutos). Implementar correções de pipeline de dados
Siga estas etapas para modificar seu pipeline de dados e executá-lo para:
- Crie um novo índice vetorial.
- Crie uma execução MLflow com os metadados do pipeline de dados.
A execução MLflow resultante é referenciada pelo bloco de B_quality_iteration/02_evaluate_fixes
anotações.
Há duas abordagens para modificar o pipeline de dados:
- Implementar uma única correção de cada vez Nessa abordagem, você configura e executa um único pipeline de dados de uma só vez. Este modo é melhor se você quiser experimentar um único modelo de incorporação e testar um único analisador novo. Databricks sugere começar aqui para get familiarizado com esses notebooks.
- Implementar várias correções de uma só vez Nessa abordagem, também chamada de varredura, você, em paralelo, executa vários pipelines de dados cada um com uma configuração diferente. Esse modo é melhor se você quiser "varrer" muitas estratégias diferentes, por exemplo, avaliar três analisadores de PDF ou avaliar muitos tamanhos de bloco diferentes.
Consulte o repositório GitHub para obter o código de exemplo nesta seção.
Abordagem 1: Implementar uma única correção de cada vez
- Abrir o bloco de notas B_quality_iteration/data_pipeline_fixes/single_fix/00_config
- Siga as instruções abaixo:
- Siga as instruções para implementar uma nova configuração fornecida por este tutorial.
- Siga as etapas para implementar código personalizado para uma análise ou fragmentação.
- Execute o pipeline, através de:
- Abrindo & executando o 00_Run_Entire_Pipeline bloco de anotações.
- Siga as etapas para executar cada etapa do pipeline manualmente.
- Adicione o nome da MLflow Run resultante que é saída para a
DATA_PIPELINE_FIXES_RUN_NAMES
variável no bloco de anotações B_quality_iteration/02_evaluate_fixes
Nota
O pipeline de preparação de dados emprega o Spark Structured Streaming para carregar e processar arquivos de forma incremental. Isso implica que os arquivos já carregados e preparados são rastreados em pontos de verificação e não serão reprocessados. Somente os arquivos recém-adicionados serão carregados, preparados e anexados ao tablescorrespondente.
Portanto, se você deseja executar novamente todo o pipeline do zero e reprocessar todos os documentos, você precisa excluir os pontos de verificação e tables. Você pode fazer isso usando o reset_tables_and_checkpoints bloco de anotações.
Abordagem 2: Implementar várias correções de uma só vez
- Abra o bloco de notas B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines .
- Siga as instruções no bloco de anotações para adicionar duas ou mais configurações do pipeline de dados a ser executado.
- Execute o bloco de anotações para executar esses pipelines.
- Adicione os nomes das execuções MLflow resultantes que são saídas para a
DATA_PIPELINE_FIXES_RUN_NAMES
variável no B_quality_iteration/02_evaluate_fixes notebook.
Anexo
Nota
Você pode encontrar os blocos de anotações mencionados abaixo nos diretórios single_fix e multiple_fixes , dependendo se você está implementando uma única correção ou várias correções ao mesmo tempo.
Aprofundamento das definições de configuração
As várias opções de configuração pré-implementadas para o pipeline de dados estão listadas abaixo. Como alternativa, você pode implementar um analisador/fragmentador personalizado.
-
vectorsearch_config
: Especifique o ponto de extremidade de pesquisa vetorial (deve estar ativo e em execução) e o nome do índice a ser criado. Além disso, defina o sincronização tipo entre o table de origem e o índice (o padrão éTRIGGERED
). -
embedding_config
: Especifique o modelo de incorporação a ser usado, juntamente com o tokenizador. Para obter uma list completa de opções, consulte osupporting_configs/embedding_models
bloco de anotações. O modelo de incorporação deve ser implantado em um modelo de ponto de extremidade em execução. Dependendo da estratégia de fragmentação, o tokenizador é utilizado durante a divisão para garantir que os fragmentos não excedam o limite de limit tokens do modelo de incorporação. Os tokenizadores são usados aqui para contar o número de tokens nos blocos de texto para garantir que eles não excedam o comprimento máximo de contexto do modelo de incorporação selecionado.
O seguinte mostra um tokenizador do HuggingFace:
"embedding_tokenizer": {
"tokenizer_model_name": "BAAI/bge-large-en-v1.5",
"tokenizer_source": "hugging_face",
}
A seguir mostra um tokenizador do TikToken:
"embedding_tokenizer": {
"tokenizer_model_name": "text-embedding-small",
"tokenizer_source": "tiktoken",
}
-
pipeline_config
: Define o analisador de arquivos, o bloco e o caminho para o campo de códigos-fonte. Analisadores e blocos de anotações são definidos nosparser_library
blocos de anotações echunker_library
, respectivamente. Estes podem ser encontrados nos diretórios single_fix e multiple_fixes . Para obter uma list completa de opções, consulte o notebooksupporting_configs/parser_chunker_strategies
, que está disponível tanto em diretórios de correção única como múltipla. Analisadores ou segmentadores diferentes podem exigir configurações diferentes parameterswhere<param x>
representam o potencial parameters necessário para um segmentador específico. Os analisadores também podem receber a configuração values usando o mesmo formato.
"chunker": {
"name": <chunker-name>,
"config": {
"<param 1>": "...",
"<param 2>": "...",
...
}
}
Implementando um analisador/chunker personalizado
Este projeto é estruturado para facilitar a adição de analisadores personalizados ou chunkers ao pipeline de preparação de dados.
Adicionar um novo analisador
Suponha que você queira incorporar um novo analisador usando a biblioteca PyMuPDF para transformar o texto analisado no formato Markdown. Siga estes passos:
Instale as dependências necessárias adicionando o seguinte código ao
parser_library
bloco de anotações nosingle_fix
diretório oumultiple_fix
:# Dependencies for PyMuPdf %pip install pymupdf pymupdf4llm
parser_library
No bloco de anotações nosingle_fix
diretório oumultiple_fix
, adicione uma nova seção para oPyMuPdfMarkdown
analisador e implemente a função de análise. Certifique-se de que a saída da função está em conformidade com aParserReturnValue
classe definida no início do bloco de notas. Isso garante a compatibilidade com UDFs do Spark. Otry
bloco ouexcept
impede que o Spark falhe todo o trabalho de análise devido a erros em documentos individuais ao aplicar o analisador como UDF no02_parse_docs
bloco de anotações nosingle_fix
diretório oumultiple_fix
. Este bloco de notas verificará se a análise falhou em qualquer documento, colocará em quarentena as linhas correspondentes e emitirá um aviso.import fitz import pymupdf4llm def parse_bytes_pymupdfmarkdown( raw_doc_contents_bytes: bytes, ) -> ParserReturnValue: try: pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf") md_text = pymupdf4llm.to_markdown(pdf_doc) output = { "num_pages": str(pdf_doc.page_count), "parsed_content": md_text.strip(), } return { OUTPUT_FIELD_NAME: output, STATUS_FIELD_NAME: "SUCCESS", } except Exception as e: warnings.warn(f"Exception {e} has been thrown during parsing") return { OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""}, STATUS_FIELD_NAME: f"ERROR: {e}", }
Adicione sua nova função de análise ao
parser_factory
noparser_library
bloco de anotações nosingle_fix
diretório oumultiple_fix
para torná-lo configurável nopipeline_config
do bloco de00_config
anotações.No notebook, as funções do analisador são transformadas em UDFs do Spark Python (
02_parse_docs
para seta para Databricks Runtime 14.0 ou superior) e aplicadas ao dataframe que contém os novos arquivos PDF binários. Para teste e desenvolvimento, adicione uma função de teste simples ao bloco de anotações parser_library que carrega o arquivo de test-document.pdf e afirma a análise bem-sucedida:with open("./test_data/test-document.pdf", "rb") as file: file_bytes = file.read() test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes) assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
Adicionar um novo bloco
O processo para adicionar um novo fragmento segue etapas semelhantes às explicadas acima para um novo analisador.
- Adicione as dependências necessárias no chunker_library bloco de anotações.
- Adicione uma nova seção para seu chunker e implemente uma função, por exemplo,
chunk_parsed_content_newchunkername
. A saída da nova função chunker deve ser um dicionário Python que esteja em conformidade com aChunkerReturnValue
classe definida no início do chunker_library notebook. A função deve aceitar pelo menos uma cadeia de caracteres do texto analisado a ser dividido. Se o seu fragmento requer parametersadicionais, você pode adicioná-los como função parameters. - Adicione seu
chunker_factory
novo bloco de agregação à função definida no chunker_library bloco de anotações. Se sua função aceitar parametersadicionais, use parciais do functools para pré-configurá-los. Isso é necessário porque as UDFs só aceitam um parâmetro de entrada, que será o texto analisado no nosso caso. Ochunker_factory
permite configurar diferentes métodos chunker no pipeline_config e retorna um Spark Python UDF (otimizado para Databricks Runtime 14.0 e superior). - Adicione uma seção de teste simples para sua nova função de fragmentação. Esta seção deve fragmentar um texto predefinido fornecido como uma cadeia de caracteres.
Afinação de Desempenho
O Spark utiliza partições para paralelizar o processamento. Os dados são divididos em partes de linhas e cada partition é processada por um único núcleo por padrão. No entanto, quando os dados são lidos inicialmente pelo Apache Spark, ele pode não criar partições otimizadas para a computação desejada, particularmente para nossos UDFs que executam tarefas de análise e fragmentação. É crucial encontrar um equilíbrio entre a criação de partições que sejam pequenas o suficiente para uma paralelização eficiente e não tão pequenas que a sobrecarga de gerenciá-las supere os benefícios.
Você pode ajustar o número de partições usando df.repartitions(<number of partitions>)
. Ao aplicar UDFs, aponte para um múltiplo do número de núcleos disponíveis nos nós de trabalho. Por exemplo, no 02_parse_docs bloco de anotações, você pode incluir df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)
para criar duas vezes mais partições do que o número de núcleos de trabalho disponíveis. Normalmente, um múltiplo entre 1 e 3 deve produzir um desempenho satisfatório.
Executando o pipeline manualmente
Como alternativa, você pode executar cada Bloco de Anotações individual passo a passo:
-
Carregue os arquivos brutos usando o
01_load_files
bloco de anotações. Isso salva cada documento binário como um registro em um table bronze (raw_files_table_name
) definido nodestination_tables_config
. Os arquivos são carregados incrementalmente, processando apenas novos documentos desde a última execução. -
Analise os documentos com o
02_parse_docs
caderno. Este notebook executa oparser_library
notebook (certifique-se de executá-lo como a primeira célula a reiniciar o Python), disponibilizando diferentes analisadores e utilitários relacionados. Em seguida, ele usa o analisador especificado nopipeline_config
para analisar cada documento em texto sem formatação. Como exemplo, metadados relevantes, como o número de páginas do PDF original ao lado do texto analisado, são capturados. Os documentos analisados com êxito são armazenados numa table prateada (parsed_docs_table_name
), enquanto quaisquer documentos não analisados são colocados em quarentena numa correspondente table. -
Fragmente os documentos analisados usando o bloco de
03_chunk_docs
anotações. Semelhante à análise, este bloco de anotações executa ochunker_library
bloco de anotações (novamente, execute como a primeira célula). Ele divide cada documento analisado em partes menores usando o bloco especificado dopipeline_config
. A cada bloco é atribuído um ID exclusivo usando um hash MD5, necessário para a sincronização com o índice de pesquisa vetorial. Os pedaços finais são carregados num(a) table dourado(a) (chunked_docs_table_name
). -
Criar/Sync o índice de pesquisa vetorial com o
04_vector_index
. Este bloco de anotações verifica a prontidão do ponto de extremidade de pesquisa vetorial especificado novectorsearch_config
. Se o índice configurado já existir, ele inicia a sincronização com o tableouro; caso contrário, ele cria o índice e aciona a sincronização. Espera-se que isso leve algum tempo se o ponto de extremidade e o índice da Pesquisa Vetorial ainda não tiverem sido criados.
Próximo passo
Continue com o Passo 7. Implante o monitor de assinaturas.
< Anterior: Passo 6. Corrigir problemas de qualidade de forma iterativa