Condividi tramite


Esempi di codice per Databricks Connect per Python

Nota

Questo articolo illustra Databricks Connect per Databricks Runtime 13.3 LTS e versioni successive.

Questo articolo fornisce esempi di codice che usano Databricks Connect per Python. Databricks Connect consente di connettere gli IDE, i server notebook e le applicazioni personalizzate più diffusi ai cluster Azure Databricks. Consultare Cos’è Databricks Connect?. Per la versione scala di questo articolo, vedere Esempi di codice per Databricks Connect per Scala.

Nota

Prima di iniziare a usare Databricks Connect, è necessario configurare il client Databricks Connect.

Databricks offre diverse applicazioni di esempio aggiuntive che illustrano come usare Databricks Connect. Vedere le applicazioni di esempio per il repository Databricks Connect in GitHub, in particolare:

È anche possibile usare gli esempi di codice più semplici seguenti per sperimentare con Databricks Connect. Questi esempi presuppongono che si stia usando l'autenticazione predefinita per la configurazione del client Databricks Connect.

Questo semplice esempio di codice esegue una query sulla tabella specificata e quindi mostra le prime 5 righe della tabella specificata. Per usare una tabella diversa, modificare la chiamata a spark.read.table.

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

Questo esempio di codice più lungo esegue le operazioni seguenti:

  1. Crea un dataframe in memoria.
  2. Crea una tabella con il nome zzz_demo_temps_table all'interno dello default schema. Se la tabella con questo nome esiste già, la tabella viene eliminata per prima. Per usare uno schema o una tabella diversa, modificare le chiamate a spark.sql, temps.write.saveAsTableo entrambe.
  3. Salva il contenuto del dataframe nella tabella.
  4. Esegue una SELECT query sul contenuto della tabella.
  5. Mostra il risultato della query.
  6. Elimina la tabella.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date

spark = DatabricksSession.builder.getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
  StructField('AirportCode', StringType(), False),
  StructField('Date', DateType(), False),
  StructField('TempHighF', IntegerType(), False),
  StructField('TempLowF', IntegerType(), False)
])

data = [
  [ 'BLI', date(2021, 4, 3), 52, 43],
  [ 'BLI', date(2021, 4, 2), 50, 38],
  [ 'BLI', date(2021, 4, 1), 52, 41],
  [ 'PDX', date(2021, 4, 3), 64, 45],
  [ 'PDX', date(2021, 4, 2), 61, 41],
  [ 'PDX', date(2021, 4, 1), 66, 39],
  [ 'SEA', date(2021, 4, 3), 57, 43],
  [ 'SEA', date(2021, 4, 2), 54, 39],
  [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
  "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
  "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
  "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')

Nota

L'esempio seguente descrive come scrivere codice portabile tra Databricks Connect per Databricks Runtime 13.3 LTS e versioni successive negli ambienti in cui la DatabricksSession classe non è disponibile.

Nell'esempio seguente viene utilizzata la DatabricksSession classe oppure viene utilizzata la SparkSession classe se la DatabricksSession classe non è disponibile, per eseguire una query sulla tabella specificata e restituire le prime 5 righe. In questo esempio viene usata la SPARK_REMOTE variabile di ambiente per l'autenticazione.

from pyspark.sql import SparkSession, DataFrame

def get_spark() -> SparkSession:
  try:
    from databricks.connect import DatabricksSession
    return DatabricksSession.builder.getOrCreate()
  except ImportError:
    return SparkSession.builder.getOrCreate()

def get_taxis(spark: SparkSession) -> DataFrame:
  return spark.read.table("samples.nyctaxi.trips")

get_taxis(get_spark()).show(5)