Benutzerdefinierte Funktionen in Databricks Connect für Scala
Hinweis
Dieser Artikel behandelt Databricks Connect für Databricks Runtime Version 14.1 und höher.
In diesem Artikel wird beschrieben, wie Sie benutzerdefinierte Funktionen mit Databricks Connect für Scala verwenden. Mit Databricks Connect können Sie beliebte IDEs, Notebookserver und benutzerdefinierte Anwendungen mit Azure Databricks-Clustern verbinden. Die Python-Version dieses Artikels finden Sie unter Benutzerdefinierte Funktionen in Databricks Connect für Python.
Hinweis
Bevor Sie beginnen, Databricks Connect zu verwenden, müssen Sie den Databricks Connect-Client einrichten.
Für Databricks Runtime 14.1 und höher unterstützt Databricks Connect für Scala das Ausführen von benutzerdefinierten Funktionen (UDFs).
Um eine UDF auszuführen, müssen die kompilierten Klassen und JARs, die die UDF benötigt, in den Cluster hochgeladen werden.
Die addCompiledArtifacts()
-API kann verwendet werden, um die kompilierten Klassen- und JAR-Dateien anzugeben, die hochgeladen werden müssen.
Hinweis
Das vom Client verwendete Scala muss mit der Scala-Version im Azure Databricks-Cluster übereinstimmen. Weitere Informationen zur Überprüfung der Scala-Version des Clusters finden Sie unter Versionshinweise zu Databricks Runtime-Versionen und -Kompatibilität im Abschnitt „Systemumgebung“ für die Databricks RuntimeVersion des Clusters.
Das folgende Scala-Programm richtet eine einfache UDF ein, die Werte in einer Spalte quadratiert.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
def squared(x: Int): Int = x * x
val squared_udf = udf(squared _)
spark.range(3)
.withColumn("squared", squared_udf(col("id")))
.select("squared")
.show()
}
}
Da die UDF im vorangegangenen Beispiel vollständig in Main
enthalten ist, wird nur das kompilierte Artefakt von Main
hinzugefügt.
Wenn sich die UDF über andere Klassen verteilt oder externe Bibliotheken (d. h. JARs) verwendet, sollten auch alle diese Bibliotheken einbezogen werden.
Wenn die Spark-Sitzung bereits initialisiert ist, können weitere kompilierte Klassen und JARs mithilfe der spark.addArtifact()
-API hochgeladen werden.
Hinweis
Beim Hochladen von JARs müssen alle transitiven Abhängigkeits-JARs für den Upload eingeschlossen werden. Die APIs führen keine automatische Erkennung transitiver Abhängigkeiten durch.
Typisierte Dataset-APIs
Der gleiche Mechanismus, der im vorherigen Abschnitt für UDFs beschrieben wird, gilt auch für typisierte Dataset-APIs.
Typisierte Dataset-APIs ermöglichen eine Ausführung von Transformationen wie Zuordnung, Filter und Aggregationen für resultierende Datasets. Diese werden auch ähnlich wie UDFs im Databricks-Cluster ausgeführt.
Die folgende Scala-Anwendung verwendet die map()
-API, um eine Zahl in einer Ergebnisspalte in eine vorangestellte Zeichenfolge zu ändern.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
Während in diesem Beispiel die map()
-API verwendet wird, gilt dies auch für andere typisierte Dataset-APIs wie filter()
, mapPartitions()
usw.