User-defined functions in Databricks Connect for Python
This article covers Databricks Connect for Databricks Runtime 13.1 and above.
Databricks Connect for Python supports user-defined functions (UDFs). When a DataFrame operation that includes UDFs is executed, Databricks Connect serializes the UDFs and sends them to the server as part of the request.
For information about UDFs for Databricks Connect for Scala, see User-defined functions in Databricks Connect for Scala.
Because the user-defined function is serialized and deserialized, the Python version of the client must match the Python version on the Databricks compute. For supported versions, see the version support matrix.
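A quick way to confirm that the versions match is to compare the client's Python version with the version reported from a UDF running on the compute. The following is a minimal sketch, not part of the standard setup; the function name server_python_version is illustrative and it assumes a working Databricks Connect configuration:
import sys
from databricks.connect import DatabricksSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Returns the major.minor Python version of the environment the UDF runs in (the server).
@udf(returnType=StringType())
def server_python_version():
    import sys
    return f"{sys.version_info.major}.{sys.version_info.minor}"

spark = DatabricksSession.builder.getOrCreate()
client_version = f"{sys.version_info.major}.{sys.version_info.minor}"
server_version = spark.range(1).select(server_python_version()).first()[0]
print(client_version, server_version)  # the two values should match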
Define a UDF
To create a UDF in Databricks Connect for Python, use one of the following supported functions:
- PySpark user-defined functions
- PySpark streaming functions
For example, the following Python sets up a simple UDF that squares the values in a column.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def square(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("squared", square(col("id")))
df.show()
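PySpark's vectorized (pandas) UDFs can be defined in the same way. The following sketch is not part of the original example; the function name square_vectorized is illustrative, and it shows the same squaring logic as a pandas UDF:
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
from databricks.connect import DatabricksSession

# Vectorized UDF: operates on a pandas Series per batch instead of one value at a time.
@pandas_udf(returnType=LongType())
def square_vectorized(s: pd.Series) -> pd.Series:
    return s * s

spark = DatabricksSession.builder.getOrCreate()
spark.range(1, 5).withColumn("squared", square_vectorized(col("id"))).show()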
UDFs with dependencies
This feature is in Public Preview and requires Databricks Connect for Python 16.4 or above, and a cluster running Databricks Runtime 16.4 or above. To use this feature, enable the preview Enhanced Python UDFs in Unity Catalog in your workspace.
Databricks Connect supports specifying Python dependencies that are required for UDFs. These dependencies are installed on Databricks compute as part of the UDF's Python environment.
This feature allows users to specify dependencies that the UDF needs in addition to the packages provided in the base environment. It can also be used to install a different version of a package than the one provided in the base environment.
Dependencies can be installed from the following sources:
- PyPI packages
  - PyPI packages can be specified according to PEP 508, for example, dice, pyjokes<1, or simplejson==3.19.*.
- Files stored in Unity Catalog volumes
  - Both wheel packages (.whl) and gzipped tar files (.tar.gz) are supported. The user must be granted READ_FILE permission on the file in the Unity Catalog volume.
  - When installing packages from Unity Catalog volumes, to invoke the UDFs, users need READ VOLUME permission on the source volume. Granting this permission to all account users automatically enables this for new users.
  - Unity Catalog volume files should be specified as dbfs:<path>, for example, dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl or dbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.
To include custom dependencies in your UDF, specify them in an environment using withDependencies, then use that environment to create a Spark session. The dependencies are installed on your Databricks compute and will be available in all UDFs that use this Spark session.
The following code declares the PyPI package dice as a dependency:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Or, to declare a wheel file stored in a volume as a dependency:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Behavior in Databricks notebooks and jobs
In notebooks and jobs, UDF dependencies need to be installed directly in the REPL. Databricks Connect validates the REPL Python environment by verifying that all specified dependencies are already installed and throws an exception if any are not installed.
Notebook environment validation occurs for both PyPI and Unity Catalog volume dependencies. Volume dependencies need to be packaged following the standard Python packaging specifications from PEP 427 or later for wheel files, and PEP 241 or later for source distribution files. For more information on Python packaging standards, see the PyPA documentation.
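For example, in a notebook you could install the packages into the REPL before declaring them, so that the validation succeeds. This is a hedged sketch of one way to satisfy the check; the package and version are illustrative:
%pip install dice==3.1.0

from databricks.connect import DatabricksSession, DatabricksEnv

# The package that was just installed into the notebook's REPL is declared as a
# UDF dependency, so the environment validation finds it already present.
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()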
Limitations
- Files on your local development machine, such as Python wheels or source distributions, cannot be specified directly as dependencies. They must first be uploaded to Unity Catalog volumes (see the sketch after this list).
- UDF dependencies are not supported for pandas aggregation UDFs over window functions.
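Because of the first limitation, a locally built wheel has to be copied into a volume before it can be declared. The following sketch shows one way to do that with the Files API of the Databricks SDK for Python; the local and volume paths and the wheel name are illustrative:
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
# Upload a locally built wheel into a Unity Catalog volume so that it can be
# referenced later with withDependencies("dbfs:/Volumes/...").
with open("dist/my_private_dep-0.1.0-py3-none-any.whl", "rb") as f:
    w.files.upload(
        "/Volumes/users/someone@example.com/wheels/my_private_dep-0.1.0-py3-none-any.whl",
        f,
        overwrite=True,
    )
The uploaded file can then be declared as a dependency with the dbfs: path, as shown earlier.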
Examples
The following example defines PyPI and volumes dependencies in an environment, creates a session with that environment, then defines and calls UDFs that use those dependencies:
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl",
    # Example library from: https://pypi.org/project/simplejson/#files
    "dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))

@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll("1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
})
df = df.withColumn("multiplied_with_roll", multiply_and_add_roll(col("id"), col("doubled")))
df.show()
The following example defines a dependency in an environment, creates a session with that environment, then defines and calls a function that uses that dependency in a streaming foreachBatch operation:
import time
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("simplejson==3.19.3")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
def feb_func(batch_df, batch_id):
    # The dependency declared in the environment is importable inside the foreachBatch function.
    from simplejson import loads, dumps
    assert loads(dumps(batch_id)) == batch_id
    batch_df.collect()
df = spark.readStream.format("rate").load()
q = df.writeStream.foreachBatch(feb_func).start()
time.sleep(3)
q.stop()
Python base environment
UDFs are executed on the Databricks compute, not on the client. The base Python environment in which UDFs are executed depends on the Databricks compute chosen.
The base Python environment is the Python environment of the Databricks Runtime version running on the cluster. The Python version and the list of packages in this base environment are found under the System environment and Installed Python libraries sections of the Databricks Runtime release notes.
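If you need to check which package versions a particular cluster's base environment provides, one option is to report them from a UDF. This is a minimal sketch; the function name base_env_pandas_version is illustrative, and pandas is used only as an example of a library that ships with Databricks Runtime:
from databricks.connect import DatabricksSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Reports the version of a package as installed in the server-side base environment.
@udf(returnType=StringType())
def base_env_pandas_version():
    from importlib.metadata import version
    return version("pandas")

spark = DatabricksSession.builder.getOrCreate()
print(spark.range(1).select(base_env_pandas_version()).first()[0])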