
User-defined functions in Databricks Connect for Python

note

This article covers Databricks Connect for Databricks Runtime 13.1 and above.

Databricks Connect for Python supports user-defined functions (UDFs). When a DataFrame operation that includes UDFs is executed, the UDFs are serialized by Databricks Connect and sent to the server as part of the request.

For information about UDFs for Databricks Connect for Scala, see User-defined functions in Databricks Connect for Scala.

note

Because the user-defined function is serialized and deserialized, the Python version of the client must match the Python version on the Databricks compute. For supported versions, see the version support matrix.
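If you want to verify this, one option is to compare the client's Python version with the version reported by a trivial UDF running on the Databricks compute. The following is a minimal sketch, not part of the Databricks Connect API; the UDF name is illustrative only.

Python
import sys

from databricks.connect import DatabricksSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = DatabricksSession.builder.getOrCreate()

# Illustrative UDF: reports the Python version of the interpreter on the
# Databricks compute, where UDFs actually run.
@udf(returnType=StringType())
def server_python_version():
    import platform
    return platform.python_version()

client_version = ".".join(str(p) for p in sys.version_info[:2])
server_version = spark.range(1).select(server_python_version()).first()[0]
print(f"Client Python: {client_version}, server Python: {server_version}")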

Define a UDF

To create a UDF in Databricks Connect for Python, use one of the following supported functions:

  • PySpark UDFs (pyspark.sql.functions.udf)
  • PySpark Pandas UDFs (pyspark.sql.functions.pandas_udf)

For example, the following Python code defines a simple UDF that squares the values in a column.

Python
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def square(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("squared", square(col("id")))

df.show()

UDFs with dependencies

Preview

This feature is in Public Preview and requires Databricks Connect for Python 16.4 or above and a cluster running Databricks Runtime 16.4 or above. To use this feature, enable the Enhanced Python UDFs in Unity Catalog preview in your workspace.

Databricks Connect supports specifying Python dependencies that are required for UDFs. These dependencies are installed on Databricks compute as part of the UDF's Python environment.

This feature allows users to specify dependencies that the UDF needs in addition to the packages provided in the base environment. It can also be used to install a different version of a package than the one provided in the base environment.

Dependencies can be installed from the following sources:

  • PyPI packages
    • PyPI packages can be specified according to PEP 508, for example, dice, pyjokes<1, or simplejson==3.19.*.
  • Files stored in Unity Catalog volumes
    • Both wheel packages (.whl) and gzipped tar files (.tar.gz) are supported. The user must be granted READ_FILE permission on the file in the Unity Catalog volume.
    • When installing packages from Unity Catalog volumes, to invoke the UDFs, users need READ VOLUME permission on the source volume. Granting this permission to all account users automatically enables this for new users.
    • Files in Unity Catalog volumes should be specified as dbfs:<path>, for example, dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl or dbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.

To include custom dependencies in your UDF, specify them in an environment using withDependencies, then use that environment to create a Spark session. The dependencies are installed on your Databricks compute and will be available in all UDFs that use this Spark session.

The following code declares the PyPI package dice as a dependency:

Python
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Or, to specify a dependency of a wheel in a volume:

Python
from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Behavior in Databricks notebooks and jobs

In notebooks and jobs, UDF dependencies need to be installed directly in the REPL. Databricks Connect validates the REPL Python environment by verifying that all specified dependencies are already installed and throws an exception if any are not installed.

Notebook environment validation occurs for both PyPI and Unity Catalog volume dependencies. Volume dependencies need to be packaged following the standard Python packaging specifications from PEP-427 or later for wheel files, and PEP-241 or later for source distribution files. For more information on Python packaging standards, see the PyPA documentation.
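For example, in a notebook you might first install the dependency into the REPL and then declare the same dependency when building the session. The following is a minimal sketch of that flow; the dice==3.1.0 pin is only an illustration.

Python
# Notebook cell 1: install the dependency into the notebook's REPL environment.
%pip install dice==3.1.0

# Notebook cell 2: declare the same dependency. Databricks Connect validates
# that it is already installed in the REPL and throws an exception otherwise.
from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()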

Limitations

  • Files such as Python wheels or source distributions on your local development machine cannot be specified directly as dependencies. They must first be uploaded to Unity Catalog volumes (see the sketch after this list).
  • UDF dependencies are not supported for pandas aggregation UDFs over window functions.
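One way to upload a local wheel is the Databricks SDK Files API. The following is a minimal sketch, assuming you have write access to the target volume; the file name and volume path are hypothetical.

Python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Upload a local wheel into a Unity Catalog volume so it can later be
# referenced as dbfs:/Volumes/... in withDependencies().
with open("my_private_dep.whl", "rb") as f:
    w.files.upload(
        "/Volumes/users/someone@example.com/wheels/my_private_dep.whl",
        f,
        overwrite=True,
    )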

Examples

The following example defines PyPI and volumes dependencies in an environment, creates a session with that environment, then defines and calls UDFs that use those dependencies:

Python
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl",
    # Example library from: https://pypi.org/project/simplejson/#files
    "dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll("1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
})
df = df.withColumn("multiplied_with_roll", multiply_and_add_roll(col("id"), col("doubled")))
df.show()

The following example defines a dependency in an environment, creates a session with that environment, then defines a foreachBatch function that uses that dependency and applies it in a streaming query:

Python
import time
from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("simplejson==3.19.3")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

def feb_func(batch_df, batch_id):
    from simplejson import loads, dumps
    assert loads(dumps(batch_id)) == batch_id
    batch_df.collect()

df = spark.readStream.format("rate").load()
q = df.writeStream.foreachBatch(feb_func).start()
time.sleep(3)
q.stop()

Python base environment

UDFs are executed on the Databricks compute and not on the client. The base Python environment in which UDFs are executed depends on the Databricks compute chosen.

The base Python environment is the Python environment of the Databricks Runtime version running on the cluster. The Python version and the list of packages in this base environment are found under the System environment and Installed Python libraries sections of the Databricks Runtime release notes.
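To see what a particular compute's base environment provides at runtime, one option is to probe it from a UDF. The following is a minimal sketch; numpy is used only as an example of a package that ships in the base environment.

Python
from databricks.connect import DatabricksSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = DatabricksSession.builder.getOrCreate()

# Illustrative UDF: reports the version of a base-environment package as it
# is installed on the Databricks compute where UDFs run.
@udf(returnType=StringType())
def base_numpy_version():
    from importlib.metadata import version
    return version("numpy")

print(spark.range(1).select(base_numpy_version()).first()[0])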