Batch Python User-defined functions (UDFs) in Unity Catalog
This feature is in Public Preview.
Batch Unity Catalog Python UDFs extend the capabilities of Unity Catalog UDFs by allowing you to write Python code to operate on batches of data, significantly improving efficiency by reducing the overhead associated with row-by-row UDFs. These optimizations make Unity Catalog batch Python UDFs ideal for large-scale data processing.
Requirements
Batch Unity Catalog Python UDFs require Databricks Runtime 16.3 or above.
Create Batch Unity Catalog Python UDF
Creating a Batch Unity Catalog Python UDF is similar to creating a regular Unity Catalog UDF, with the following additions:
- PARAMETER STYLE PANDAS: Specifies that the UDF processes data in batches using pandas iterators.
- HANDLER 'handler_function': Specifies the handler function that is called to process the batches.
The following example shows you how to create a Batch Unity Catalog Python UDF:
%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  # Each batch is a tuple of series in argument order: (weight_kg, height_m)
  for weight_series, height_series in batch_iter:
    yield weight_series / (height_series ** 2)
$$;
After registering the UDF, you can call it using SQL or Python:
SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);
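For Python, a minimal PySpark sketch (the inline DataFrame is illustrative, and this assumes the temporary function was registered in the same session):
%python
df = spark.createDataFrame(
  [(1, 70.0, 1.75), (2, 80.0, 1.80)],
  "person_id INT, weight_kg DOUBLE, height_m DOUBLE",
)
df.selectExpr("person_id", "calculate_bmi_pandas(weight_kg, height_m) AS bmi").show()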
Batch UDF handler function
Batch Unity Catalog Python UDFs require a handler function that processes batches and yields results. You must specify the name of the handler function when you create the UDF by using the HANDLER key.
The handler function does the following:
- Accepts an iterator argument that iterates over one or more pandas.Series. Each pandas.Series contains the input parameters of the UDF.
- Iterates over the batches and processes the data.
- Returns a generator iterator that yields the results.
Batch Unity Catalog Python UDFs must return the same number of rows as the input. The handler function ensures this by yielding a pandas.Series with the same length as the input series for each batch.
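The handler must not filter or expand rows. As a minimal sketch of a length-preserving handler body (the name handler_func is illustrative), note that fillna keeps the batch length where dropna would not:
import pandas as pd
from typing import Iterator

def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  for batch in batch_iter:
    # fillna preserves the batch length; batch.dropna() would break the contract
    yield batch.fillna(0.0)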
Install custom dependencies
You can extend the functionality of Batch Unity Catalog Python UDFs beyond the Databricks Runtime environment by defining custom dependencies for external libraries.
See Extend UDFs using custom dependencies.
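As a hedged sketch of the pattern described there, dependencies are declared with an ENVIRONMENT clause in the UDF definition (the package pin and exact clause syntax here are assumptions; confirm them on the linked page):
%sql
CREATE OR REPLACE TEMPORARY FUNCTION custom_deps_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
ENVIRONMENT (
  dependencies = '["simplejson==3.19.3"]',
  environment_version = 'None'
)
AS $$
import simplejson as json
import pandas as pd
from typing import Iterator

def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  for batch in batch_iter:
    # The custom dependency is importable inside the UDF body
    yield batch.apply(lambda value: json.dumps({"data": value}))
$$;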
Batch UDFs can accept single or multiple parameters
Single parameter: When the handler function uses a single input parameter, it receives an iterator that iterates over a pandas.Series for each batch.
%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  for value_batch in batch_iter:
    # Broadcast the per-batch min/max to every row so the output length matches the input
    d = {"min": value_batch.min(), "max": value_batch.max()}
    yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) FROM range(0, 100000, 3, 8) GROUP BY ALL;
Multiple parameters: For multiple input parameters, the handler function receives an iterator that iterates over tuples of pandas.Series, one series per parameter. The series in each tuple appear in the same order as the input parameters.
%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for p1, p2 in batch_iter:  # same order as arguments above
    yield p1 + p2
$$;
SELECT two_parameter_udf(id, id + 1) FROM range(0, 100000, 3, 8);
Optimize performance by separating expensive operations
You can optimize computationally expensive operations by separating these operations from the handler function. This ensures that they are executed only once rather than during every iteration over batches of data.
The following example shows how to ensure that an expensive computation is performed only once:
%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
  # expensive computation...
  return 1

# Runs once when the UDF is initialized, not once per batch
expensive_value = compute_value()

def handler_func(batch_iter):
  for batch in batch_iter:
    yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) FROM range(0, 100000, 3, 8) GROUP BY ALL;
Service credentials in Batch Unity Catalog Python UDFs
Batch Unity Catalog Python UDFs can use Unity Catalog Service Credentials to access external cloud services. This is particularly useful for integrating cloud functions like security tokenizers into data processing workflows.
To create a service credential, see Manage access to external cloud services using service credentials.
Specify the service credential you want to use in the CREDENTIALS clause in the UDF definition:
CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
`credential-name` DEFAULT,
`complicated-credential-name` AS short_name,
`simple-cred`,
cred_no_quotes
)
AS $$
# Python code here
$$;
Service credentials permissions
The UDF creator must have ACCESS permission on the Unity Catalog service credential. However, for UDF callers it is sufficient to grant them EXECUTE permission on the UDF. In particular, UDF callers do not need access to the underlying service credential, because the UDF executes using the credential permissions of the UDF creator.
For temporary functions, the creator is always the invoker. UDFs that run in No-PE scope (that is, on dedicated clusters) use the caller's permissions instead.
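For example, a minimal sketch of granting a caller the right to run a UDF (the principal and function names are illustrative):
GRANT EXECUTE ON FUNCTION main.test.call_lambda_func TO `analyst@example.com`;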
Default credentials and aliases
You can include multiple credentials in the CREDENTIALS clause, but only one can be marked as DEFAULT. You can alias non-default credentials using the AS keyword. Each credential must have a unique alias.
Patched cloud SDKs automatically pick up default credentials. The default credential takes precedence over any default specified in the compute's Spark configuration, and persists in the Unity Catalog UDF definition.
To use a non-default credential, retrieve it by its alias with getServiceCredentialsProvider:
import boto3
from databricks.service_credentials import getServiceCredentialsProvider

# Assuming credential definition: CREDENTIALS(`aws-cred` AS testcred)
boto3_session = boto3.Session(botocore_session=getServiceCredentialsProvider('testcred'))
s3 = boto3_session.client('s3')
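With a DEFAULT credential, no explicit provider is needed; a hedged sketch mirroring the pattern used in the Lambda example below:
import boto3

# Assuming credential definition: CREDENTIALS(`aws-cred` DEFAULT)
# The patched boto3 session picks up the DEFAULT credential automatically
session = boto3.Session()
s3 = session.client('s3')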
Service credential example - AWS Lambda function
The following example uses a service credential to call an AWS Lambda function from a Batch Unity Catalog Python UDF:
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext
def batchhandler(it):
  # Automatically picks up DEFAULT credential:
  session = boto3.Session()
  client = session.client("lambda", region_name="us-west-2")

  # Propagate TaskContext information to lambda context:
  user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

  for vals, is_debug in it:
    payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})
    res = client.invoke(
      FunctionName="HashValuesFunction",
      InvocationType="RequestResponse",
      ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode("utf-8"),
      Payload=payload,
    )
    response_payload = json.loads(res["Payload"].read().decode("utf-8"))
    if "errorMessage" in response_payload:
      raise Exception(str(response_payload))
    yield pd.Series(response_payload["values"])
$$;
Call the UDF after it is registered:
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
  ('abc'),
  ('def')
AS t(data);
Get task execution context
Use the TaskContext PySpark API to get context information such as the user's identity, cluster tags, Spark job ID, and more. See Get task context in a UDF.
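As a minimal sketch of that pattern inside a handler (the handler name is illustrative; see also the Lambda example above):
from pyspark.taskcontext import TaskContext

def handler_func(batch_iter):
  # Read the calling user's identity from the task context
  user = TaskContext.get().getLocalProperty("user")
  for batch in batch_iter:
    # Pass batches through unchanged; 'user' is available for auditing or routing
    yield batch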
Limitations
- Python functions must handle NULL values independently, and all type mappings must follow Databricks SQL language mappings.
- Batch Unity Catalog Python UDFs run in a secure, isolated environment and do not have access to a shared file system or internal services.
- Multiple UDF invocations within a stage are serialized, and intermediate results are materialized and may spill to disk.
- Service credentials are available only in Batch Unity Catalog Python UDFs. They are not supported in standard Unity Catalog Python UDFs or PySpark UDFs.
- On dedicated clusters and for temporary functions, the function caller must have ACCESS permission on the service credentials. See Grant permissions to use a service credential to access an external cloud service.