Aggiungere attività ai processi nei bundle di asset di Databricks
Questo articolo fornisce esempi di vari tipi di attività che è possibile aggiungere ai processi di Azure Databricks in Bundle di asset di Databricks. Consultare Che cosa sono i Databricks Asset Bundle?.
La maggior parte dei tipi di attività di processo include parametri specifici dell'attività tra le impostazioni supportate, ma è anche possibile definire i parametri del processo che vengono passati alle attività. I riferimenti a valori dinamici sono supportati per i parametri del processo, che consentono di passare valori specifici per l'esecuzione del processo tra le attività. Vedere Che cos'è un valore dinamico di riferimento?.
Nota
È possibile eseguire l'override delle impostazioni delle attività del processo. Vedere Eseguire l'override delle impostazioni delle attività del processo in bundle di asset di Databricks.
Suggerimento
Per generare rapidamente la configurazione delle risorse per un processo esistente usando l'interfaccia della riga di comando di Databricks, è possibile usare il comando bundle generate job
. Vedere i comandi del bundle.
Scheda del notebook
Questa attività viene usata per eseguire un notebook.
Nell'esempio seguente viene aggiunta un'attività notebook a un processo e viene impostato un parametro di processo denominato my_job_run_id
. Il percorso del notebook da distribuire è relativo al file di configurazione in cui è dichiarata questa attività. L'attività ottiene il notebook dalla posizione distribuita nell'area di lavoro di Azure Databricks.
resources:
jobs:
my-notebook-job:
name: my-notebook-job
tasks:
- task_key: my-notebook-task
notebook_task:
notebook_path: ./my-notebook.ipynb
parameters:
- name: my_job_run_id
default: "{{job.run_id}}"
Per altri mapping che è possibile impostare per questa attività, vedere tasks > notebook_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere Attività notebook per i processi.
Attività di condizione If/else
Il condition_task
consente di aggiungere un'attività con la logica condizionale se/altrimenti al compito. L'attività valuta una condizione che può essere usata per controllare l'esecuzione di altre attività. L'attività condizione non richiede l'esecuzione di un cluster e non supporta tentativi o notifiche. Per altre informazioni sull'attività if/else, vedere Aggiungere logica di diramazione a un processo con l'attività If/else.
L'esempio seguente contiene un'attività di condizione e un'attività di notebook, in cui l'attività di notebook viene eseguita solo se il numero di riparazioni del lavoro è minore di 5.
resources:
jobs:
my-job:
name: my-job
tasks:
- task_key: condition_task
condition_task:
op: LESS_THAN
left: "{{job.repair_count}}"
right: "5"
- task_key: notebook_task
depends_on:
- task_key: condition_task
outcome: "true"
notebook_task:
notebook_path: ../src/notebook.ipynb
Per altri mapping che è possibile impostare per questa attività, vedere tasks > condition_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML.
Per ogni attività
Il for_each_task
consente di aggiungere un'attività con un ciclo 'for each' al tuo incarico. L'attività esegue un'attività nidificata per ogni input specificato. Per altre informazioni sul for_each_task
, vedere Eseguire un'attività di lavoro di Azure Databricks parametrizzata in un ciclo.
Nell'esempio seguente viene aggiunto un for_each_task
a un compito, in cui viene ripetuto il ciclo sui valori di un altro compito e vengono elaborati.
resources:
jobs:
my_job:
name: my_job
tasks:
- task_key: generate_countries_list
notebook_task:
notebook_path: ../src/generate_countries_list.ipnyb
- task_key: process_countries
depends_on:
- task_key: generate_countries_list
for_each_task:
inputs: "{{tasks.generate_countries_list.values.countries}}"
task:
task_key: process_countries_iteration
notebook_task:
notebook_path: ../src/process_countries_notebook.ipnyb
Per altri mapping che è possibile impostare per questa attività, vedere tasks > for_each_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML.
Attività script Python
Usare questa attività per eseguire un file Python.
L'esempio seguente aggiunge un'attività script Python a un processo. Il percorso del file Python da distribuire è relativo al file di configurazione in cui è dichiarata questa attività. L'attività ottiene il file Python dal percorso distribuito nell'area di lavoro di Azure Databricks.
resources:
jobs:
my-python-script-job:
name: my-python-script-job
tasks:
- task_key: my-python-script-task
spark_python_task:
python_file: ./my-script.py
Per altri mapping che è possibile impostare per questa attività, vedere tasks > spark_python_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere anche Attività script Python per i processi.
Attività Python wheel
Questa attività viene usata per eseguire un file Python wheel.
L'esempio seguente aggiunge un'attività Python wheel a un processo. Il percorso del file Python wheel da distribuire è relativo al file di configurazione in cui è dichiarata questa attività. Vedere Dipendenze della libreria di aggregazioni di asset di Databricks.
resources:
jobs:
my-python-wheel-job:
name: my-python-wheel-job
tasks:
- task_key: my-python-wheel-task
python_wheel_task:
entry_point: run
package_name: my_package
libraries:
- whl: ./my_package/dist/my_package-*.whl
Per altri mapping che è possibile impostare per questa attività, vedere tasks > python_wheel_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Consultare anche Sviluppare un file Python wheel usando i Databricks Asset Bundle e Attività Python wheel per i processi.
Attività JAR
Usare questa attività per eseguire un file JAR. È possibile fare riferimento a librerie JAR locali o in un'area di lavoro, in un volume del catalogo Unity o in un percorso di archiviazione cloud esterno. Vedere Dipendenze della libreria di aggregazioni di asset di Databricks.
L'esempio seguente aggiunge un file JAR a un processo. Il percorso del file JAR si trova nella posizione del volume specificata.
resources:
jobs:
my-jar-job:
name: my-jar-job
tasks:
- task_key: my-jar-task
spark_jar_task:
main_class_name: org.example.com.Main
libraries:
- jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar
Per altri mapping che è possibile impostare per questa attività, vedere tasks > spark_jar_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere Attività JAR per i processi.
Attività file SQL
Questa attività viene usata per eseguire un file SQL che si trova in un'area di lavoro o in un repository Git remoto.
L'esempio seguente aggiunge un file SQL a un processo. Questa attività file SQL usa il warehouse SQL specificato per eseguire il file SQL specificato.
resources:
jobs:
my-sql-file-job:
name: my-sql-file-job
tasks:
- task_key: my-sql-file-task
sql_task:
file:
path: /Users/someone@example.com/hello-world.sql
source: WORKSPACE
warehouse_id: 1a111111a1111aa1
Per ottenere l’ID di un warehouse SQL, aprire la pagina delle impostazioni del warehouse SQL, quindi copiare l'ID tra parentesi dopo il nome del warehouse nel campo Nome della scheda Panoramica.
Per altri mapping che è possibile impostare per questa attività, vedere tasks > sql_task > file
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere Attività SQL per i processi.
Attività della pipeline Delta Live Table
Questa attività viene usata per eseguire una pipeline di Delta Live Table. Vedere Che cos'è Delta Live Tables?.
Nell'esempio seguente viene aggiunta un'attività della pipeline Delta Live Table a un processo. Questa attività della pipeline Delta Live Table esegue la pipeline specificata.
resources:
jobs:
my-pipeline-job:
name: my-pipeline-job
tasks:
- task_key: my-pipeline-task
pipeline_task:
pipeline_id: 11111111-1111-1111-1111-111111111111
È possibile ottenere l'ID di una pipeline aprendo la pipeline nell'area di lavoro e copiando il valore dell'ID pipeline nella scheda Dettagli pipeline della pagina delle impostazioni della pipeline.
Per altri mapping che è possibile impostare per questa attività, vedere tasks > pipeline_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere Attività pipeline Delta Live Tables per i processi.
attività dbt
Questa attività viene usata per eseguire uno o più comandi dbt. Vedere Connettersi a dbt Cloud.
L'esempio seguente aggiunge un’attività dbt a un processo. Questa attività dbt usa il data warehouse SQL specificato per eseguire i comandi dbt specificati.
resources:
jobs:
my-dbt-job:
name: my-dbt-job
tasks:
- task_key: my-dbt-task
dbt_task:
commands:
- "dbt deps"
- "dbt seed"
- "dbt run"
project_directory: /Users/someone@example.com/Testing
warehouse_id: 1a111111a1111aa1
libraries:
- pypi:
package: "dbt-databricks>=1.0.0,<2.0.0"
Per ottenere l’ID di un warehouse SQL, aprire la pagina delle impostazioni del warehouse SQL, quindi copiare l'ID tra parentesi dopo il nome del warehouse nel campo Nome della scheda Panoramica.
Per altri mapping che è possibile impostare per questa attività, vedere tasks > dbt_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere attività dbt per i processi.
I bundle di asset di Databricks includono anche un modello di progetto dbt-sql
che definisce un processo con un'attività dbt, nonché i profili dbt per i processi dbt distribuiti. Per informazioni sui modelli di bundle di asset di Databricks, si veda Usare un modello predefinito di bundle.
Eseguire l'attività del processo
Questa attività viene usata per eseguire un altro processo.
L'esempio seguente contiene un'attività di esecuzione del processo nel secondo processo che esegue il primo processo.
resources:
jobs:
my-first-job:
name: my-first-job
tasks:
- task_key: my-first-job-task
new_cluster:
spark_version: "13.3.x-scala2.12"
node_type_id: "i3.xlarge"
num_workers: 2
notebook_task:
notebook_path: ./src/test.py
my_second_job:
name: my-second-job
tasks:
- task_key: my-second-job-task
run_job_task:
job_id: ${resources.jobs.my-first-job.id}
In questo esempio viene utilizzata una sostituzione per recuperare l'ID del processo da eseguire. Per ottenere l'ID di un processo dall'interfaccia utente, aprire il processo nell'area di lavoro e copiare l'ID dal valore ID processo nella scheda Dettagli processo della pagina delle impostazioni dei processi.
Per altri mapping che è possibile impostare per questa attività, vedere tasks > run_job_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML.