Condividi tramite


Inserire dati da Salesforce

Importante

LakeFlow Connect è in anteprima pubblica controllata. Per partecipare all’anteprima, contattare il team dell’account Databricks.

Questo articolo descrive come inserire dati da Salesforce e caricarli in Azure Databricks usando LakeFlow Connect. La pipeline di ingestione risultante è governata da Unity Catalog ed è basata su elaborazione serverless e Delta Live Tables.

Il connettore di inserimento Salesforce supporta l'origine seguente:

  • Salesforce Sales Cloud

Operazioni preliminari

Per creare una pipeline di inserimento, è necessario soddisfare i requisiti seguenti:

  • La tua area di lavoro è stata abilitata per Unity Catalog.

  • Il calcolo senza server è abilitato per i notebook, i flussi di lavoro e Delta Live Tables. Consultare la sezione Abilitare l’elaborazione serverless.

  • Per creare una connessione: è presente CREATE CONNECTION nel metastore.

    Per usare una connessione esistente: si dispone USE CONNECTION o ALL PRIVILEGES sull'oggetto connessione.

  • USE CATALOG nel catalogdi destinazione.

  • USE SCHEMA e CREATE TABLE in un schema o CREATE SCHEMA esistente nel catalogdi destinazione .

  • (Scelta consigliata) Creare un utente salesforce che Databricks può usare per recuperare i dati. Assicurarsi che l'utente abbia accesso all'API e accesso a tutti gli oggetti che si prevede di inserire.

Creare una connessione Salesforce

Autorizzazioni necessarie:CREATE CONNECTION nel metastore. Contattare un amministratore del metastore per grant.

Se si vuole creare una pipeline di inserimento usando una connessione esistente, passare alla sezione seguente. È necessario USE CONNECTION o ALL PRIVILEGES sulla connessione.

Per creare una connessione Salesforce, eseguire le operazioni seguenti:

  1. Nell'area di lavoro di Azure Databricks, clicca su Catalog> Percorsi esterni >Connections> Crea connessione.

  2. Per Nome connessione specificare un nome univoco per la connessione Salesforce.

  3. In Tipo di connessione fare clic su Salesforce.

  4. Se stai eseguendo l'inserimento da un account sandbox di Salesforce, setè nel sandbox al true.

  5. Fare clic su Accedi con Salesforce.

    Account di accesso di Salesforce

  6. Se si sta eseguendo l'inserimento da una sandbox salesforce, fare clic su Usa dominio personalizzato. Specificare l'URL della sandbox e quindi procedere con l'accesso. Databricks consiglia di accedere come utente salesforce dedicato all'inserimento di Databricks.

    Usare il pulsante dominio personalizzato

    Immettere l'URL della sandbox

  7. Dopo aver restituito la pagina Crea connessione , fare clic su Crea.

Creare una pipeline di inserimento

Autorizzazioni necessarie:USE CONNECTION o ALL PRIVILEGES per una connessione.

Questo passaggio descrive come creare la pipeline di inserimento. Ogni table ingestata corrisponde di default a un table di streaming con lo stesso nome (ma tutto minuscolo) nella destinazione, a meno che non venga rinominato esplicitamente.

Interfaccia utente Databricks

  1. Nella barra laterale dell'area di lavoro di Azure Databricks fare clic su Inserimento dati.

  2. Nella pagina Aggiungi dati in Connettori Databricks fare clic su Salesforce.

    Verrà visualizzata la procedura guidata di inserimento di Salesforce.

  3. Nella pagina Pipeline della procedura guidata immettere un nome univoco per la pipeline di inserimento.

  4. Nell'elenco a discesa destinazione un . I dati inseriti e i log degli eventi saranno scritti in questo catalog.

  5. Select la connessione Catalog Unity in cui sono archiviati i credentials necessari per accedere ai dati di Salesforce.

    Se non sono presenti connectionsSalesforce, fare clic su Crea connessione. È necessario avere il CREATE CONNECTION privilegio per il metastore.

  6. Fare clic su Crea pipeline e continuare.

  7. Nella pagina origine l' Salesforce da inserire in Databricks e quindi fare clic su Avanti.

    Se si select un schema, il connettore di inserimento Salesforce scrive tutte le tables esistenti e future nella schema di origine gestita da Unity Catalogtables.

  8. Nella pagina destinazione Unity e in cui scrivere.

    Se non si vuole usare un schemaesistente, fare clic su Crea schema. È necessario disporre dei privilegi di USE CATALOG e CREATE SCHEMA nel catalogpadre.

  9. Fare clic su Salva pipeline e continuare.

  10. Nella pagina Impostazioni fare clic su Crea pianificazione. Set la frequenza per refresh la destinazione tables.

  11. Facoltativamente, set inviare notifiche di posta elettronica per l'esito positivo o negativo dell'operazione della pipeline.

  12. Fare clic su Salva ed esegui pipeline.

Bundle di asset di Databricks

Questa scheda descrive come distribuire una pipeline di inserimento usando i bundle di asset di Databricks .This tab describes how to deploy an ingestion pipeline using Databricks Asset Bundles (DAB). I bundle possono contenere definizioni YAML di processi e attività, vengono gestiti tramite l'interfaccia della riga di comando di Databricks e possono essere condivisi ed eseguiti in aree di lavoro di destinazione diverse, ad esempio sviluppo, gestione temporanea e produzione. Per altre informazioni, vedere Aggregazioni di asset di Databricks.

  1. Creare un nuovo bundle usando l'interfaccia della riga di comando di Databricks:

    databricks bundle init
    
  2. Aggiungere due nuovi file di risorse al bundle:

    • Un file di definizione della pipeline (resources/sfdc_pipeline.yml).
    • File del flusso di lavoro che controlla la frequenza di inserimento dati (resources/sfdc_job.yml).

    Di seguito è riportato un file resources/sfdc_pipeline.yml di esempio:

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for sfdc_dab
    resources:
      pipelines:
        pipeline_sfdc:
          name: salesforce_pipeline
          ingestion_definition:
            connection_name: <salesforce-connection>
            objects:
              # An array of objects to ingest from Salesforce. This example
              # ingests the AccountShare, AccountPartner, and ApexPage objects.
              - table:
                  source_schema: objects
                  source_table: AccountShare
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: AccountPartner
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: ApexPage
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          channel: "preview"
    

    Di seguito è riportato un file resources/sfdc_job.yml di esempio:

    resources:
      jobs:
        sfdc_dab_job:
          name: sfdc_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
    
  3. Distribuire la pipeline usando l'interfaccia della riga di comando di Databricks:

    databricks bundle deploy
    

Databricks CLI

Per creare la pipeline

databricks pipelines create --json "<pipeline-definition | json-file-path>"

Per update della pipeline:

databricks pipelines update --json "<<pipeline-definition | json-file-path>"

Per get la definizione della pipeline:

databricks pipelines get "<pipeline-id>"

Per eliminare la pipeline:

databricks pipelines delete "<pipeline-id>"

Per altre informazioni, è possibile eseguire:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

Avviare, pianificare e set avvisi nella pipeline

  1. Dopo che è stata creata la pipeline, torna all'area di lavoro di Databricks e quindi fai clic su Delta Live Tables.

    La nuova pipeline viene visualizzata nella pipeline list.

  2. Per visualizzare i dettagli della pipeline, fare clic sul nome della pipeline.

  3. Nella pagina dei dettagli della pipeline eseguire la pipeline facendo clic su Avvia. È possibile pianificare la pipeline facendo clic su Pianifica.

  4. Per set avvisi nella pipeline, fare clic su Pianificazione, quindi fare clic su Altre opzionie dopodiché aggiungere una notifica.

  5. Al termine dell'inserimento, è possibile eseguire una query sul tables.

Nota

Quando viene eseguita la pipeline, è possibile che vengano visualizzati due views di origine per un determinato table. Una visualizzazione contiene gli snapshot per i campi della formula. L'altra vista contiene i pull incrementali dei dati per i campi non formula. Questi views vengono uniti nel tabledi destinazione.