Partager via


Étape 6 (pipelines). Implémenter des correctifs de pipeline de données

Pipeline de données

Effectuez ces étapes pour modifier votre pipeline de données et l’exécuter afin de :

  1. Créer un index vectoriel.
  2. Créer une exécution MLflow avec les métadonnées du pipeline de données.

L’exécution MLflow résultante est référencée par le notebook B_quality_iteration/02_evaluate_fixes.

Il existe deux approches pour modifier le pipeline de données :

  • Implémenter un correctif unique à la fois Avec cette approche, vous configurez et exécutez un seul pipeline de données à la fois. Ce mode est préférable si vous souhaitez essayer un seul modèle d’incorporations et tester un seul nouvel analyseur. Databricks suggère de commencer ici pour vous familiariser avec ces notebooks.
  • Implémenter plusieurs correctifs à la fois Avec cette approche, également appelée balayage, vous exécutez en parallèle plusieurs pipelines de données qui ont chacun une configuration différente. Ce mode est préférable si vous souhaitez « balayer » plusieurs stratégies différentes, par exemple, évaluer trois analyseurs PDF ou évaluer de nombreuses tailles de bloc différentes.

Consultez le dépôt GitHub pour obtenir l’exemple de code de cette section.

Approche 1 : Implémenter un correctif unique à la fois

  1. Ouvrez le notebook B_quality_iteration/data_pipeline_fixes/single_fix/00_config.
  2. Suivez l’une des instructions ci-dessous :
  3. Exécutez le pipeline en effectuant l’une des opérations suivantes :
  4. Ajoutez le nom de l’exécution MLflow résultante qui est générée à la variable DATA_PIPELINE_FIXES_RUN_NAMES dans le notebook B_quality_iteration/02_evaluate_fixes.

Remarque

Le pipeline de préparation des données utilise Spark Structured Streaming pour charger et traiter de manière incrémentielle les fichiers. Cela implique que les fichiers déjà chargés et préparés sont suivis dans des points de contrôle et ne seront pas traités à nouveau. Seuls les fichiers nouvellement ajoutés seront chargés, préparés et ajoutés aux tables correspondantes.

Par conséquent, si vous souhaitez réexécuter l’intégralité du pipeline à partir de zéro et retraiter tous les documents, vous devez supprimer les points de contrôle et les tables. Pour ce faire, utilisez le notebook reset_tables_and_checkpoints.

Approche 2 : Implémenter plusieurs correctifs à la fois

  1. Ouvrez le notebook B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines.
  2. Suivez les instructions du notebook pour ajouter plusieurs configurations du pipeline de données à exécuter.
  3. Exécutez le notebook pour exécuter ces pipelines.
  4. Ajoutez le nom des exécutions MLflow résultantes qui sont générées à la variable DATA_PIPELINE_FIXES_RUN_NAMES dans le notebook B_quality_iteration/02_evaluate_fixes.

Annexe

Remarque

Vous trouverez les notebooks référencés ci-dessous dans les répertoires single_fix et multiple_fixes, selon que vous implémentez un correctif unique ou plusieurs correctifs à la fois.

Présentation approfondie des paramètres de configuration

Les différentes options de configuration pré-implémentées pour le pipeline de données sont répertoriées ci-dessous. En guise d’alternative, vous pouvez implémenter un analyseur/segmenteur personnalisé.

  • vectorsearch_config : spécifiez le point de terminaison de recherche vectorielle (il doit être opérationnel) et le nom de l’index à créer. Définissez également le type de synchronisation entre la table source et l’index (la valeur par défaut est TRIGGERED).
  • embedding_config : spécifiez le modèle d’incorporations à utiliser, ainsi que le générateur de jetons. Pour obtenir la liste complète des options, consultez le notebook supporting_configs/embedding_models. Le modèle d’incorporations doit être déployé sur un point de terminaison de service de modèle en cours d’exécution. En fonction de la stratégie de segmentation, le générateur de jetons est également utilisé lors du fractionnement afin de s’assurer que les segments ne dépassent pas la limite du nombre de jetons du modèle d’incorporations. Les générateurs de jetons sont utilisés ici pour compter le nombre de jetons dans les segments de texte, afin de s’assurer qu’ils ne dépassent pas la longueur de contexte maximale du modèle d’incorporations sélectionné.

Voici un générateur de jetons de HuggingFace :

    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }

Voici un générateur de jetons de TikToken :

"embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }
  • pipeline_config : définit l’analyseur de fichiers, le segmenteur et le chemin d’accès au champ source. Les analyseurs et les segmenteurs sont définis respectivement dans les notebooks parser_library et chunker_library. Ceux-ci se trouvent dans les répertoires single_fix et multiple_fixes. Pour obtenir la liste complète des options, consultez le notes supporting_configs/parser_chunker_strategies, qui est à nouveau disponible dans les répertoires de correctifs uniques et multiples. Différents analyseurs ou segmenteurs peuvent nécessiter différents paramètres de configuration où <param x> représentent les paramètres potentiels requis pour un segmenteur spécifique. Des valeurs de configuration peuvent également être transmises aux analyseurs à l’aide du même format.
    "chunker": {
        "name": <chunker-name>,
        "config": {
            "<param 1>": "...",
            "<param 2>": "...",
            ...
        }
    }

Implémentation d’un analyseur/segmenteur personnalisé

Ce projet est structuré afin de faciliter l’ajout d’analyseurs ou segmenteurs personnalisés au pipeline de préparation des données.

Ajouter un nouvel analyseur

Supposez que vous souhaitez incorporer un nouvel analyseur à l’aide de la bibliothèque PyMuPDF pour transformer le texte analysé au format Markdown. Effectuez les étapes suivantes :

  1. Installez les dépendances requises en ajoutant le code suivant au notebook parser_library dans le répertoire single_fix ou multiple_fix :

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. Dans le notebook parser_library dans le répertoire single_fix ou multiple_fix, ajoutez une nouvelle section pour l’analyseur PyMuPdfMarkdown et implémentez la fonction d’analyse. Vérifiez que la sortie de la fonction est conforme à la classe ParserReturnValue définie au début du notebook. Cela garantit la compatibilité avec les fonctions Spark définies par l’utilisateur. Le bloc try ou except empêche que Spark fasse échouer l’intégralité du travail d’analyse à cause d’erreurs dans des documents individuels lors de l’application de l’analyseur en tant que fonction définie par l’utilisateur dans le notebook 02_parse_docs dans le répertoire single_fix ou multiple_fix. Ce notebook vérifie si l’analyse a échoué pour un document, met en quarantaine les lignes correspondantes, et déclenche un avertissement.

    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    
  3. Ajoutez votre nouvelle fonction d’analyse au parser_factory dans le notebook parser_library dans le répertoire single_fix ou multiple_fix afin de la rendre configurable dans le pipeline_config du notebook 00_config.

  4. Dans le notebook 02_parse_docs, les fonctions d’analyseur sont transformées en fonctions Python Spark définies par l’utilisateur (optimisées pour Arrow pour Databricks Runtime 14.0 ou versions ultérieures) et appliquées au dataframe contenant les nouveaux fichiers PDF binaires. Pour les tests et le développement, ajoutez au notebook parser_library une fonction de test simple qui charge le fichier test-document.pdf et confirme la réussite de l’analyse :

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
    

Ajouter un nouveau segmenteur

Le processus d’ajout d’un nouveau segmenteur suit des étapes similaires à celles expliquées ci-dessus pour un nouvel analyseur.

  1. Ajoutez les dépendances requises dans le notebook chunker_library.
  2. Ajoutez une nouvelle section pour votre segmenteur et implémentez une fonction, par exemple, chunk_parsed_content_newchunkername. La sortie de la nouvelle fonction de segmenteur doit être un dictionnaire Python conforme à la classe ChunkerReturnValue définie au début du notebook chunker_library. La fonction doit accepter au moins une chaîne du texte analysé à segmenter. Si votre segmenteur nécessite des paramètres supplémentaires, vous pouvez les ajouter en tant que paramètres de fonction.
  3. Ajoutez votre nouveau segmenteur à la fonction chunker_factory définie dans le notebook chunker_library. Si votre fonction accepte des paramètres supplémentaires, utilisez partial de functools pour les préconfigurer. Ceci est nécessaire, car les fonctions définies par l’utilisateur n’acceptent qu’un seul paramètre d’entrée, qui sera le texte analysé dans notre cas. chunker_factory vous permet de configurer différentes méthodes de segmenteur dans pipeline_config, et retourne une fonction Python Spark définie par l’utilisateur (optimisée pour Databricks Runtime 14.0 et versions ultérieures).
  4. Ajoutez une section de test simple pour votre nouvelle fonction de segmentation. Cette section doit segmenter un texte prédéfini fourni sous forme de chaîne.

Réglage des performances

Spark utilise des partitions pour paralléliser le traitement. Les données sont divisées en segments de lignes, et chaque partition est traitée par un seul cœur par défaut. Toutefois, lorsque les données sont initialement lues par Apache Spark, celui-ci peut ne pas créer de partitions optimisées pour le calcul souhaité, en particulier pour nos fonctions définies par l’utilisateur effectuant des tâches d’analyse et de segmentation. Il est essentiel de trouver un équilibre entre la création de partitions suffisamment petites pour une parallélisation efficace, mais pas si petite que la surcharge de gestion des partitions annule les avantages.

Vous pouvez ajuster le nombre de partitions à l’aide de df.repartitions(<number of partitions>). Lors de l’application de fonctions définies par l’utilisateur, visez un multiple du nombre de cœurs disponibles sur les nœuds Worker. Par exemple, dans le notebook 02_parse_docs, vous pouvez inclure df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism) afin de créer deux fois plus de partitions que le nombre de cœurs Worker disponibles. En règle générale, un multiple compris entre un et trois devrait produire des performances satisfaisantes.

Exécution manuelle du pipeline

Vous pouvez également exécuter chaque notebook étape par étape :

  1. Chargez les fichiers bruts à l’aide du notebook 01_load_files. Cela enregistre chaque fichier binaire de document sous la forme d’un enregistrement dans une table bronze (raw_files_table_name) définie dans destination_tables_config. Les fichiers sont chargés de manière incrémentielle, ne traitant que les nouveaux documents depuis la dernière exécution.
  2. Analysez les documents avec le notebook 02_parse_docs. Ce notebook exécute le notebook parser_library (veillez à l’exécuter en tant que première cellule pour redémarrer Python), ce qui rend différents analyseurs et utilitaires associés disponibles. Il utilise ensuite l’analyseur spécifié dans pipeline_config pour analyser chaque document en texte brut. Par exemple, les métadonnées pertinentes telles que le nombre de pages du fichier PDF d’origine sont capturées en même temps que le texte analysé. Les documents analysés avec succès sont stockés dans une table argent (parsed_docs_table_name), tandis que les documents non analysés sont mis en quarantaine dans une table correspondante.
  3. Segmentez les documents analysés à l’aide du notebook 03_chunk_docs. Comme pour l’analyse, ce notebook exécute le note chunker_library (là encore, exécutez-le en tant que première cellule). Il fractionne chaque document analysé en segments plus petits à l’aide du segmenteur spécifié dans pipeline_config. Un ID unique est affecté à chaque segment à l’aide d’un hachage MD5, nécessaire pour la synchronisation avec l’index de recherche vectorielle. Les segments finaux sont chargés dans une table or (chunked_docs_table_name).
  4. Créez/synchronisez l’index de recherche vectorielle avec le 04_vector_index. Ce notebook vérifie la préparation du point de terminaison de recherche vectorielle spécifié dans vectorsearch_config. Si l’index configuré existe déjà, il lance la synchronisation avec la table or ; sinon, il crée l’index et déclenche la synchronisation. Cela doit prendre un certain temps si le point de terminaison de recherche vectorielle et l’index n’ont pas encore été créés.

Étape suivante

Passez à l’Étape 7. Déployer et surveiller.