
User-defined functions in Databricks Connect for Python

Note

This article covers Databricks Connect for Databricks Runtime 13.3 and above.

Databricks Connect for Python supports user-defined functions (UDF). When a DataFrame operation that includes UDFs is executed, the UDFs are serialized by Databricks Connect and sent to the server as part of the request.

For information about UDFs for Databricks Connect for Scala, see User-defined functions in Databricks Connect for Scala.

Note

Because the user-defined function is serialized and deserialized, the Python version of the client must match the Python version on the Databricks compute. For supported versions, see the version support matrix.
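A quick pre-flight check can catch a version mismatch before any UDF is serialized. The following is a minimal sketch that compares major and minor versions only. The helper name python_versions_match and the hard-coded server version are hypothetical; look up the actual server version in the version support matrix.

```python
import sys


def python_versions_match(client, server):
    """Return True when two version tuples share the same major.minor."""
    return tuple(client[:2]) == tuple(server[:2])


# Assumption: the target compute runs Python 3.12. Replace this with the
# value listed for your Databricks Runtime in the version support matrix.
expected_server_python = (3, 12)

if not python_versions_match(sys.version_info, expected_server_python):
    print(
        f"Local Python {sys.version_info.major}.{sys.version_info.minor} "
        f"differs from the expected "
        f"{expected_server_python[0]}.{expected_server_python[1]}"
    )
```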

Define a UDF

To create a UDF in Databricks Connect for Python, use one of the supported PySpark functions, such as pyspark.sql.functions.udf or pyspark.sql.functions.pandas_udf.

For example, the following Python code sets up a simple UDF that squares the values in a column.

Python
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def square(x):
    # Multiply the value by itself to square it
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("squared", square(col("id")))

df.show()

Manage UDF dependencies

Preview

This feature is in Public Preview and requires Databricks Connect for Python 16.4 or above, and a cluster running Databricks Runtime 16.4 or above. To use this feature, enable the preview Enhanced Python UDFs in Unity Catalog in your workspace.

Databricks Connect supports specifying Python dependencies that are required for UDFs. These dependencies are installed on Databricks compute as part of the UDF's Python environment.

This feature lets you specify dependencies that the UDF needs in addition to the packages provided in the base environment. You can also use it to install a different version of a package than the one provided in the base environment.

Dependencies can be installed from the following sources:

  • PyPI packages
    • PyPI packages can be specified according to PEP 508, for example, dice, pyjokes<1 or simplejson==3.19.*.
  • Files stored in Unity Catalog volumes
    • Both wheel packages (.whl) and gzipped tar files (.tar.gz) are supported. The user must be granted READ_FILE permission on the file in the Unity Catalog volume.
    • To invoke UDFs whose packages are installed from Unity Catalog volumes, users need READ VOLUME permission on the source volume. Granting this permission to all account users automatically enables this for new users.
    • Unity Catalog volumes files should be specified as dbfs:<path>, for example, dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl or dbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.
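The source rules above can be checked locally before a session is created. The following is a minimal sketch, not the validation that Databricks Connect itself performs; the helper name classify_dependency is hypothetical.

```python
def classify_dependency(spec):
    """Classify a dependency string as 'volume' or 'pypi'.

    Raises ValueError for a dbfs: path that is not a .whl or .tar.gz
    file under /Volumes/.
    """
    if spec.startswith("dbfs:"):
        path = spec[len("dbfs:"):]
        if not path.startswith("/Volumes/"):
            raise ValueError(f"Not a Unity Catalog volume path: {spec}")
        if not path.endswith((".whl", ".tar.gz")):
            raise ValueError(f"Unsupported file type: {spec}")
        return "volume"
    # Anything else is treated as a PEP 508 requirement string for PyPI.
    return "pypi"


print(classify_dependency("simplejson==3.19.*"))
print(classify_dependency(
    "dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl"
))
```

Running a check like this first surfaces a mistyped path or an unsupported file type on the client, rather than when the environment is built on the compute.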

To include custom dependencies in your UDF, specify them in an environment using withDependencies, then use that environment to create a Spark session. The dependencies are installed on your Databricks compute and will be available in all UDFs that use this Spark session.

The following code declares the PyPI package dice as a dependency:

Python
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Or, to specify a dependency of a wheel in a volume:

Python
from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Behavior in Databricks notebooks and jobs

In notebooks and jobs, UDF dependencies need to be installed directly in the REPL. Databricks Connect validates the REPL Python environment by verifying that all specified dependencies are already installed and throws an exception if any are not installed.

Notebook environment validation occurs for both PyPI and Unity Catalog volume dependencies. Volume dependencies must be packaged following the standard Python packaging specifications: PEP 427 or later for wheel files, and PEP 241 or later for source distribution files. For more information on Python packaging standards, see the PyPA documentation.
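To see which PyPI dependencies are absent from the current Python environment before creating a session, a check along the following lines can help. It is only an illustration built on the standard library: it looks at distribution names and ignores version constraints, and it is not the validation logic that Databricks Connect runs. The helper name missing_pypi_dependencies is hypothetical.

```python
import re
from importlib.metadata import PackageNotFoundError, version


def missing_pypi_dependencies(specs):
    """Return the specs whose distribution is not installed locally.

    Only the distribution name is checked; version constraints such as
    '<1' or '==3.19.*' are ignored.
    """
    missing = []
    for spec in specs:
        # The distribution name is the leading run of name characters.
        match = re.match(r"[A-Za-z0-9][A-Za-z0-9._-]*", spec.strip())
        if match is None:
            missing.append(spec)
            continue
        try:
            version(match.group(0))
        except PackageNotFoundError:
            missing.append(spec)
    return missing


print(missing_pypi_dependencies(["dice", "pyjokes<1", "simplejson==3.19.*"]))
```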

Limitations

  • Files such as Python wheel or source distribution on your local development machine cannot be specified directly as a dependency. They must first be uploaded to Unity Catalog volumes.
  • UDF dependencies support for pyspark.sql.streaming.DataStreamWriter.foreach requires Databricks Connect for Python 18.0 or above, and a cluster running Databricks Runtime 18.0 or above.
  • UDF dependencies support for pyspark.sql.streaming.DataStreamWriter.foreachBatch requires Databricks Connect for Python 18.0 or above, and a cluster running Databricks Runtime 18.0 or above. The feature is not supported on serverless.
  • UDF dependencies are not supported for pandas aggregation UDFs over window functions.

Examples

The following example defines PyPI and volume dependencies in an environment, creates a session with that environment, then defines and calls UDFs that use those dependencies:

Python
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl",
    # Example library from: https://pypi.org/project/simplejson/#files
    "dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll("1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "multiplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Automatic management of UDF dependencies

Preview

This feature is in Public Preview and requires Databricks Connect for Python 18.1 or above, Python 3.12 on your local machine, and a cluster running Databricks Runtime 18.1 or above. To use this feature, enable the preview Enhanced Python UDFs in Unity Catalog in your workspace.

The Databricks Connect withAutoDependencies() API enables automatic discovery and upload of local modules and public PyPI dependencies used in the import statements in your UDFs. It removes the need to manually specify dependencies.

The following code enables automatic dependency management:

Python
from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

The withAutoDependencies() method accepts the following parameters:

  • upload_local: When set to True, local modules imported in your UDFs are automatically discovered, packaged, and uploaded to the UDF sandbox.
  • use_index: When set to True, public PyPI dependencies used in your UDFs are automatically discovered and installed on Databricks compute. The discovery process uses the installed packages on your local machine to determine versions, ensuring consistency between your local environment and the remote execution environment.

Limitations

  • Dynamic imports (for example, importlib.import_module("foo")) are not supported.
  • Namespace packages (for example, azure.eventhub and google.cloud.aiplatform) are not supported.
  • Dependencies installed using direct-URL references are not supported. This includes those installed from local wheel files.
  • Dependencies installed from private package indices are not supported. Packages installed this way can't be distinguished from packages installed from the public PyPI.
  • Dependency discovery doesn't work in the standard Python shell. Only Python scripts, IPython shells, and Jupyter notebooks are supported.
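The dynamic-import limitation is easier to see with a small experiment. The following sketch assumes discovery is based on import statements, as described above, and uses the standard ast module to collect them. It is not the Databricks Connect implementation, and the helper name static_imports is hypothetical.

```python
import ast


def static_imports(source):
    """Return the sorted top-level module names imported via import statements."""
    names = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names.update(alias.name.split(".")[0] for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            names.add(node.module.split(".")[0])
    return sorted(names)


example = '''
import dice as dc
from my_helper import double
import importlib
plugin = importlib.import_module("foo")
'''

print(static_imports(example))  # ['dice', 'importlib', 'my_helper']
```

The module foo never appears in the result because it is named only in a string argument at run time, so a scan of import statements cannot see it.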

Examples

The following example demonstrates automatic dependency management with both local modules and PyPI packages. This example requires that you have installed simplejson and dice (using pip install simplejson dice).

First, create local helper modules:

Python
# my_helper.py
def double(x):
    return 2 * x

Python
# my_json.py
import simplejson

def loads(x):
    return simplejson.loads(x)

def dumps(x):
    return simplejson.dumps(x)

Then, in your main script, import these modules and use them in UDFs:

Python
# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType

import my_json
from my_helper import double

env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    return my_json.loads(my_json.dumps(double(x)))

@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
    return x + y + (dc.roll("d6")[0] / 6)

df = spark.range(1, 10)
df = df.withColumns({
    "doubled": double_and_json_parse(col("id")),
    "summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()

Logging

To output discovered dependencies, set the SPARK_CONNECT_LOG_LEVEL environment variable to info or debug. Alternatively, configure the Python logging framework:

Python
import logging
logging.basicConfig(level=logging.INFO)

The relevant logs are emitted by the databricks.connect.auto_dependencies module, for example:

DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0
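The sample output includes DEBUG records, which a root logger set to INFO does not show. To see them without turning on debug output for every library, you can raise the level of only the relevant logger. The following sketch assumes the logger names match the module paths in the sample output above.

```python
import logging

# Keep the root logger at WARNING so other libraries stay quiet.
logging.basicConfig(level=logging.WARNING)

# Assumption: logger names follow the module path shown in the sample
# output, for example databricks.connect.auto_dependencies.discovery.
auto_deps_logger = logging.getLogger("databricks.connect.auto_dependencies")
auto_deps_logger.setLevel(logging.DEBUG)
```

Child loggers such as databricks.connect.auto_dependencies.discovery inherit this level, so both the DEBUG discovery messages and the INFO sync messages are emitted.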

Python base environment

UDFs are executed on the Databricks compute and not on the client. The base Python environment in which UDFs are executed depends on the Databricks compute.

For clusters, the base Python environment is the Python environment of the Databricks Runtime version running on the cluster. The Python version and the list of packages in this base environment are found under the System environment and Installed Python libraries sections of the Databricks Runtime release notes.

For serverless compute, the base Python environment corresponds to the serverless environment version according to the following table.

| Databricks Connect version | UDF serverless environment |
| --- | --- |
| 17.0 to 17.3, Python 3.12 | Version 4 |
| 16.4.1 to below 17, Python 3.12 | Version 3 |
| 15.4.10 to below 16, Python 3.12 | Version 3 |
| 15.4.10 to below 16, Python 3.11 | Version 2 |
| 15.4.0 to 15.4.9 and 16.0 to 16.3 | Latest serverless compute. Please migrate to 15.4.10 LTS and above or 16.4.1 LTS and above for a stable Python environment. |
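The table above can be encoded as a small lookup, which is handy in a script that must confirm which base environment its UDFs run in. This is a sketch of the table only: the function name udf_serverless_environment is hypothetical, and combinations that the table does not list (for example, 16.4.0) return None.

```python
def udf_serverless_environment(connect_version, python_version):
    """Map a Databricks Connect version and local Python version to the
    UDF serverless environment listed in the table.

    Both arguments are tuples, for example (16, 4, 1) and (3, 12).
    Returns None for combinations the table does not list.
    """
    # Pad short versions such as (17, 1) to three components.
    v = tuple(connect_version) + (0,) * (3 - len(connect_version))
    py = tuple(python_version[:2])

    if (17, 0, 0) <= v < (17, 4, 0) and py == (3, 12):
        return "Version 4"
    if (16, 4, 1) <= v < (17, 0, 0) and py == (3, 12):
        return "Version 3"
    if (15, 4, 10) <= v < (16, 0, 0):
        if py == (3, 12):
            return "Version 3"
        if py == (3, 11):
            return "Version 2"
        return None
    if (15, 4, 0) <= v <= (15, 4, 9) or (16, 0, 0) <= v < (16, 4, 0):
        # Not pinned: these client versions track the latest serverless compute.
        return "Latest serverless compute"
    return None


print(udf_serverless_environment((16, 4, 1), (3, 12)))  # Version 3
```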