Copiare dati da o in MongoDB usando Azure Data Factory o Synapse Analytics
SI APPLICA A: Azure Data Factory Azure Synapse Analytics
Suggerimento
Provare Data Factory in Microsoft Fabric, una soluzione di analisi all-in-one per le aziende. Microsoft Fabric copre tutto, dallo spostamento dati al data science, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Vedere le informazioni su come iniziare una nuova prova gratuita!
Questo articolo illustra come usare l'attività Copy nelle pipeline di Synapse Analytics di Azure Data Factory per copiare dati da e in un database MongoDB. Si basa sull'articolo di panoramica dell'attività di copia che presenta una panoramica generale sull'attività di copia.
Importante
Il nuovo connettore MongoDB offre un supporto mongoDB nativo migliorato. Se si usa il connettore MongoDB legacy nella soluzione, supportato così come è solo per la compatibilità con le versioni precedenti, vedere l’articolo Connettore MongoDB (legacy).
Funzionalità supportate
Questo connettore MongoDB è supportato per le funzionalità seguenti:
Funzionalità supportate | IR |
---|---|
Attività di copia (origine/sink) | 7.3 |
① Azure Integration Runtime ② Runtime di integrazione self-hosted
Per un elenco degli archivi dati supportati come origini o sink, vedere la tabella Archivi dati supportati.
In particolare, questo connettore MongoDB supporta le versioni fino alla 4.2. Se il lavoro richiede versioni più recenti della 4.2, considerare l'uso di MongoDB Atlas con il connettore MongoDB Atlas, che offre funzionalità e supporto più completi.
Prerequisiti
Se l'archivio dati si trova all'interno di una rete locale, una rete virtuale di Azure o un cloud privato virtuale di Amazon, è necessario configurare un runtime di integrazione self-hosted per connettersi.
Se l'archivio dati è un servizio dati del cloud gestito, è possibile usare Azure Integration Runtime. Se l'accesso è limitato solo agli indirizzi IP approvati nelle regole del firewall, è possibile aggiungere IP di Azure Integration Runtime nell'elenco Consentiti.
È anche possibile usare la funzionalitàruntime di integrazione della rete virtuale gestita in Azure Data Factory per accedere alla rete locale senza installare e configurare un runtime di integrazione self-hosted.
Per altre informazioni sui meccanismi di sicurezza di rete e sulle opzioni supportate da Data Factory, vedere strategie di accesso ai dati.
Introduzione
Per eseguire l'attività di copia con una pipeline, è possibile usare uno degli strumenti o SDK seguenti:
- Strumento Copia dati
- Il portale di Azure
- .NET SDK
- SDK di Python
- Azure PowerShell
- API REST
- Modello di Azure Resource Manager
Creare un servizio collegato a MongoDB usando l'interfaccia utente
Usare la procedura seguente per creare un servizio collegato a MongoDB nell'interfaccia utente del portale di Azure.
Passare alla scheda Gestisci nell'area di lavoro di Azure Data Factory o Synapse e selezionare Servizi collegati, quindi fare clic su Nuovo:
Cercare MongoDB e selezionare il connettore MongoDB.
Configurare i dettagli del servizio, testare la connessione e creare il nuovo servizio collegato.
Dettagli di configurazione del connettore
Le sezioni seguenti riportano informazioni dettagliate sulle proprietà che vengono usate per definire entità di data factory specifiche per il connettore MongoDB.
Proprietà del servizio collegato
Per il servizio collegato di MongoDB sono supportate le proprietà seguenti:
Proprietà | Descrizione | Richiesto |
---|---|---|
type | La proprietà tipo deve essere impostata su: MongoDbV2 | Sì |
connectionString | Specificare la stringa di connessione di MongoDB, ad esempio mongodb://[username:password@]host[:port][/[database][?options]] . Per altri dettagli, vedere la sezione sulla stringa di connessione nel manuale di MongoDB. È anche possibile inserire una stringa di connessione in Azure Key Vault. Per altri dettagli, vedere Archiviare le credenziali in Azure Key Vault. |
Sì |
database | Nome del database a cui si vuole accedere. | Sì |
connectVia | Il runtime di integrazione da usare per la connessione all'archivio dati. Per altre informazioni, vedere la sezione Prerequisiti. Se non specificato, viene usato il runtime di integrazione di Azure predefinito. | No |
Esempio:
{
"name": "MongoDBLinkedService",
"properties": {
"type": "MongoDbV2",
"typeProperties": {
"connectionString": "mongodb://[username:password@]host[:port][/[database][?options]]",
"database": "myDatabase"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Proprietà del set di dati
Per un elenco completo delle sezioni e delle proprietà disponibili per la definizione dei set di dati, vedere Set di dati e servizi collegati. Per il set di dati MongoDB sono supportate le proprietà seguenti:
Proprietà | Descrizione | Richiesto |
---|---|---|
type | La proprietà tipo del set di dati deve essere impostata su: MongoDbV2Collection | Sì |
collectionName | Nome della raccolta nel database MongoDB. | Sì |
Esempio:
{
"name": "MongoDbDataset",
"properties": {
"type": "MongoDbV2Collection",
"typeProperties": {
"collectionName": "<Collection name>"
},
"schema": [],
"linkedServiceName": {
"referenceName": "<MongoDB linked service name>",
"type": "LinkedServiceReference"
}
}
}
Proprietà dell'attività di copia
Per un elenco completo delle sezioni e delle proprietà disponibili per la definizione delle attività, vedere l'articolo sulle pipeline. Questa sezione fornisce un elenco delle proprietà supportate dall'origine e dal sink MongoDB.
MongoDB come origine
Nella sezione origine dell'attività di copia sono supportate le proprietà seguenti:
Proprietà | Descrizione | Richiesto |
---|---|---|
type | La proprietà tipo dell'origine dell'attività Copy deve essere impostata su: MongoDbV2Source | Sì |
filter | Specifica il filtro di selezione usando gli operatori di query. Per restituire tutti i documenti in una raccolta, omettere questo parametro o passare un documento vuoto ({}). | No |
cursorMethods.project | Specifica i campi da restituire nei documenti per la proiezione. Per restituire tutti i campi nei documenti corrispondenti, omettere questo parametro. | No |
cursorMethods.sort | Specifica l'ordine in cui la query restituisce i documenti corrispondenti. Fare riferimento a cursor.sort(). | No |
cursorMethods.limit | Specifica il numero massimo di documenti restituiti dal server. Fare riferimento a cursor.limit(). | No |
cursorMethods.skip | Specifica il numero di documenti da ignorare e la posizione da cui MongoDB inizia a restituire i risultati. Fare riferimento a cursor.skip(). | No |
batchSize | Specifica il numero di documenti da restituire in ogni batch di risposta dall'istanza di MongoDB. Nella maggior parte dei casi, la modifica della dimensione del batch non influisce sull'utente o sull'applicazione. Azure Cosmos DB limita ogni batch non può superare le dimensioni di 40 MB, ovvero la somma del numero batchSize delle dimensioni dei documenti, quindi ridurre questo valore se le dimensioni del documento sono elevate. | No (il valore predefinito è 100) |
Suggerimento
Il servizio supporta l'utilizzo del documento BSON in modalità strict. Assicurarsi che la query di filtro sia in modalità strict anziché in modalità shell. Per altre informazioni consultare il manuale di MongoDB.
Esempio:
"activities":[
{
"name": "CopyFromMongoDB",
"type": "Copy",
"inputs": [
{
"referenceName": "<MongoDB input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "MongoDbV2Source",
"filter": "{datetimeData: {$gte: ISODate(\"2018-12-11T00:00:00.000Z\"),$lt: ISODate(\"2018-12-12T00:00:00.000Z\")}, _id: ObjectId(\"5acd7c3d0000000000000000\") }",
"cursorMethods": {
"project": "{ _id : 1, name : 1, age: 1, datetimeData: 1 }",
"sort": "{ age : 1 }",
"skip": 3,
"limit": 3
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
MongoDB come sink
Nella sezione sink dell'attività di copia sono supportate le proprietà seguenti:
Proprietà | Descrizione | Richiesto |
---|---|---|
type | La proprietà tipo del sink dell'attività Copy deve essere impostata su MongoDbV2Sink. | Sì |
writeBehavior | Descrive come scrivere dati in MongoDB. Valori consentiti: insert e upsert. Il comportamento di upsert consiste nella sostituzione del documento se esiste già un documento con lo stesso _id . In caso contrario, il documento viene inserito.Nota: il servizio genera automaticamente un _id per un documento se non è specificato un _id nel documento originale o tramite il mapping di colonna. È quindi necessario assicurarsi che il documento contenga un ID in modo che upsert funzioni come previsto. |
No (il valore predefinito è insert) |
writeBatchSize | La proprietà writeBatchSize controlla le dimensioni dei documenti da scrivere in ogni batch. È possibile provare ad aumentare il valore di writeBatchSize per migliorare le prestazioni e a ridurre il valore se le dimensioni dei documenti sono troppo grandi. | No (il valore predefinito è 10.000) |
writeBatchTimeout | Tempo di attesa per il completamento dell'operazione di inserimento batch prima del timeout. Il valore consentito è timespan. | No (il valore predefinito è 00:30:00 - 30 minuti) |
Suggerimento
Per importare documenti JSON come sono, fare riferimento alla sezione Importare o esportare documenti JSON. Per copiare da dati in forma tabulare, fare riferimento a Mapping dello schema.
Esempio
"activities":[
{
"name": "CopyToMongoDB",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Document DB output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "MongoDbV2Sink",
"writeBehavior": "upsert"
}
}
}
]
Importare ed esportare documenti JSON
È possibile usare questo connettore MongoDB per:
- Copiare i documenti tra due raccolte MongoDB così come sono.
- Importare documenti JSON da varie origini a MongoDB, tra cui da Azure Cosmos DB, Archiviazione BLOB di Azure, Azure Data Lake Store e altri archivi basati su file supportati.
- Esportare documenti JSON da una raccolta MongoDB in diversi archivi basati su file.
Per ottenere una copia priva di schema, ignorare la sezione "structure" (chiamata anche schema) nel set di dati e il mapping dello schema nell'attività di copia.
Mapping dello schema
Per copiare dati da MongoDB a sink tabulare o viceversa, vedere mapping dello schema.
Aggiornare il servizio collegato MongoDB
Ecco i passaggi che consentono di aggiornare il servizio collegato e le query correlate:
Creare un nuovo servizio collegato MongoDB e configurarlo facendo riferimento alle Proprietà del servizio collegato.
Se si usano query SQL nelle pipeline che fanno riferimento al servizio collegato MongoDB precedente, sostituirle con le query MongoDB equivalenti. Vedere la tabella seguente per gli esempi di sostituzione:
SQL query Query MongoDB equivalente SELECT * FROM users
db.users.find({})
SELECT username, age FROM users
db.users.find({}, {username: 1, age: 1})
SELECT username AS User, age AS Age, statusNumber AS Status, CASE WHEN Status = 0 THEN "Pending" CASE WHEN Status = 1 THEN "Finished" ELSE "Unknown" END AS statusEnum LastUpdatedTime + interval '2' hour AS NewLastUpdatedTime FROM users
db.users.aggregate([{ $project: { _id: 0, User: "$username", Age: "$age", Status: "$statusNumber", statusEnum: { $switch: { branches: [ { case: { $eq: ["$Status", 0] }, then: "Pending" }, { case: { $eq: ["$Status", 1] }, then: "Finished" } ], default: "Unknown" } }, NewLastUpdatedTime: { $add: ["$LastUpdatedTime", 2 * 60 * 60 * 1000] } } }])
SELECT employees.name, departments.name AS department_name FROM employees LEFT JOIN departments ON employees.department_id = departments.id;
db.employees.aggregate([ { $lookup: { from: "departments", localField: "department_id", foreignField: "_id", as: "department" } }, { $unwind: "$department" }, { $project: { _id: 0, name: 1, department_name: "$department.name" } } ])
Contenuto correlato
Per un elenco degli archivi dati supportati come origini e sink dall'attività Copy, vedere Archivi dati supportati.