Partager via


Fonctions définies par l’utilisateur dans Databricks Connect pour Scala

Remarque

Cet article couvre Databricks Connect pour Databricks Runtime 14.1 et versions ultérieures.

Cet article décrit comment exécuter des fonctions définies par l’utilisateur avec Databricks Connect pour Scala. Databricks Connect vous permet de connecter des environnements de développement intégré (IDE) populaires, des serveurs notebook et des applications personnalisées aux clusters Azure Databricks. Pour obtenir la version Python de cet article, consultez Fonctions définies par l’utilisateur dans Databricks Connect pour Python.

Remarque

Avant de commencer à utiliser Databricks Connect, vous devez configurer le client Databricks Connect.

Pour Databricks Runtime 14.1 et versions ultérieures, Databricks Connect pour Scala prend en charge l’exécution de fonctions définies par l’utilisateur (UDF).

Pour exécuter une fonction UDF, la classe compilée et les fichiers JAR requis par la fonction UDF doivent être chargés sur le cluster. L’API addCompiledArtifacts() peut être utilisée pour spécifier la classe compilée et les fichiers JAR qui doivent être chargés.

Remarque

La version de Scala utilisée par le client doit correspondre à la version de Scala sur le cluster Azure Databricks. Pour vérifier la version de Scala du cluster, consultez la section « Environnement système » de la version Databricks Runtime du cluster dans Notes de publication sur les versions et la compatibilité de Databricks Runtime.

Le programme Scala suivant configure une fonction UDF simple qui place les valeurs dans une colonne.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    val squared_udf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squared_udf(col("id")))
      .select("squared")
      .show()
  }
}

Dans l’exemple précédent, parce que la fonction UDF est entièrement contenue dans Main, seul l’artefact compilé de Main est ajouté. Si la fonction UDF s’étend sur d’autres classes ou utilise des bibliothèques externes (par ex., des bibliothèques JAR), toutes ces bibliothèques doivent également être incluses.

Lorsque la session Spark est déjà initialisée, des classes compilées et des fichiers JAR supplémentaires peuvent être chargés à l’aide de l’API spark.addArtifact().

Remarque

Lors du chargement des fichiers JAR, tous les fichiers JAR de dépendance transitive doivent être inclus. Les API ne procèdent à aucune détection automatique des dépendances transitives.

API de jeu de données typé

Le même mécanisme décrit dans la section précédente sur les fonctions définies par l’utilisateur s’applique également aux API de jeu de données typé.

Les API de jeu de données typé permettent d’exécuter des transformations telles que des mappages, des filtres et des agrégations sur les jeux de données résultants. Elles sont également exécutées de la même façon que les fonctions UDF sur le cluster Databricks.

L’application Scala suivante utilise l’API map() pour modifier un nombre dans une colonne de résultat en chaîne préfixée.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

Bien que cet exemple utilise l’API map(), il s’applique également aux autres API de jeu de données typé telles que filter(), mapPartitions(), etc.