User-defined scalar functions - Python
This article contains Python user-defined function (UDF) examples. It shows how to register UDFs, how to invoke UDFs, and provides caveats about evaluation order of subexpressions in Spark SQL.
In Databricks Runtime 14.0 and above, you can use Python user-defined table functions (UDTFs) to register functions that return entire relations instead of scalar values. See Python user-defined table functions (UDTFs).
Note
In Databricks Runtime 12.2 LTS and below, Python UDFs and Pandas UDFs are not supported on Unity Catalog compute that uses shared access mode. Scalar Python UDFs and Pandas UDFs are supported in Databricks Runtime 13.3 LTS and above for all access modes.
In Databricks Runtime 13.3 LTS and above, you can register scalar Python UDFs to Unity Catalog using SQL syntax. See User-defined functions (UDFs) in Unity Catalog.
Graviton instance support for Python UDFs on Unity Catalog-enabled clusters is available on Databricks Runtime 15.2 and above.
Important
UDFs and UDAFs are supported on Graviton clusters configured with shared access mode and Unity Catalog in Databricks Runtime 15.2 and above.
Register a function as a UDF
def squared(s):
    return s * s
spark.udf.register("squaredWithPython", squared)
You can optionally set the return type of your UDF. The default return type is StringType.
from pyspark.sql.types import LongType
def squared_typed(s):
    return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
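The registration step above can be kept separate from the function logic so that the logic is testable without a cluster. The following is a minimal sketch, not a prescribed pattern: the helper name register_squared is hypothetical, the spark argument is assumed to be an active SparkSession, and the None pass-through is an added assumption about how nulls should be handled.

```python
def squared(s):
    """Return s squared; pass None (SQL NULL) through unchanged."""
    return None if s is None else s * s


def register_squared(spark, name="squaredWithPython"):
    """Register `squared` as a SQL UDF with an explicit LongType return type.

    `register_squared` is a hypothetical helper; `spark` is assumed to be
    an active SparkSession.
    """
    # Imported lazily so that `squared` can be unit tested without PySpark.
    from pyspark.sql.types import LongType

    return spark.udf.register(name, squared, LongType())


# Plain-Python check of the function logic, no Spark required.
print(squared(4))
```

In a notebook, calling register_squared(spark) would make the function available to SQL under the given name.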
Call the UDF in Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Use UDF with DataFrames
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Alternatively, you can declare the same UDF using annotation syntax:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
    return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Evaluation order and null checking
Spark SQL (including SQL and the DataFrame and Dataset API) does not guarantee the order of evaluation of subexpressions. In particular, the inputs of an operator or function are not necessarily evaluated left-to-right or in any other fixed order. For example, logical AND and OR expressions do not have left-to-right “short-circuiting” semantics.
Therefore, it is dangerous to rely on the side effects or order of evaluation of Boolean expressions, and the order of WHERE and HAVING clauses, since such expressions and clauses can be reordered during query optimization and planning. Specifically, if a UDF relies on short-circuiting semantics in SQL for null checking, there’s no guarantee that the null check will happen before invoking the UDF. For example:
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
This WHERE clause does not guarantee that the strlen UDF is invoked only after nulls have been filtered out.
To perform proper null checking, we recommend that you do either of the following:
Make the UDF itself null-aware and do null checking inside the UDF itself
Use IF or CASE WHEN expressions to do the null check and invoke the UDF in a conditional branch
spark.udf.register("strlen_nullsafe", lambda s: len(s) if s is not None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") # ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   # ok
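The first recommendation, making the UDF itself null-aware, can be factored into a reusable wrapper when several UDFs need the same guard. The following is a minimal sketch under stated assumptions: the decorator name null_safe and the helper register_strlen are hypothetical, spark is assumed to be an active SparkSession, and the default of -1 mirrors the example above.

```python
import functools


def null_safe(default=None):
    """Build a decorator that returns `default` when any argument is None."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args):
            # Do the null check inside the function so that it does not
            # depend on the evaluation order chosen by the optimizer.
            if any(a is None for a in args):
                return default
            return func(*args)
        return wrapper
    return decorator


@null_safe(default=-1)
def strlen_nullsafe(s):
    return len(s)


def register_strlen(spark):
    """Hypothetical helper: register the guarded function as a SQL UDF."""
    return spark.udf.register("strlen_nullsafe", strlen_nullsafe, "int")


# Plain-Python check of the guard, no Spark required.
print(strlen_nullsafe(None), strlen_nullsafe("abc"))
```

Without the guard, len(None) raises a TypeError in the Python worker, which is the failure the null check is meant to prevent.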
Limitations
The following limitations apply to PySpark UDFs:
Module import restrictions: PySpark UDFs on shared clusters and serverless compute cannot access Git folders, workspace files, or Unity Catalog Volumes to import modules on Databricks Runtime 14.2 and below.
Broadcast variables: PySpark UDFs on shared clusters and serverless compute do not support broadcast variables.
Instance profiles: PySpark UDFs on shared clusters and serverless compute do not support instance profiles.
Memory limit: PySpark UDFs on serverless compute have a memory limit of 1 GB per PySpark UDF. Exceeding this limit results in the following error:
[UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.
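Where broadcast variables are unavailable, one possible workaround is to capture a small lookup table in the UDF's closure so that it is serialized together with the function. This is a sketch, not a documented Databricks recommendation: it assumes the data is small enough to ship with every task and to stay well within the memory limit above. The names COUNTRY_NAMES, make_lookup, and register_lookup are hypothetical, and spark is assumed to be an active SparkSession.

```python
# Hypothetical lookup data; assumed small enough to serialize with the UDF.
COUNTRY_NAMES = {"US": "United States", "DE": "Germany", "JP": "Japan"}


def make_lookup(mapping):
    """Return a function that resolves codes through a captured mapping."""
    def lookup(code):
        # Null-aware: None in, None out; unknown codes also yield None.
        if code is None:
            return None
        return mapping.get(code)
    return lookup


lookup_country = make_lookup(COUNTRY_NAMES)


def register_lookup(spark):
    """Hypothetical helper: register the closure-based lookup as a SQL UDF."""
    return spark.udf.register("country_name", lookup_country, "string")


# Plain-Python check of the lookup logic, no Spark required.
print(lookup_country("DE"))
```

For larger reference data, joining against a DataFrame is usually a better fit than shipping the data inside the function.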