Étape 6 (pipelines). Implémenter des correctifs de pipeline de données
Effectuez ces étapes pour modifier votre pipeline de données et l’exécuter afin de :
- Créer un index vectoriel.
- Créer une exécution MLflow avec les métadonnées du pipeline de données.
L’exécution MLflow résultante est référencée par le notebook B_quality_iteration/02_evaluate_fixes
.
Il existe deux approches pour modifier le pipeline de données :
- Implémenter un correctif unique à la fois Avec cette approche, vous configurez et exécutez un seul pipeline de données à la fois. Ce mode est préférable si vous souhaitez essayer un seul modèle d’incorporations et tester un seul nouvel analyseur. Databricks suggère de commencer ici pour vous familiariser avec ces notebooks.
- Implémenter plusieurs correctifs à la fois Avec cette approche, également appelée balayage, vous exécutez en parallèle plusieurs pipelines de données qui ont chacun une configuration différente. Ce mode est préférable si vous souhaitez « balayer » plusieurs stratégies différentes, par exemple, évaluer trois analyseurs PDF ou évaluer de nombreuses tailles de bloc différentes.
Consultez le dépôt GitHub pour obtenir l’exemple de code de cette section.
Approche 1 : Implémenter un correctif unique à la fois
- Ouvrez le notebook B_quality_iteration/data_pipeline_fixes/single_fix/00_config.
- Suivez l’une des instructions ci-dessous :
- Suivez les instructions pour implémenter une nouvelle configuration fournie par ce livre de recettes.
- Suivez les étapes pour implémenter du code personnalisé pour une analyse ou une segmentation.
- Exécutez le pipeline en effectuant l’une des opérations suivantes :
- Ouverture et exécution du notebook 00_Run_Entire_Pipeline.
- Exécution des étapes pour exécuter chaque étape du pipeline manuellement.
- Ajoutez le nom de l’exécution MLflow résultante qui est générée à la variable
DATA_PIPELINE_FIXES_RUN_NAMES
dans le notebook B_quality_iteration/02_evaluate_fixes.
Remarque
Le pipeline de préparation des données utilise Spark Structured Streaming pour charger et traiter de manière incrémentielle les fichiers. Cela implique que les fichiers déjà chargés et préparés sont suivis dans des points de contrôle et ne seront pas traités à nouveau. Seuls les fichiers nouvellement ajoutés seront chargés, préparés et ajoutés aux tables correspondantes.
Par conséquent, si vous souhaitez réexécuter l’intégralité du pipeline à partir de zéro et retraiter tous les documents, vous devez supprimer les points de contrôle et les tables. Pour ce faire, utilisez le notebook reset_tables_and_checkpoints.
Approche 2 : Implémenter plusieurs correctifs à la fois
- Ouvrez le notebook B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines.
- Suivez les instructions du notebook pour ajouter plusieurs configurations du pipeline de données à exécuter.
- Exécutez le notebook pour exécuter ces pipelines.
- Ajoutez le nom des exécutions MLflow résultantes qui sont générées à la variable
DATA_PIPELINE_FIXES_RUN_NAMES
dans le notebook B_quality_iteration/02_evaluate_fixes.
Annexe
Remarque
Vous trouverez les notebooks référencés ci-dessous dans les répertoires single_fix et multiple_fixes, selon que vous implémentez un correctif unique ou plusieurs correctifs à la fois.
Présentation approfondie des paramètres de configuration
Les différentes options de configuration pré-implémentées pour le pipeline de données sont répertoriées ci-dessous. En guise d’alternative, vous pouvez implémenter un analyseur/segmenteur personnalisé.
vectorsearch_config
: spécifiez le point de terminaison de recherche vectorielle (il doit être opérationnel) et le nom de l’index à créer. Définissez également le type de synchronisation entre la table source et l’index (la valeur par défaut estTRIGGERED
).embedding_config
: spécifiez le modèle d’incorporations à utiliser, ainsi que le générateur de jetons. Pour obtenir la liste complète des options, consultez le notebooksupporting_configs/embedding_models
. Le modèle d’incorporations doit être déployé sur un point de terminaison de service de modèle en cours d’exécution. En fonction de la stratégie de segmentation, le générateur de jetons est également utilisé lors du fractionnement afin de s’assurer que les segments ne dépassent pas la limite du nombre de jetons du modèle d’incorporations. Les générateurs de jetons sont utilisés ici pour compter le nombre de jetons dans les segments de texte, afin de s’assurer qu’ils ne dépassent pas la longueur de contexte maximale du modèle d’incorporations sélectionné.
Voici un générateur de jetons de HuggingFace :
"embedding_tokenizer": {
"tokenizer_model_name": "BAAI/bge-large-en-v1.5",
"tokenizer_source": "hugging_face",
}
Voici un générateur de jetons de TikToken :
"embedding_tokenizer": {
"tokenizer_model_name": "text-embedding-small",
"tokenizer_source": "tiktoken",
}
pipeline_config
: définit l’analyseur de fichiers, le segmenteur et le chemin d’accès au champ source. Les analyseurs et les segmenteurs sont définis respectivement dans les notebooksparser_library
etchunker_library
. Ceux-ci se trouvent dans les répertoires single_fix et multiple_fixes. Pour obtenir la liste complète des options, consultez le notessupporting_configs/parser_chunker_strategies
, qui est à nouveau disponible dans les répertoires de correctifs uniques et multiples. Différents analyseurs ou segmenteurs peuvent nécessiter différents paramètres de configuration où<param x>
représentent les paramètres potentiels requis pour un segmenteur spécifique. Des valeurs de configuration peuvent également être transmises aux analyseurs à l’aide du même format.
"chunker": {
"name": <chunker-name>,
"config": {
"<param 1>": "...",
"<param 2>": "...",
...
}
}
Implémentation d’un analyseur/segmenteur personnalisé
Ce projet est structuré afin de faciliter l’ajout d’analyseurs ou segmenteurs personnalisés au pipeline de préparation des données.
Ajouter un nouvel analyseur
Supposez que vous souhaitez incorporer un nouvel analyseur à l’aide de la bibliothèque PyMuPDF pour transformer le texte analysé au format Markdown. Effectuez les étapes suivantes :
Installez les dépendances requises en ajoutant le code suivant au notebook
parser_library
dans le répertoiresingle_fix
oumultiple_fix
:# Dependencies for PyMuPdf %pip install pymupdf pymupdf4llm
Dans le notebook
parser_library
dans le répertoiresingle_fix
oumultiple_fix
, ajoutez une nouvelle section pour l’analyseurPyMuPdfMarkdown
et implémentez la fonction d’analyse. Vérifiez que la sortie de la fonction est conforme à la classeParserReturnValue
définie au début du notebook. Cela garantit la compatibilité avec les fonctions Spark définies par l’utilisateur. Le bloctry
ouexcept
empêche que Spark fasse échouer l’intégralité du travail d’analyse à cause d’erreurs dans des documents individuels lors de l’application de l’analyseur en tant que fonction définie par l’utilisateur dans le notebook02_parse_docs
dans le répertoiresingle_fix
oumultiple_fix
. Ce notebook vérifie si l’analyse a échoué pour un document, met en quarantaine les lignes correspondantes, et déclenche un avertissement.import fitz import pymupdf4llm def parse_bytes_pymupdfmarkdown( raw_doc_contents_bytes: bytes, ) -> ParserReturnValue: try: pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf") md_text = pymupdf4llm.to_markdown(pdf_doc) output = { "num_pages": str(pdf_doc.page_count), "parsed_content": md_text.strip(), } return { OUTPUT_FIELD_NAME: output, STATUS_FIELD_NAME: "SUCCESS", } except Exception as e: warnings.warn(f"Exception {e} has been thrown during parsing") return { OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""}, STATUS_FIELD_NAME: f"ERROR: {e}", }
Ajoutez votre nouvelle fonction d’analyse au
parser_factory
dans le notebookparser_library
dans le répertoiresingle_fix
oumultiple_fix
afin de la rendre configurable dans lepipeline_config
du notebook00_config
.Dans le notebook
02_parse_docs
, les fonctions d’analyseur sont transformées en fonctions Python Spark définies par l’utilisateur (optimisées pour Arrow pour Databricks Runtime 14.0 ou versions ultérieures) et appliquées au dataframe contenant les nouveaux fichiers PDF binaires. Pour les tests et le développement, ajoutez au notebook parser_library une fonction de test simple qui charge le fichier test-document.pdf et confirme la réussite de l’analyse :with open("./test_data/test-document.pdf", "rb") as file: file_bytes = file.read() test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes) assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
Ajouter un nouveau segmenteur
Le processus d’ajout d’un nouveau segmenteur suit des étapes similaires à celles expliquées ci-dessus pour un nouvel analyseur.
- Ajoutez les dépendances requises dans le notebook chunker_library.
- Ajoutez une nouvelle section pour votre segmenteur et implémentez une fonction, par exemple,
chunk_parsed_content_newchunkername
. La sortie de la nouvelle fonction de segmenteur doit être un dictionnaire Python conforme à la classeChunkerReturnValue
définie au début du notebook chunker_library. La fonction doit accepter au moins une chaîne du texte analysé à segmenter. Si votre segmenteur nécessite des paramètres supplémentaires, vous pouvez les ajouter en tant que paramètres de fonction. - Ajoutez votre nouveau segmenteur à la fonction
chunker_factory
définie dans le notebook chunker_library. Si votre fonction accepte des paramètres supplémentaires, utilisez partial de functools pour les préconfigurer. Ceci est nécessaire, car les fonctions définies par l’utilisateur n’acceptent qu’un seul paramètre d’entrée, qui sera le texte analysé dans notre cas.chunker_factory
vous permet de configurer différentes méthodes de segmenteur dans pipeline_config, et retourne une fonction Python Spark définie par l’utilisateur (optimisée pour Databricks Runtime 14.0 et versions ultérieures). - Ajoutez une section de test simple pour votre nouvelle fonction de segmentation. Cette section doit segmenter un texte prédéfini fourni sous forme de chaîne.
Réglage des performances
Spark utilise des partitions pour paralléliser le traitement. Les données sont divisées en segments de lignes, et chaque partition est traitée par un seul cœur par défaut. Toutefois, lorsque les données sont initialement lues par Apache Spark, celui-ci peut ne pas créer de partitions optimisées pour le calcul souhaité, en particulier pour nos fonctions définies par l’utilisateur effectuant des tâches d’analyse et de segmentation. Il est essentiel de trouver un équilibre entre la création de partitions suffisamment petites pour une parallélisation efficace, mais pas si petite que la surcharge de gestion des partitions annule les avantages.
Vous pouvez ajuster le nombre de partitions à l’aide de df.repartitions(<number of partitions>)
. Lors de l’application de fonctions définies par l’utilisateur, visez un multiple du nombre de cœurs disponibles sur les nœuds Worker. Par exemple, dans le notebook 02_parse_docs, vous pouvez inclure df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)
afin de créer deux fois plus de partitions que le nombre de cœurs Worker disponibles. En règle générale, un multiple compris entre un et trois devrait produire des performances satisfaisantes.
Exécution manuelle du pipeline
Vous pouvez également exécuter chaque notebook étape par étape :
- Chargez les fichiers bruts à l’aide du notebook
01_load_files
. Cela enregistre chaque fichier binaire de document sous la forme d’un enregistrement dans une table bronze (raw_files_table_name
) définie dansdestination_tables_config
. Les fichiers sont chargés de manière incrémentielle, ne traitant que les nouveaux documents depuis la dernière exécution. - Analysez les documents avec le notebook
02_parse_docs
. Ce notebook exécute le notebookparser_library
(veillez à l’exécuter en tant que première cellule pour redémarrer Python), ce qui rend différents analyseurs et utilitaires associés disponibles. Il utilise ensuite l’analyseur spécifié danspipeline_config
pour analyser chaque document en texte brut. Par exemple, les métadonnées pertinentes telles que le nombre de pages du fichier PDF d’origine sont capturées en même temps que le texte analysé. Les documents analysés avec succès sont stockés dans une table argent (parsed_docs_table_name
), tandis que les documents non analysés sont mis en quarantaine dans une table correspondante. - Segmentez les documents analysés à l’aide du notebook
03_chunk_docs
. Comme pour l’analyse, ce notebook exécute le notechunker_library
(là encore, exécutez-le en tant que première cellule). Il fractionne chaque document analysé en segments plus petits à l’aide du segmenteur spécifié danspipeline_config
. Un ID unique est affecté à chaque segment à l’aide d’un hachage MD5, nécessaire pour la synchronisation avec l’index de recherche vectorielle. Les segments finaux sont chargés dans une table or (chunked_docs_table_name
). - Créez/synchronisez l’index de recherche vectorielle avec le
04_vector_index
. Ce notebook vérifie la préparation du point de terminaison de recherche vectorielle spécifié dansvectorsearch_config
. Si l’index configuré existe déjà, il lance la synchronisation avec la table or ; sinon, il crée l’index et déclenche la synchronisation. Cela doit prendre un certain temps si le point de terminaison de recherche vectorielle et l’index n’ont pas encore été créés.
Étape suivante
Passez à l’Étape 7. Déployer et surveiller.