User-defined functions in Databricks Connect for Scala

Note

This article covers Databricks Connect for Databricks Runtime 14.1 and above.

This article describes how to execute user-defined functions with Databricks Connect for Scala. Databricks Connect enables you to connect popular IDEs, notebook servers, and custom applications to Databricks clusters. For the Python version of this article, see User-defined functions in Databricks Connect for Python.

Note

Before you begin to use Databricks Connect, you must set up the Databricks Connect client.

For Databricks Runtime 14.1 and above, Databricks Connect for Scala supports running user-defined functions (UDFs).

To run a UDF, the compiled classes and JARs that the UDF requires must be uploaded to the cluster. Use the addCompiledArtifacts() API to specify the compiled class and JAR files to upload.

Note

The Scala version used by the client must match the Scala version on the Databricks cluster. To check the cluster’s Scala version, see the “System Environment” section for the cluster’s Databricks Runtime version in Databricks Runtime release notes versions and compatibility.
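To find out which Scala version the client application runs on, one option is to read it from the Scala standard library, as in the minimal sketch below. The object name ScalaVersionCheck is hypothetical; scala.util.Properties.versionNumberString is part of the standard library.

```scala
object ScalaVersionCheck {
  // Scala version of the running client, as reported by the standard library.
  def clientScalaVersion: String = scala.util.Properties.versionNumberString

  def main(args: Array[String]): Unit =
    println(s"Client Scala version: $clientScalaVersion")
}
```

Compare the printed value with the Scala version listed for the cluster's Databricks Runtime version.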

The following Scala program sets up a simple UDF that squares values in a column.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    // Location of the compiled classes (directory or JAR) that contain Main.
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    // Register the compiled classes for upload so the cluster can run the UDF.
    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    // Wrap the Scala function as a UDF.
    val squaredUdf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squaredUdf(col("id")))
      .select("squared")
      .show()
  }
}

In the preceding example, because the UDF is fully contained within Main, only the compiled artifact of Main is added. If the UDF depends on other classes or on external libraries (that is, JARs), all of these must also be included.

After the Spark session is initialized, additional compiled classes and JARs can be uploaded using the spark.addArtifact() API.
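As an illustration, the following sketch uploads one extra JAR to a session that already exists. The JAR path and the object name AddArtifactExample are hypothetical, and the sketch assumes the overload of addArtifact() that accepts a local file path as a String.

```scala
import com.databricks.connect.DatabricksSession

object AddArtifactExample {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    // Hypothetical local path to a JAR that a later UDF depends on.
    val extraJar = "/path/to/my-udf-helpers.jar"

    // Upload the JAR to the already-initialized session.
    spark.addArtifact(extraJar)
  }
}
```

UDFs defined after this call can then reference classes from the uploaded JAR.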

Note

When uploading JARs, you must include all transitive dependency JARs. The APIs do not detect transitive dependencies automatically.
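Because nothing is detected automatically, one possible approach is to gather every dependency JAR into a single local directory and enumerate that directory before uploading. The helper below is a sketch of the enumeration step only; JarCollector and listJars are hypothetical names, and the code uses only java.nio from the JDK.

```scala
import java.net.URI
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._

object JarCollector {
  // Returns the URIs of all .jar files directly inside `dir`, sorted by URI string.
  def listJars(dir: Path): Seq[URI] = {
    val stream = Files.list(dir)
    try {
      stream.iterator().asScala
        .filter(p => Files.isRegularFile(p) && p.getFileName.toString.endsWith(".jar"))
        .map(_.toUri)
        .toList // materialize before the stream is closed
        .sortBy(_.toString)
    } finally stream.close()
  }
}
```

Each returned URI could then be passed to spark.addArtifact(), assuming the overload that accepts a java.net.URI. The directory must already contain the direct and the transitive dependencies; the helper does not resolve anything itself.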

Typed Dataset APIs

The mechanism described in the preceding section for UDFs also applies to typed Dataset APIs.

Typed Dataset APIs let you run transformations such as map, filter, and aggregations on Datasets. Like UDFs, these transformations are executed on the Databricks cluster.

The following Scala application uses the map() API to convert each number in the result column to a prefixed string.

import com.databricks.connect.DatabricksSession

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    // map() needs an implicit Encoder for its result type (String here).
    import spark.implicits._

    spark.range(3).map(n => s"row-$n").show()
  }
}

While this example uses the map() API, the same applies to other typed Dataset APIs such as filter() and mapPartitions().
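As a sketch of how that might look, the following application chains filter() and mapPartitions() on the same kind of Dataset. The object name TypedApisExample is hypothetical, and the import of spark.implicits._ is assumed to be needed to supply the encoder for the String results.

```scala
import com.databricks.connect.DatabricksSession

object TypedApisExample {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    // Brings the implicit encoders used by typed transformations into scope.
    import spark.implicits._

    spark.range(6)
      // filter(): keep only the even ids.
      .filter((n: java.lang.Long) => n % 2 == 0)
      // mapPartitions(): transform each partition's iterator as a whole.
      .mapPartitions(rows => rows.map(n => s"row-$n"))
      .show()
  }
}
```

Both lambdas are compiled into the client's classes, so they rely on the same addCompiledArtifacts() upload as the map() example.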