Condividi tramite


Passaggio 6 (pipeline). Implementare le correzioni della pipeline di dati

Pipeline di dati

Seguire questa procedura per modificare la pipeline di dati ed eseguirla in:

  1. Creare un nuovo indice vettoriale.
  2. Creare un'esecuzione MLflow con i metadati della pipeline di dati.

L'esecuzione MLflow risultante fa riferimento al B_quality_iteration/02_evaluate_fixes notebook.

Esistono due approcci per modificare la pipeline di dati:

  • Implementare una singola correzione alla volta In questo approccio è possibile configurare ed eseguire una singola pipeline di dati contemporaneamente. Questa modalità è ottimale se si vuole provare un singolo modello di incorporamento e testare un singolo parser nuovo. Databricks suggerisce di iniziare da qui per acquisire familiarità con questi notebook.
  • Implementare più correzioni contemporaneamente In questo approccio, detto anche sweep, si esegue in parallelo più pipeline di dati con una configurazione diversa. Questa modalità è ottimale se si vuole "eseguire lo sweep" in molte strategie diverse, ad esempio valutare tre parser PDF o valutare molte dimensioni di blocchi diverse.

Per il codice di esempio in questa sezione, vedere il repository GitHub.

Approccio 1: Implementare una singola correzione alla volta

  1. Aprire il notebook B_quality_iteration/data_pipeline_fixes/single_fix/00_config
  2. Seguire le istruzioni riportate di seguito:
  3. Eseguire la pipeline in uno dei due casi:
  4. Aggiungere il nome dell'esecuzione MLflow risultante restituita alla DATA_PIPELINE_FIXES_RUN_NAMES variabile in B_quality_iteration/02_evaluate_fixes notebook

Nota

La pipeline di preparazione dei dati usa Spark Structured Streaming per caricare ed elaborare i file in modo incrementale. Ciò comporta che i file già caricati e preparati vengono rilevati nei checkpoint e non verranno rielaborati. Solo i file appena aggiunti verranno caricati, preparati e aggiunti alle tabelle corrispondenti.

Pertanto, se si vuole rieseguire l'intera pipeline da zero e rielaborare tutti i documenti, è necessario eliminare i checkpoint e le tabelle. A tale scopo, è possibile usare il notebook reset_tables_and_checkpoints .

Approccio 2: Implementare più correzioni contemporaneamente

  1. Aprire il notebook B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines .
  2. Seguire le istruzioni nel notebook per aggiungere due o più configurazioni della pipeline di dati da eseguire.
  3. Eseguire il notebook per eseguire queste pipeline.
  4. Aggiungere i nomi delle esecuzioni MLflow risultanti restituite alla DATA_PIPELINE_FIXES_RUN_NAMES variabile in B_quality_iteration/02_evaluate_fixes notebook.

Appendice

Nota

È possibile trovare i notebook a cui si fa riferimento di seguito nelle directory single_fix e multiple_fixes a seconda che si stia implementando una singola correzione o più correzioni alla volta.

Approfondimento delle impostazioni di configurazione

Di seguito sono elencate le varie opzioni di configurazione pre-implementate per la pipeline di dati. In alternativa, è possibile implementare un parser/chunker personalizzato.

  • vectorsearch_config: specificare l'endpoint di ricerca vettoriale (deve essere operativo) e il nome dell'indice da creare. Definire inoltre il tipo di sincronizzazione tra la tabella di origine e l'indice (il valore predefinito è TRIGGERED).
  • embedding_config: specificare il modello di incorporamento da usare, insieme al tokenizer. Per un elenco completo delle opzioni, vedere il supporting_configs/embedding_models notebook. Il modello di incorporamento deve essere distribuito in un endpoint di gestione del modello in esecuzione. A seconda della strategia di suddivisione in blocchi, il tokenizer viene anche durante la suddivisione per assicurarsi che i blocchi non superino il limite di token del modello di incorporamento. I tokenizer vengono usati qui per contare il numero di token nei blocchi di testo per assicurarsi che non superino la lunghezza massima del contesto del modello di incorporamento selezionato.

Di seguito è riportato un tokenizer di HuggingFace:

    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }

Di seguito è illustrato un tokenizer di TikToken:

"embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }
  • pipeline_config: definisce il parser di file, il chunker e il percorso del campo sources. I parser e i parser_library blocchi sono definiti rispettivamente nei notebook e chunker_library . Queste sono disponibili nelle directory single_fix e multiple_fixes . Per un elenco completo delle opzioni, vedere il supporting_configs/parser_chunker_strategies notebook, che è di nuovo disponibile sia nelle directory di correzione singola che in più directory di correzione. Parser o blocchi diversi possono richiedere parametri di configurazione diversi, in cui <param x> rappresentano i parametri potenziali necessari per uno specifico chunker. I parser possono anche essere passati valori di configurazione usando lo stesso formato.
    "chunker": {
        "name": <chunker-name>,
        "config": {
            "<param 1>": "...",
            "<param 2>": "...",
            ...
        }
    }

Implementazione di un parser/chunker personalizzato

Questo progetto è strutturato per facilitare l'aggiunta di parser o blocchi personalizzati alla pipeline di preparazione dei dati.

Aggiungere un nuovo parser

Si supponga di voler incorporare un nuovo parser usando la libreria PyMuPDF per trasformare il testo analizzato in formato Markdown. Seguire questa procedura:

  1. Installare le dipendenze necessarie aggiungendo il codice seguente al parser_library notebook nella single_fix directory o multiple_fix :

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. parser_library Nel notebook nella single_fix directory o multiple_fix aggiungere una nuova sezione per il PyMuPdfMarkdown parser e implementare la funzione di analisi. Verificare che l'output della funzione sia conforme alla ParserReturnValue classe definita all'inizio del notebook. In questo modo si garantisce la compatibilità con le funzioni definite dall'utente spark. Il try blocco o except impedisce a Spark di non riuscire l'intero processo di analisi a causa di errori nei singoli documenti quando si applica il parser come funzione definita dall'utente nel 02_parse_docs notebook nella single_fix directory o multiple_fix . Questo notebook verificherà se l'analisi non è riuscita per qualsiasi documento, mette in quarantena le righe corrispondenti e genera un avviso.

    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    
  3. Aggiungere la nuova funzione di analisi a parser_factory nel notebook nella single_fix directory o multiple_fix per renderla configurabile nel pipeline_config notebook 00_config parser_library.

  4. 02_parse_docs Nel notebook le funzioni parser vengono trasformate in funzioni definite dall'utente Python Spark (ottimizzate per databricks Runtime 14.0 o versione successiva) e applicate al dataframe contenente i nuovi file PDF binari. Per il test e lo sviluppo, aggiungere una semplice funzione di test al notebook di parser_library che carica il file test-document.pdf e asserisce l'analisi corretta:

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
    

Aggiungere un nuovo chunker

Il processo di aggiunta di un nuovo chunker segue passaggi simili a quelli illustrati in precedenza per un nuovo parser.

  1. Aggiungere le dipendenze necessarie nel notebook di chunker_library .
  2. Aggiungere una nuova sezione per il chunker e implementare una funzione, ad esempio chunk_parsed_content_newchunkername. L'output della nuova funzione chunker deve essere un dizionario Python conforme alla ChunkerReturnValue classe definita all'inizio del notebook chunker_library . La funzione deve accettare almeno una stringa del testo analizzato da suddividere in blocchi. Se il blocco di blocchi richiede parametri aggiuntivi, è possibile aggiungerli come parametri di funzione.
  3. Aggiungere il nuovo chunker alla chunker_factory funzione definita nel notebook di chunker_library . Se la funzione accetta parametri aggiuntivi, usare functools parziale per preconfigurarli. Ciò è necessario perché le funzioni definite dall'utente accettano un solo parametro di input, che sarà il testo analizzato in questo caso. chunker_factory consente di configurare diversi metodi di suddivisione in blocchi nella pipeline_config e di restituire una funzione definita dall'utente Python Spark (ottimizzata per Databricks Runtime 14.0 e versioni successive).
  4. Aggiungere una sezione di test semplice per la nuova funzione di suddivisione in blocchi. Questa sezione deve suddividere in blocchi un testo predefinito fornito come stringa.

Ottimizzazione delle prestazioni

Spark usa le partizioni per parallelizzare l'elaborazione. I dati sono suddivisi in blocchi di righe e ogni partizione viene elaborata da un singolo core per impostazione predefinita. Tuttavia, quando i dati vengono letti inizialmente da Apache Spark, potrebbe non creare partizioni ottimizzate per il calcolo desiderato, in particolare per le funzioni definite dall'utente che eseguono attività di analisi e suddivisione in blocchi. È fondamentale trovare un equilibrio tra la creazione di partizioni sufficientemente piccole per una parallelizzazione efficiente e non così piccola che il sovraccarico di gestione di tali partizioni superi i vantaggi.

È possibile modificare il numero di partizioni usando df.repartitions(<number of partitions>). Quando si applicano funzioni definite dall'utente, mirare a un multiplo del numero di core disponibili nei nodi di lavoro. Ad esempio, nel notebook di 02_parse_docs , è possibile includere df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism) per creare il doppio di partizioni del numero di core di lavoro disponibili. In genere, un multiplo compreso tra 1 e 3 dovrebbe produrre prestazioni soddisfacenti.

Esecuzione manuale della pipeline

In alternativa, è possibile eseguire ogni singolo notebook procedura dettagliata:

  1. Caricare i file non elaborati usando il 01_load_files notebook. In questo modo ogni file binario di documento viene salvato come record in una tabella bronze (raw_files_table_name) definita in destination_tables_config. I file vengono caricati in modo incrementale, elaborando solo nuovi documenti dall'ultima esecuzione.
  2. Analizzare i documenti con il 02_parse_docs notebook. Questo notebook esegue il parser_library notebook (assicurarsi di eseguirlo come prima cella per riavviare Python), rendendo disponibili parser diversi e utilità correlate. Usa quindi il parser specificato in pipeline_config per analizzare ogni documento in testo normale. Ad esempio, vengono acquisiti metadati rilevanti come il numero di pagine del PDF originale insieme al testo analizzato. I documenti analizzati correttamente vengono archiviati in una tabella silver (parsed_docs_table_name), mentre tutti i documenti non archiviati vengono messi in quarantena in una tabella corrispondente.
  3. Suddividere i documenti analizzati usando il 03_chunk_docs notebook. Analogamente all'analisi, questo notebook esegue il chunker_library notebook (eseguire di nuovo come prima cella). Suddivide ogni documento analizzato in blocchi più piccoli usando il blocco specificato da pipeline_config. A ogni blocco viene assegnato un ID univoco usando un hash MD5, necessario per la sincronizzazione con l'indice di ricerca vettoriale. I blocchi finali vengono caricati in una tabella gold (chunked_docs_table_name).
  4. Creare/sincronizzare l'indice di ricerca vettoriale con .04_vector_index Questo notebook verifica la conformità dell'endpoint di ricerca del vettore specificato in vectorsearch_config. Se l'indice configurato esiste già, avvia la sincronizzazione con la tabella gold; in caso contrario, crea l'indice e attiva la sincronizzazione. Questa operazione richiede tempo se l'endpoint e l'indice di Ricerca vettoriale non sono ancora stati creati.

Passaggio successivo

Continuare con il passaggio 7. Distribuisci e monitor.