L’analytique en temps réel peut vous aider à prendre des décisions rapides et à effectuer des actions automatisées en fonction des insights actuels. Il peut également vous aider à offrir des expériences client améliorées. Cette solution décris comment conserver les pools de données Azure Synapse Analytics synchronisés avec les modifications de données opérationnelles dans MongoDB.
Architecture
Le diagramme suivant montre comment implémenter la synchronisation en temps réel Atlas vers Azure Synapse Analytics. Ce flux simple garantit que toutes les modifications qui se produisent dans la collection Atlas MongoDB sont répliquées dans le référentiel Azure Data Lake Storage par défaut dans l’espace de travail Azure Synapse Analytics. Une fois que les données se trouvent dans Data Lake Storage, vous pouvez utiliser des pipelines Azure Synapse Analytics pour envoyer (push) les données vers des pools SQL dédiés, des pools Spark ou d’autres solutions, en fonction de vos besoins d’analyse.
Téléchargez un fichier PowerPoint de cette architecture.
Dataflow
Les modifications en temps réel dans le magasin de données opérationnel MongoDB Atlas (ODS) sont capturées et mises à la disposition de Data Lake Storage dans un espace de travail Azure Synapse Analytics pour les cas d’usage d’analytique en temps réel, les rapports en direct et les tableaux de bord.
Les modifications de données dans le magasin de données opérationnel/transactionnel MongoDB Atlas sont capturées par les déclencheurs Atlas.
Lorsqu’un déclencheur de base de données Atlas observe un événement, il transmet le type de modification et le document modifié (complet ou delta) à une fonction Atlas.
La fonction Atlas déclenche une fonction Azure, en passant l’événement de modification et un document JSON.
Azure Functions utilise la bibliothèque cliente Azure Storage Files Data Lake pour écrire le document modifié dans data Lake Storage configuré dans l’espace de travail Azure Synapse Analytics.
Une fois que les données se trouvent dans Data Lake Storage, elles peuvent être envoyées à des pools SQL dédiés, des pools Spark et d’autres solutions. Vous pouvez également convertir les données de JSON en formats Parquet ou Delta à l’aide de flux de données Azure Synapse Analytics ou de pipelines de copie pour exécuter des rapports BI supplémentaires ou IA/machine learning sur les données actuelles.
Composants
- Flux de modification MongoDB Atlas vous permettent d’informer les applications des modifications apportées à un cluster de collecte, de base de données ou de déploiement. Les flux de modification donnent aux applications l’accès aux modifications de données en temps réel et leur permettent de réagir immédiatement aux modifications. Cette fonctionnalité est essentielle dans les cas d’usage tels que le suivi des événements IoT et les modifications des données financières, où les alarmes doivent être déclenchées et les actions réactives doivent être effectuées immédiatement. Les déclencheurs Atlas utilisent des flux de modification pour surveiller les regroupements pour les modifications et appeler automatiquement la fonction Atlas associée en réponse à l’événement de déclencheur.
- Déclencheurs Atlas répondre aux insertions, mises à jour et suppressions de documents dans une collection spécifique et peut appeler automatiquement une fonction Atlas en réponse à l’événement de modification.
- Fonctions Atlas sont des implémentations de code JavaScript côté serveur serverless qui peuvent effectuer des actions basées sur les événements qui appellent un déclencheur Atlas. La combinaison de déclencheurs Atlas avec les fonctions Atlas simplifie l’implémentation d’architectures pilotées par les événements.
- Fonctions Azure est une plateforme de calcul serverless basée sur les événements que vous pouvez utiliser pour développer efficacement des applications avec le langage de programmation de votre choix. Vous pouvez également l’utiliser pour vous connecter en toute transparence avec d’autres services Azure. Dans ce scénario, une fonction Azure capture un événement de modification et l’utilise pour écrire un objet blob contenant les données modifiées dans Data Lake Storage à l’aide de la bibliothèque cliente Azure Storage Files Data Lake.
- Data Lake Storage est la solution de stockage par défaut dans Azure Synapse Analytics. Vous pouvez utiliser des pools serverless pour interroger les données directement.
- Pipelines et flux de données dans Azure Synapse Analytics peut être utilisé pour envoyer (push) l’objet blob qui contient les données modifiées MongoDB vers des pools SQL dédiés ou des pools Spark pour une analyse plus approfondie. Les pipelines vous permettent d’agir sur les jeux de données modifiés dans Data Lake Storage à l’aide des déclencheurs d’événements de stockage et déclencheurs planifiés pour créer des solutions pour les cas d’utilisation en temps réel et en quasi temps réel. Cette intégration accélère la consommation en aval des jeux de données de modification.
Autres solutions
Cette solution utilise des déclencheurs Atlas pour encapsuler le code pour écouter les flux de modification Atlas et déclencher Azure Functions en réponse à l’événement de modification. Il est donc beaucoup plus facile d’implémenter que le solution alternative précédemment fournie. Pour cette solution, vous devez écrire du code pour écouter les flux de changement dans une application Web Azure App Service.
Une autre alternative consiste à utiliser le connecteur Spark MongoDB pour lire les données de flux MongoDB et les écrire dans des tables Delta. Le code est exécuté en continu dans un notebook Spark qui fait partie d’un pipeline dans Azure Synapse Analytics. Pour plus d’informations sur l’implémentation de cette solution, consultez Synchroniser de Atlas à Azure Synapse Analytics en utilisant la diffusion Spark.
Toutefois, l’utilisation de déclencheurs Atlas avec Azure Functions fournit une solution entièrement serverless. Étant donné qu’elle n’est pas serverless, la solution offre une scalabilité robuste et une optimisation des coûts. La tarification est basée sur un modèle de coût de paiement à l’utilisation. Vous pouvez économiser davantage d’argent à l’aide de la fonction Atlas pour combiner quelques événements de modification avant d’appeler le point de terminaison Azure Functions. Cette stratégie peut être utile dans les scénarios de trafic lourd.
En outre, Microsoft Fabric unifie votre patrimoine de données et facilite l’exécution de l’analytique et de l’IA sur les données, afin d’obtenir rapidement des insights. L’ingénierie des données Azure Synapse Analytics, la science des données, l’entreposage de données et l’analytique en temps réel dans Fabric peuvent désormais mieux utiliser les données MongoDB envoyées à OneLake. Vous pouvez utiliser les connecteurs Dataflow Gen2 et de pipeline de données pour Atlas pour charger des données Atlas directement dans OneLake. Ce mécanisme sans code offre un moyen puissant d’ingérer des données d’Atlas vers OneLake.
Dans Fabric, vous pouvez référencer directement les données qui sont poussées vers Data Lake Storage en utilisant les raccourcis OneLake, sans aucune opération d'extraction, de transformation et de chargement (ETL).
Vous pouvez envoyer (push) les données à Power BI pour créer des rapports et des visualisations pour la création de rapports BI.
Détails du scénario
MongoDB Atlas, la couche de données opérationnelles de beaucoup d’application d'entreprise, stocke les données provenant d’applications internes, de services orientés client et d’API tierces à partir de plusieurs canaux. Vous pouvez utiliser les pipelines de données dans Azure Synapse Analytics pour combiner ces données avec des données relationnelles d’autres applications traditionnelles et avec des données non structurées à partir de sources telles que des journaux, les stockages d’objet ainsi que les clickstreams.
Les entreprises utilisent des fonctionnalités MongoDB telles que des Agrégations, nœuds analytiques, Atlas Search, Vector Search, Atlas Data Lake, Atlas SQL Interface, Data Federation et Graphiques pour activer l’intelligence pilotée par l’application. Toutefois, les données transactionnelles dans MongoDB sont extraites, transformées et chargées dans des pools SQL dédiés Azure Synapse Analytics ou des pools Spark pour les pools BATCH, IA / Machine Learning et l’analytique et l’intelligence de l’entrepôt de données.
Il existe deux scénarios pour le déplacement des données entre Atlas et Azure Synapse Analytics : l’intégration par lots et la synchronisation en temps réel.
Intégration des lots
Vous pouvez utiliser l’intégration par lots et micro-lots pour déplacer des données d’Atlas vers Data Lake Storage dans Azure Synapse Analytics. Vous pouvez extraire l’intégralité des données historiques à la fois ou extraire des données incrémentielles en fonction des critères de filtre.
Des instances locales MongoDB et MongoDB Atlas peuvent être intégrés en tant que ressource source ou puits dans Azure Synapse Analytics. Pour plus d’informations sur les connecteurs, consultez Copier des données depuis ou vers MongoDB ou Copier des données depuis ou vers MongoDB Atlas.
Le connecteur source offre un moyen pratique d’exécuter Azure Synapse Analytics sur des données opérationnelles stockées localement dans MongoDB ou Atlas. Vous pouvez extraire des données d’Atlas à l’aide du connecteur source et charger les données dans Data Lake Storage dans Parquet, Avro, JSON et formats de texte ou en tant que stockage d’objets blob CSV. Ces fichiers peuvent ensuite être transformés ou joints à d'autres fichiers provenant d'autres sources de données dans des scénarios multi-bases de données, multi-clouds ou hybrides. Ce cas d’usage est courant dans les scénarios d’entrepôt de données d’entreprise (EDW) et d’analytique à grande échelle. Vous pouvez également utiliser le connecteur récepteur pour stocker les résultats de l’analytique dans Atlas. Pour plus d’informations sur l’intégration par lots, consultez Analyser les données opérationnelles sur MongoDB Atlas à l’aide d’Azure Synapse Analytics.
Synchronisation en temps réel
L’architecture décrite dans cet article peut vous aider à implémenter la synchronisation en temps réel pour maintenir votre stockage Azure Synapse Analytics actuel avec les données opérationnelles de MongoDB.
Cette solution se compose de deux fonctions principales :
- Capture des modifications dans Atlas
- Déclencher la fonction Azure pour propager les modifications apportées à Azure Synapse Analytics
Capturer les modifications dans Atlas
Vous pouvez capturer les modifications à l’aide d’un déclencheur Atlas, que vous pouvez configurer dans l’interface utilisateur Ajouter un déclencheur ou à l’aide de l’API d’administration Atlas App Services. Les déclencheurs écoutent les modifications de base de données provoquées par les événements de base de données tels que les insertions, les mises à jour et les suppressions. Les déclencheurs Atlas déclenchent également une fonction Atlas lorsqu’un événement de modification est détecté. Vous pouvez utiliser l’interface utilisateur Ajouter un déclencheur pour ajouter la fonction. Vous pouvez également créer une fonction Atlas et l’associer en tant que point de terminaison d’appel de déclencheur à l’aide de l’API d’administration Atlas.
La capture d’écran suivante montre le formulaire que vous pouvez utiliser pour créer et modifier un déclencheur Atlas. Dans la section Détails de la source du déclencheur , vous spécifiez la collection que le déclencheur surveille les événements de modification et les événements de base de données qu’il surveille (insertion, mise à jour, suppression et/ou remplacement).
Le déclencheur peut appeler une fonction Atlas en réponse à l’événement pour lequel il est activé. La capture d’écran suivante montre le code JavaScript simple, ajouté en tant que fonction Atlas, à appeler en réponse au déclencheur de base de données. La fonction Atlas appelle une fonction Azure, en lui transmettant les métadonnées de l’événement de modification avec le document qui a été inséré, mis à jour, supprimé ou remplacé, en fonction de ce que le déclencheur est activé.
Code de fonction Atlas
Le code de la fonction Atlas déclenche la fonction Azure associée au point de terminaison de fonction Azure en transmettant l’intégralité des changeEvent
dans le corps de la requête à la fonction Azure.
Vous devez remplacer l’espace réservé <Azure function URL endpoint>
par le point de terminaison d’URL de fonction Azure réel.
exports = function(changeEvent) {
// Invoke Azure function that inserts the change stream into Data Lake Storage.
console.log(typeof fullDocument);
const response = context.http.post({
url: "<Azure function URL endpoint>",
body: changeEvent,
encodeBodyAsJSON: true
});
return response;
};
Déclencher la fonction Azure pour propager les modifications apportées à Azure Synapse Analytics
La fonction Atlas est codée pour appeler une fonction Azure qui écrit le document de modification dans Data Lake Storage dans Azure Synapse Analytics. La fonction Azure utilise la bibliothèque cliente Azure Data Lake Storage pour Python SDK pour créer une instance de la classe DataLakeServiceClient
qui représente votre compte de stockage.
La fonction Azure utilise une clé de stockage pour l’authentification. Vous pouvez également utiliser des implémentations OAuth d’ID Microsoft Entra. Les storage_account_key
et autres attributs liés à Dake Lake Storage sont récupérés à partir des variables d’environnement du système d’exploitation configurées. Une fois le corps de la requête décodé, le fullDocument
(le document inséré ou mis à jour) est analysé à partir du corps de la requête, puis écrit dans Data Lake Storage par les fonctions clientes Data Lake append_data
et flush_data
.
Pour une opération de suppression, fullDocumentBeforeChange
est utilisé au lieu de fullDocument
.
fullDocument
n’a aucune valeur dans une opération de suppression. Par conséquent, le code extrait le document qui a été supprimé, qui est capturé dans fullDocumentBeforeChange
. Notez que fullDocumentBeforeChange
est renseigné uniquement lorsque le paramètre Pré-image du document est défini sur activé, comme illustré dans la capture d’écran précédente.
import json
import logging
import os
import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a new request.')
logging.info(req)
storage_account_name = os.environ["storage_account_name"]
storage_account_key = os.environ["storage_account_key"]
storage_container = os.environ["storage_container"]
storage_directory = os.environ["storage_directory"]
storage_file_name = os.environ["storage_file_name"]
service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
"https", storage_account_name), credential=storage_account_key)
json_data = req.get_body()
logging.info(json_data)
object_id = "test"
try:
json_string = json_data.decode("utf-8")
json_object = json.loads(json_string)
if json_object["operationType"] == "delete":
object_id = json_object["fullDocumentBeforeChange"]["_id"]["$oid"]
data = {"operationType": json_object["operationType"], "data":json_object["fullDocumentBeforeChange"]}
else:
object_id = json_object["fullDocument"]["_id"]["$oid"]
data = {"operationType": json_object["operationType"], "data":json_object["fullDocument"]}
logging.info(object_id)
encoded_data = json.dumps(data)
except Exception as e:
logging.info("Exception occurred : "+ str(e))
file_system_client = service_client.get_file_system_client(file_system=storage_container)
directory_client = file_system_client.get_directory_client(storage_directory)
file_client = directory_client.create_file(storage_file_name + "-" + str(object_id) + ".txt")
file_client.append_data(data=encoded_data, offset=0, length=len(encoded_data))
file_client.flush_data(len(encoded_data))
return func.HttpResponse(f"This HTTP triggered function executed successfully.")
Jusqu’à présent, vous avez vu comment le déclencheur Atlas capture toutes les modifications qui se produisent et les transmet à une fonction Azure via une fonction Atlas, et que la fonction Azure écrit le document de modification en tant que nouveau fichier dans Data Lake Storage dans l’espace de travail Azure Synapse Analytics.
Une fois le fichier ajouté à Data Lake Storage, vous pouvez configurer un déclencheur d’événement de stockage pour déclencher un pipeline qui peut ensuite écrire le document de modification dans un pool SQL dédié ou dans une table de pool Spark. Le pipeline peut utiliser l’activité de copie et transformer les données à l’aide d’un flux de données. Sinon, si votre cible finale est un pool SQL dédié, vous pouvez modifier la fonction Azure pour écrire directement dans le pool SQL dédié dans Azure Synapse Analytics. Pour un pool SQL, obtenez la chaîne de connexion odbc pour la connexion du pool SQL. Consultez Utiliser Python pour interroger une base de données pour obtenir un exemple de code Python que vous pouvez utiliser pour interroger la table du pool SQL à l’aide de la chaîne de connexion. Vous pouvez modifier ce code pour utiliser une requête Insert pour écrire dans un pool SQL dédié. Il existe des paramètres de configuration et des rôles qui doivent être attribués pour permettre à la fonction d’écrire dans un pool SQL dédié. Les informations relatives à ces paramètres et rôles sont en dehors de l’étendue de cet article.
Si vous souhaitez une solution quasi en temps réel et que vous n’avez pas besoin des données à synchroniser en temps réel, l’utilisation des exécutions de pipeline planifiées peut être une bonne option. Vous pouvez configurer des déclencheurs planifiés pour déclencher un pipeline avec l’activité de copie ou un flux de données, à une fréquence proche en temps réel que votre entreprise peut se permettre, pour utiliser le connecteur MongoDB pour extraire les données de MongoDB qui ont été insérées, mises à jour ou supprimées entre la dernière exécution planifiée et l’exécution actuelle. Le pipeline utilise le connecteur MongoDB comme connecteur source pour extraire les données delta à partir de MongoDB Atlas et les envoyer (push) vers les pools SQL dédiés Data Lake Storage ou Azure Synapse Analytics, à l’aide de ces connexions en tant que connexions récepteurs. Cette solution utilise un mécanisme d’extraction (par opposition à la solution principale décrite dans cet article, qui est un mécanisme push) de MongoDB Atlas, car les modifications se produisent dans la collection Atlas MongoDB à laquelle le déclencheur Atlas écoute.
Cas d’usage potentiels
MongoDB et azure Synapse Analytics EDW et les services analytiques peuvent servir de nombreux cas d’usage :
Retail
- Création d’une intelligence dans le regroupement de produits et la promotion des produits
- Implémentation du client 360 et de l’hyper-personnalisation
- Prédiction de l’épuisement des stocks et optimisation des commandes de chaîne d’approvisionnement
- Implémentation d’un prix de remise dynamique et d’une recherche intelligente dans le commerce électronique
Banque et finances
- Personnalisation des services financiers clients
- Détection et blockage de transactions frauduleuses
Télécommunications
- Optimisation des réseaux de nouvelle génération
- Optimisation de la valeur des réseaux de périphérie
Automobile
- Optimisation du paramétrage des véhicules connectés
- Détection d’anomalies dans la communication IoT dans les véhicules connectés
Industrie
- Fourniture d’une maintenance prédictive pour les machines
- Optimisation de la gestion du stockage et de l’inventaire
À propos de l’installation
Ces considérations implémentent les piliers d’Azure Well-Architected Framework, un ensemble de principes directeurs que vous pouvez utiliser pour améliorer la qualité d’une charge de travail. Pour plus d'informations, consultez Microsoft Azure Well-Architected Framework.
Sécurité
La sécurité fournit des garanties contre les attaques délibérées, et contre l’utilisation abusive de vos données et systèmes importants. Pour plus d’informations, consultez Vue d’ensemble du pilier Sécurité.
Azure Functions est un service managé serverless, de sorte que les ressources d’application et les composants de plateforme sont protégés par une sécurité renforcée. Toutefois, nous vous recommandons d’utiliser le protocole HTTPS et les dernières versions TLS. Il est également recommandé de valider l’entrée pour vous assurer qu’il s’agit d’un document de modification MongoDB. Consultez Sécurisation des Fonctions Azure pour connaître les considérations de sécurité relatives à Azure Functions.
MongoDB Atlas est une base de données managée en tant que service. MongoDB offre donc une sécurité de plateforme améliorée. MongoDB fournit plusieurs mécanismes pour garantir une sécurité de 360 degrés pour les données stockées, notamment l’accès à la base de données, la sécurité réseau, le chiffrement au repos et en transit et la souveraineté des données. Consultez Sécurité MongoDB Atlas pour le livre blanc de sécurité MongoDB Atlas et d’autres articles qui peuvent vous aider à vous assurer que les données dans MongoDB sont sécurisées tout au long du cycle de vie des données.
Optimisation des coûts
L’optimisation des coûts consiste à réduire les dépenses inutiles et à améliorer l’efficacité opérationnelle. Pour plus d’informations, consultez Vue d’ensemble du pilier d’optimisation des coûts.
Pour estimer le coût des produits et configurations Azure, consultez la calculatrice de prix Azure. Azure vous permet d’éviter les frais inutiles en déterminant le nombre correct de ressources à utiliser, en analysant les dépenses au fil du temps et en effectuant une mise à l’échelle pour répondre aux besoins de l’entreprise sans dépasser le budget alloué. Les fonctions Azure Functions entraînent des coûts uniquement lorsqu’elles sont appelées. Toutefois, selon le volume des modifications dans MongoDB Atlas, vous pouvez évaluer à l’aide d’un mécanisme de traitement par lots dans la fonction Atlas pour stocker les modifications dans une autre collection temporaire et déclencher la fonction Azure uniquement si le lot dépasse une certaine limite.
Pour plus d’informations sur les clusters Atlas, consultez 5 façons de réduire les coûts avec MongoDB Atlas et coûts de configuration de cluster. La page de tarification MongoDB peut vous aider à comprendre les options de tarification des clusters MongoDB Atlas et d’autres offres de la plateforme de données des développeurs MongoDB Atlas. Atlas Data Federation peut être déployé dans Azure et prend en charge Stockage Blob Azure (en préversion). Si vous envisagez d’utiliser le traitement par lots pour optimiser les coûts, envisagez d’écrire dans le stockage Blob au lieu d’une collection temporaire MongoDB.
Efficacité des performances
L’efficacité des performances est la capacité de votre charge de travail à s’adapter à la demande des utilisateurs de façon efficace. Pour plus d’informations, consultez Vue d’ensemble du pilier d’efficacité des performances.
Les déclencheurs Atlas et les fonctions Azure Functions sont testés dans le temps pour les performances et la scalabilité. Consultez Performances et mise à l’échelle dans Durable Functions (Azure Functions) pour comprendre les considérations relatives aux performances et à l’évolutivité pour Azure Functions. Consultez Mise à l’échelle à la demande pour en savoir plus sur l’amélioration des performances de vos instances MongoDB Atlas. Consultez Guide des meilleures pratiques pour les performances MongoDB pour connaître les meilleures pratiques pour la configuration d’Atlas MongoDB.
Conclusion
MongoDB Atlas s’intègre parfaitement à Azure Synapse Analytics, ce qui permet aux clients Atlas d’utiliser facilement Atlas comme source ou récepteur pour Azure Synapse Analytics. Cette solution vous permet d’utiliser des données opérationnelles MongoDB en temps réel à partir d’Azure Synapse Analytics pour l’analytique complexe et l’inférence IA.
Déployer ce scénario
Synchronisation en temps réel de MongoDB Atlas à Azure Synapse Analytics
Contributeurs
Cet article est géré par Microsoft. Il a été écrit à l’origine par les contributeurs suivants.
Principaux auteurs :
- Diana Annie Jenosh | Architecte de solutions senior - Équipe partenaires MongoDB
- Venkatesh Shanbag | Architecte de solutions senior - Équipe partenaires MongoDB
Autres contributeurs :
- Sunil Sabat | Responsable principal du programme - Équipe ADF
- Wee Hyong Tok | Directeur principal PM - Équipe ADF
Pour afficher les profils LinkedIn non publics, connectez-vous à LinkedIn.