Inserire dati da Salesforce
Importante
LakeFlow Connect è in anteprima pubblica controllata. Per partecipare all’anteprima, contattare il team dell’account Databricks.
Questo articolo descrive come inserire dati da Salesforce e caricarli in Azure Databricks usando LakeFlow Connect. La pipeline di ingestione risultante è governata da Unity Catalog ed è basata su elaborazione serverless e Delta Live Tables.
Il connettore di inserimento Salesforce supporta l'origine seguente:
- Salesforce Sales Cloud
Operazioni preliminari
Per creare una pipeline di inserimento, è necessario soddisfare i requisiti seguenti:
La tua area di lavoro è stata abilitata per Unity Catalog.
Il calcolo senza server è abilitato per i notebook, i flussi di lavoro e Delta Live Tables. Consultare la sezione Abilitare l’elaborazione serverless.
Per creare una connessione: è presente
CREATE CONNECTION
nel metastore.Per usare una connessione esistente: si dispone
USE CONNECTION
oALL PRIVILEGES
sull'oggetto connessione.USE CATALOG
nel catalogdi destinazione.USE SCHEMA
eCREATE TABLE
in un schema oCREATE SCHEMA
esistente nel catalogdi destinazione .(Scelta consigliata) Creare un utente salesforce che Databricks può usare per recuperare i dati. Assicurarsi che l'utente abbia accesso all'API e accesso a tutti gli oggetti che si prevede di inserire.
Creare una connessione Salesforce
Autorizzazioni necessarie:CREATE CONNECTION
nel metastore. Contattare un amministratore del metastore per grant.
Se si vuole creare una pipeline di inserimento usando una connessione esistente, passare alla sezione seguente. È necessario USE CONNECTION
o ALL PRIVILEGES
sulla connessione.
Per creare una connessione Salesforce, eseguire le operazioni seguenti:
Nell'area di lavoro di Azure Databricks, clicca su Catalog> Percorsi esterni >Connections> Crea connessione.
Per Nome connessione specificare un nome univoco per la connessione Salesforce.
In Tipo di connessione fare clic su Salesforce.
Se stai eseguendo l'inserimento da un account sandbox di Salesforce, setè nel sandbox al
true
.Fare clic su Accedi con Salesforce.
Se si sta eseguendo l'inserimento da una sandbox salesforce, fare clic su Usa dominio personalizzato. Specificare l'URL della sandbox e quindi procedere con l'accesso. Databricks consiglia di accedere come utente salesforce dedicato all'inserimento di Databricks.
Dopo aver restituito la pagina Crea connessione , fare clic su Crea.
Creare una pipeline di inserimento
Autorizzazioni necessarie:USE CONNECTION
o ALL PRIVILEGES
per una connessione.
Questo passaggio descrive come creare la pipeline di inserimento. Ogni table ingestata corrisponde di default a un table di streaming con lo stesso nome (ma tutto minuscolo) nella destinazione, a meno che non venga rinominato esplicitamente.
Interfaccia utente Databricks
Nella barra laterale dell'area di lavoro di Azure Databricks fare clic su Inserimento dati.
Nella pagina Aggiungi dati in Connettori Databricks fare clic su Salesforce.
Verrà visualizzata la procedura guidata di inserimento di Salesforce.
Nella pagina Pipeline della procedura guidata immettere un nome univoco per la pipeline di inserimento.
Nell'elenco a discesa
destinazione un . I dati inseriti e i log degli eventi saranno scritti in questo catalog. Select la connessione Catalog Unity in cui sono archiviati i credentials necessari per accedere ai dati di Salesforce.
Se non sono presenti connectionsSalesforce, fare clic su Crea connessione. È necessario avere il
CREATE CONNECTION
privilegio per il metastore.Fare clic su Crea pipeline e continuare.
Nella pagina origine
l' Salesforce da inserire in Databricks e quindi fare clic su Avanti .Se si select un schema, il connettore di inserimento Salesforce scrive tutte le tables esistenti e future nella schema di origine gestita da Unity Catalogtables.
Nella pagina destinazione
Unity e in cui scrivere. Se non si vuole usare un schemaesistente, fare clic su Crea schema. È necessario disporre dei privilegi di
USE CATALOG
eCREATE SCHEMA
nel catalogpadre.Fare clic su Salva pipeline e continuare.
Nella pagina Impostazioni fare clic su Crea pianificazione. Set la frequenza per refresh la destinazione tables.
Facoltativamente, set inviare notifiche di posta elettronica per l'esito positivo o negativo dell'operazione della pipeline.
Fare clic su Salva ed esegui pipeline.
Bundle di asset di Databricks
Questa scheda descrive come distribuire una pipeline di inserimento usando i bundle di asset di Databricks .This tab describes how to deploy an ingestion pipeline using Databricks Asset Bundles (DAB). I bundle possono contenere definizioni YAML di processi e attività, vengono gestiti tramite l'interfaccia della riga di comando di Databricks e possono essere condivisi ed eseguiti in aree di lavoro di destinazione diverse, ad esempio sviluppo, gestione temporanea e produzione. Per altre informazioni, vedere Aggregazioni di asset di Databricks.
Creare un nuovo bundle usando l'interfaccia della riga di comando di Databricks:
databricks bundle init
Aggiungere due nuovi file di risorse al bundle:
- Un file di definizione della pipeline (
resources/sfdc_pipeline.yml
). - File del flusso di lavoro che controlla la frequenza di inserimento dati (
resources/sfdc_job.yml
).
Di seguito è riportato un file
resources/sfdc_pipeline.yml
di esempio:variables: dest_catalog: default: main dest_schema: default: ingest_destination_schema # The main pipeline for sfdc_dab resources: pipelines: pipeline_sfdc: name: salesforce_pipeline ingestion_definition: connection_name: <salesforce-connection> objects: # An array of objects to ingest from Salesforce. This example # ingests the AccountShare, AccountPartner, and ApexPage objects. - table: source_schema: objects source_table: AccountShare destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - table: source_schema: objects source_table: AccountPartner destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - table: source_schema: objects source_table: ApexPage destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} channel: "preview"
Di seguito è riportato un file
resources/sfdc_job.yml
di esempio:resources: jobs: sfdc_dab_job: name: sfdc_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
- Un file di definizione della pipeline (
Distribuire la pipeline usando l'interfaccia della riga di comando di Databricks:
databricks bundle deploy
Databricks CLI
Per creare la pipeline
databricks pipelines create --json "<pipeline-definition | json-file-path>"
Per update della pipeline:
databricks pipelines update --json "<<pipeline-definition | json-file-path>"
Per get la definizione della pipeline:
databricks pipelines get "<pipeline-id>"
Per eliminare la pipeline:
databricks pipelines delete "<pipeline-id>"
Per altre informazioni, è possibile eseguire:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
Avviare, pianificare e set avvisi nella pipeline
Dopo che è stata creata la pipeline, torna all'area di lavoro di Databricks e quindi fai clic su Delta Live Tables.
La nuova pipeline viene visualizzata nella pipeline list.
Per visualizzare i dettagli della pipeline, fare clic sul nome della pipeline.
Nella pagina dei dettagli della pipeline eseguire la pipeline facendo clic su Avvia. È possibile pianificare la pipeline facendo clic su Pianifica.
Per set avvisi nella pipeline, fare clic su Pianificazione, quindi fare clic su Altre opzionie dopodiché aggiungere una notifica.
Al termine dell'inserimento, è possibile eseguire una query sul tables.
Nota
Quando viene eseguita la pipeline, è possibile che vengano visualizzati due views di origine per un determinato table. Una visualizzazione contiene gli snapshot per i campi della formula. L'altra vista contiene i pull incrementali dei dati per i campi non formula. Questi views vengono uniti nel tabledi destinazione.