Condividi tramite


Aggiungere attività ai processi nei bundle di asset di Databricks

Questo articolo fornisce esempi di vari tipi di attività che è possibile aggiungere ai processi di Azure Databricks in Bundle di asset di Databricks. Consultare Che cosa sono i Databricks Asset Bundle?.

La maggior parte dei tipi di attività di processo include parametri specifici dell'attività tra le impostazioni supportate, ma è anche possibile definire i parametri del processo che vengono passati alle attività. I riferimenti a valori dinamici sono supportati per i parametri del processo, che consentono di passare valori specifici per l'esecuzione del processo tra le attività. Vedere Che cos'è un valore dinamico di riferimento?.

Nota

È possibile eseguire l'override delle impostazioni delle attività del processo. Vedere Eseguire l'override delle impostazioni delle attività del processo in bundle di asset di Databricks.

Suggerimento

Per generare rapidamente la configurazione delle risorse per un processo esistente usando l'interfaccia della riga di comando di Databricks, è possibile usare il comando bundle generate job. Vedere i comandi del bundle.

Scheda del notebook

Questa attività viene usata per eseguire un notebook.

Nell'esempio seguente viene aggiunta un'attività notebook a un processo e viene impostato un parametro di processo denominato my_job_run_id. Il percorso del notebook da distribuire è relativo al file di configurazione in cui è dichiarata questa attività. L'attività ottiene il notebook dalla posizione distribuita nell'area di lavoro di Azure Databricks.

resources:
  jobs:
    my-notebook-job:
      name: my-notebook-job
      tasks:
        - task_key: my-notebook-task
          notebook_task:
            notebook_path: ./my-notebook.ipynb
      parameters:
        - name: my_job_run_id
          default: "{{job.run_id}}"

Per altri mapping che è possibile impostare per questa attività, vedere tasks > notebook_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere Attività notebook per i processi.

Attività di condizione If/else

Il condition_task consente di aggiungere un'attività con la logica condizionale se/altrimenti al compito. L'attività valuta una condizione che può essere usata per controllare l'esecuzione di altre attività. L'attività condizione non richiede l'esecuzione di un cluster e non supporta tentativi o notifiche. Per altre informazioni sull'attività if/else, vedere Aggiungere logica di diramazione a un processo con l'attività If/else.

L'esempio seguente contiene un'attività di condizione e un'attività di notebook, in cui l'attività di notebook viene eseguita solo se il numero di riparazioni del lavoro è minore di 5.

resources:
  jobs:
    my-job:
      name: my-job
      tasks:
        - task_key: condition_task
          condition_task:
            op: LESS_THAN
            left: "{{job.repair_count}}"
            right: "5"
        - task_key: notebook_task
          depends_on:
            - task_key: condition_task
              outcome: "true"
          notebook_task:
            notebook_path: ../src/notebook.ipynb

Per altri mapping che è possibile impostare per questa attività, vedere tasks > condition_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML.

Per ogni attività

Il for_each_task consente di aggiungere un'attività con un ciclo 'for each' al tuo incarico. L'attività esegue un'attività nidificata per ogni input specificato. Per altre informazioni sul for_each_task, vedere Eseguire un'attività di lavoro di Azure Databricks parametrizzata in un ciclo.

Nell'esempio seguente viene aggiunto un for_each_task a un compito, in cui viene ripetuto il ciclo sui valori di un altro compito e vengono elaborati.

resources:
  jobs:
    my_job:
      name: my_job
      tasks:
        - task_key: generate_countries_list
          notebook_task:
            notebook_path: ../src/generate_countries_list.ipnyb
        - task_key: process_countries
          depends_on:
            - task_key: generate_countries_list
          for_each_task:
            inputs: "{{tasks.generate_countries_list.values.countries}}"
            task:
              task_key: process_countries_iteration
              notebook_task:
                notebook_path: ../src/process_countries_notebook.ipnyb

Per altri mapping che è possibile impostare per questa attività, vedere tasks > for_each_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML.

Attività script Python

Usare questa attività per eseguire un file Python.

L'esempio seguente aggiunge un'attività script Python a un processo. Il percorso del file Python da distribuire è relativo al file di configurazione in cui è dichiarata questa attività. L'attività ottiene il file Python dal percorso distribuito nell'area di lavoro di Azure Databricks.

resources:
  jobs:
    my-python-script-job:
      name: my-python-script-job

      tasks:
        - task_key: my-python-script-task
          spark_python_task:
            python_file: ./my-script.py

Per altri mapping che è possibile impostare per questa attività, vedere tasks > spark_python_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere anche Attività script Python per i processi.

Attività Python wheel

Questa attività viene usata per eseguire un file Python wheel.

L'esempio seguente aggiunge un'attività Python wheel a un processo. Il percorso del file Python wheel da distribuire è relativo al file di configurazione in cui è dichiarata questa attività. Vedere Dipendenze della libreria di aggregazioni di asset di Databricks.

resources:
  jobs:
    my-python-wheel-job:
      name: my-python-wheel-job
      tasks:
        - task_key: my-python-wheel-task
          python_wheel_task:
            entry_point: run
            package_name: my_package
          libraries:
            - whl: ./my_package/dist/my_package-*.whl

Per altri mapping che è possibile impostare per questa attività, vedere tasks > python_wheel_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Consultare anche Sviluppare un file Python wheel usando i Databricks Asset Bundle e Attività Python wheel per i processi.

Attività JAR

Usare questa attività per eseguire un file JAR. È possibile fare riferimento a librerie JAR locali o in un'area di lavoro, in un volume del catalogo Unity o in un percorso di archiviazione cloud esterno. Vedere Dipendenze della libreria di aggregazioni di asset di Databricks.

L'esempio seguente aggiunge un file JAR a un processo. Il percorso del file JAR si trova nella posizione del volume specificata.

resources:
  jobs:
    my-jar-job:
      name: my-jar-job
      tasks:
        - task_key: my-jar-task
          spark_jar_task:
            main_class_name: org.example.com.Main
          libraries:
            - jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar

Per altri mapping che è possibile impostare per questa attività, vedere tasks > spark_jar_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere Attività JAR per i processi.

Attività file SQL

Questa attività viene usata per eseguire un file SQL che si trova in un'area di lavoro o in un repository Git remoto.

L'esempio seguente aggiunge un file SQL a un processo. Questa attività file SQL usa il warehouse SQL specificato per eseguire il file SQL specificato.

resources:
  jobs:
    my-sql-file-job:
      name: my-sql-file-job
      tasks:
        - task_key: my-sql-file-task
          sql_task:
            file:
              path: /Users/someone@example.com/hello-world.sql
              source: WORKSPACE
            warehouse_id: 1a111111a1111aa1

Per ottenere l’ID di un warehouse SQL, aprire la pagina delle impostazioni del warehouse SQL, quindi copiare l'ID tra parentesi dopo il nome del warehouse nel campo Nome della scheda Panoramica.

Per altri mapping che è possibile impostare per questa attività, vedere tasks > sql_task > file nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere Attività SQL per i processi.

Attività della pipeline Delta Live Table

Questa attività viene usata per eseguire una pipeline di Delta Live Table. Vedere Che cos'è Delta Live Tables?.

Nell'esempio seguente viene aggiunta un'attività della pipeline Delta Live Table a un processo. Questa attività della pipeline Delta Live Table esegue la pipeline specificata.

resources:
  jobs:
    my-pipeline-job:
      name: my-pipeline-job
      tasks:
        - task_key: my-pipeline-task
          pipeline_task:
            pipeline_id: 11111111-1111-1111-1111-111111111111

È possibile ottenere l'ID di una pipeline aprendo la pipeline nell'area di lavoro e copiando il valore dell'ID pipeline nella scheda Dettagli pipeline della pagina delle impostazioni della pipeline.

Per altri mapping che è possibile impostare per questa attività, vedere tasks > pipeline_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere Attività pipeline Delta Live Tables per i processi.

attività dbt

Questa attività viene usata per eseguire uno o più comandi dbt. Vedere Connettersi a dbt Cloud.

L'esempio seguente aggiunge un’attività dbt a un processo. Questa attività dbt usa il data warehouse SQL specificato per eseguire i comandi dbt specificati.

resources:
  jobs:
    my-dbt-job:
      name: my-dbt-job
      tasks:
        - task_key: my-dbt-task
          dbt_task:
            commands:
              - "dbt deps"
              - "dbt seed"
              - "dbt run"
            project_directory: /Users/someone@example.com/Testing
            warehouse_id: 1a111111a1111aa1
          libraries:
            - pypi:
                package: "dbt-databricks>=1.0.0,<2.0.0"

Per ottenere l’ID di un warehouse SQL, aprire la pagina delle impostazioni del warehouse SQL, quindi copiare l'ID tra parentesi dopo il nome del warehouse nel campo Nome della scheda Panoramica.

Per altri mapping che è possibile impostare per questa attività, vedere tasks > dbt_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere attività dbt per i processi.

I bundle di asset di Databricks includono anche un modello di progetto dbt-sql che definisce un processo con un'attività dbt, nonché i profili dbt per i processi dbt distribuiti. Per informazioni sui modelli di bundle di asset di Databricks, si veda Usare un modello predefinito di bundle.

Eseguire l'attività del processo

Questa attività viene usata per eseguire un altro processo.

L'esempio seguente contiene un'attività di esecuzione del processo nel secondo processo che esegue il primo processo.

resources:
  jobs:
    my-first-job:
      name: my-first-job
      tasks:
        - task_key: my-first-job-task
          new_cluster:
            spark_version: "13.3.x-scala2.12"
            node_type_id: "i3.xlarge"
            num_workers: 2
          notebook_task:
            notebook_path: ./src/test.py
    my_second_job:
      name: my-second-job
      tasks:
        - task_key: my-second-job-task
          run_job_task:
            job_id: ${resources.jobs.my-first-job.id}

In questo esempio viene utilizzata una sostituzione per recuperare l'ID del processo da eseguire. Per ottenere l'ID di un processo dall'interfaccia utente, aprire il processo nell'area di lavoro e copiare l'ID dal valore ID processo nella scheda Dettagli processo della pagina delle impostazioni dei processi.

Per altri mapping che è possibile impostare per questa attività, vedere tasks > run_job_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML.