What are user-defined functions (UDFs)?
User-defined functions (UDFs) allow you to reuse and share code that extends built-in functionality on Databricks. Use UDFs to perform specific tasks like complex calculations, transformations, or custom data manipulations.
When to use a UDF vs. Apache Spark function?
Use UDFs for logic that is difficult to express with built-in Apache Spark functions. Built-in Apache Spark functions are optimized for distributed processing and offer better performance at scale. For more information, see Functions.
Databricks recommends UDFs for ad hoc queries, manual data cleansing, exploratory data analysis, and operations on small to medium-sized datasets. Common use cases for UDFs include data encryption, decryption, hashing, JSON parsing, and validation.
Use Apache Spark methods for operations on very large datasets and any workloads run regularly or continuously, including ETL jobs and streaming operations.
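For example, a name-length computation can be written either way; when an equivalent built-in exists, prefer it. The following is a minimal session-scoped sketch, assuming a DataFrame with a name column:
from pyspark.sql.functions import length, udf
from pyspark.sql.types import IntegerType

df = spark.createDataFrame([("alice",), ("bob",)], ["name"])

# Preferred: the built-in length() is optimized by the query engine
df.select("name", length("name").alias("name_length")).show()

# Equivalent Python UDF: same result, but pays Python serialization overhead
@udf(returnType=IntegerType())
def name_length(s):
    return len(s)

df.select("name", name_length("name").alias("name_length")).show()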
Understand UDF types
Select a UDF type from the following tabs to see a description, example, and a link to learn more.
- Scalar UDFs
- Batch scalar UDFs
- Non-scalar UDFs
- UDAFs
- UDTFs
Scalar UDFs operate on a single row and return a single result value for each row. They can be Unity Catalog governed or session-scoped.
The following example uses a scalar UDF to calculate the length of each name in a name column and add the value in a new column, name_length.
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
To implement this in a Databricks notebook using PySpark:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def get_name_length(name):
    return len(name)

df = df.withColumn("name_length", get_name_length(df.name))

# Show the result
display(df)
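If you also want to call this session-scoped UDF from SQL in the same session, you can register it by name. A sketch, where the temp view name names is illustrative:
# Register the UDF so SQL in this session can call it by name
spark.udf.register("get_name_length", get_name_length)
df.createOrReplaceTempView("names")
spark.sql("SELECT name, get_name_length(name) AS name_length FROM names").display()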
See User-defined functions (UDFs) in Unity Catalog and User-defined scalar functions - Python.
Batch scalar UDFs process data in batches while maintaining 1:1 input/output row parity. This reduces the overhead of row-by-row operations for large-scale data processing. Batch UDFs can also maintain state between batches to run more efficiently, reuse resources, and handle complex calculations that need context across chunks of data. They can be Unity Catalog governed or session-scoped.
The following Batch Unity Catalog Python UDF calculates BMI while processing batches of rows:
+-------------+-------------+
| weight_kg | height_m |
+-------------+-------------+
| 90 | 1.8 |
| 77 | 1.6 |
| 50 | 1.5 |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for weight_series, height_series in batch_iter:
        yield weight_series / (height_series ** 2)
$$;
-- Apply the batch UDF to the rows of the table
SELECT main.test.calculate_bmi_pandas(weight_kg, height_m) AS BMI
FROM your_table;
+--------+
| BMI |
+--------+
| 27.8 |
| 30.1 |
| 22.2 |
+--------+
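The iterator signature is what lets a batch UDF keep state across batches. The following is a minimal session-scoped sketch of the same pattern, where the one-time setup (a hypothetical stand-in) runs once per task instead of once per row:
from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def bmi_with_setup(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # One-time setup, reused for every batch this task processes
    # (hypothetical stand-in for loading a model or opening a connection)
    offset = 0.0
    for bmi_batch in batch_iter:
        yield bmi_batch + offset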
See User-defined functions (UDFs) in Unity Catalog and Batch Python User-defined functions (UDFs) in Unity Catalog.
Non-scalar UDFs operate on entire datasets/columns with flexible input/output ratios (1:N or many:many).
Session-scoped batch pandas UDFs can be of the following types:
- Series to Series
- Iterator of Series to Iterator of Series
- Iterator of multiple Series to Iterator of Series
- Series to scalar
The following is an example of a Series to Series pandas UDF; an iterator-style variant is sketched after it.
from pyspark.sql.functions import pandas_udf
import pandas as pd
df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])
@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
    return weight / (height ** 2)
df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()
UDAFs operate on multiple rows and return a single aggregated result. UDAFs are session-scoped only.
The following UDAF example aggregates scores by name length.
from pyspark.sql.functions import pandas_udf
import pandas as pd

# Define a pandas UDF for aggregating scores
# (df carries the name_length column added in the scalar UDF example)
@pandas_udf("double")
def total_score_udf(scores: pd.Series) -> float:
    return scores.sum()

# Group by name length and aggregate
result_df = (df.groupBy("name_length")
             .agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
See pandas user-defined functions for Python and User-defined aggregate functions - Scala.
A UDTF takes one or more input arguments and returns multiple rows (and possibly multiple columns) for each input row. UDTFs are session-scoped only.
In the following example, each row in the categories column contains a comma-separated list of categories. The UDTF splits the list into multiple rows, one per category.
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf

@udtf(returnType="name: string, score: double, category: string")
class ScoreCategoriesUDTF:
    def eval(self, name: str, score: float, categories: str):
        # Emit one output row per comma-separated category
        for category in categories.split(','):
            yield (name, score, category)

# Register the UDTF and expand every row of the table with a lateral join
spark.udtf.register("score_categories", ScoreCategoriesUDTF)
spark.sql(
    "SELECT s.* FROM your_table, "
    "LATERAL score_categories(name, score, categories) AS s"
).display()
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
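You can also invoke a UDTF directly on literal arguments for a quick check. A sketch reusing the class above (the output differs from the table because the inputs are literals):
from pyspark.sql.functions import lit

# Direct invocation returns a DataFrame with one row per yielded tuple
ScoreCategoriesUDTF(lit("alice"), lit(10.0), lit("math,science")).show()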
Unity Catalog governed vs. session-scoped UDFs
Unity Catalog Python UDFs and Batch Unity Catalog Python UDFs are persisted in Unity Catalog for improved governance, reuse, and discoverability. All other UDFs are session-based, which means they are defined in a notebook or job and are scoped to the current SparkSession. You can define and access session-scoped UDFs using Scala or Python.
Unity Catalog governed UDFs cheat sheet
Unity Catalog governed UDFs allow custom functions to be defined, used, securely shared, and governed across computing environments. See User-defined functions (UDFs) in Unity Catalog.
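For example, once a function is registered in Unity Catalog, you can share it like any other securable object. A sketch, assuming the main.test.get_name_length function from earlier (the group name is illustrative):
-- Let a group call the governed function from any workspace attached to the metastore
GRANT EXECUTE ON FUNCTION main.test.get_name_length TO `data-analysts`;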
UDF type | Description
---|---
Unity Catalog Python UDF | Define a UDF in Python and register it in Unity Catalog for governance. Scalar UDFs operate on a single row and return a single result value for each row.
Batch Unity Catalog Python UDF | Define a UDF in Python and register it in Unity Catalog for governance. Batch UDFs process multiple values at a time and return multiple values, reducing the overhead of row-by-row operations for large-scale data processing.
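As a sketch of the first row, a scalar function with a Python body can be registered in Unity Catalog using LANGUAGE PYTHON (the catalog and schema names are illustrative):
%sql
CREATE OR REPLACE FUNCTION main.test.get_name_length_py(name STRING)
RETURNS INT
LANGUAGE PYTHON
AS $$
return len(name) if name is not None else None
$$;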
Session-scoped UDFs cheat sheet for user-isolated compute
Session-scoped UDFs are defined in a notebook or job and are scoped to the current SparkSession. You can define and access session-scoped UDFs using Scala or Python.
UDF type | Description
---|---
Python scalar UDFs | Scalar UDFs operate on a single row and return a single result value for each row.
Python non-scalar UDFs | Non-scalar UDFs include pandas UDFs: Series to Series, Iterator of Series to Iterator of Series, Iterator of multiple Series to Iterator of Series, and Series to scalar.
Python UDTFs | A UDTF takes one or more input arguments and returns multiple rows (and possibly multiple columns) for each input row.
Scala scalar UDFs | Scalar UDFs operate on a single row and return a single result value for each row.
Scala UDAFs | UDAFs operate on multiple rows and return a single aggregated result.
Performance considerations
- Built-in functions and SQL UDFs are the most efficient options.
- Scala UDFs are generally faster than Python UDFs.
  - Unisolated Scala UDFs run in the Java Virtual Machine (JVM), so they avoid the overhead of moving data in and out of the JVM.
  - Isolated Scala UDFs have to move data in and out of the JVM, but they can still be faster than Python UDFs because they handle memory more efficiently.
- Python UDFs and pandas UDFs tend to be slower than Scala UDFs because they need to serialize data and move it out of the JVM to the Python interpreter.
  - pandas UDFs are up to 100x faster than Python UDFs because they use Apache Arrow to reduce serialization costs.
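As a rough way to observe these differences yourself, you can time equivalent expressions on the same data. A sketch; absolute numbers vary with cluster size and data volume:
import time
import pandas as pd
from pyspark.sql.functions import col, length, udf, pandas_udf
from pyspark.sql.types import LongType

df = spark.range(1_000_000).selectExpr("CAST(id AS STRING) AS name")

@udf(returnType=LongType())
def py_len(s):
    return len(s)

@pandas_udf("long")
def pandas_len(s: pd.Series) -> pd.Series:
    return s.str.len()

for label, expr in [("built-in", length(col("name"))),
                    ("Python UDF", py_len(col("name"))),
                    ("pandas UDF", pandas_len(col("name")))]:
    start = time.time()
    df.select(expr.alias("v")).agg({"v": "max"}).collect()  # force full evaluation
    print(label, round(time.time() - start, 2), "s")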