
Batch Python user-defined functions (UDFs) in Unity Catalog

Preview

This feature is in Public Preview.

Batch Unity Catalog Python UDFs extend Unity Catalog UDFs by letting you write Python code that operates on batches of data, significantly improving efficiency by reducing the overhead of row-by-row processing. These optimizations make Batch Unity Catalog Python UDFs ideal for large-scale data processing.

Requirements

Batch Unity Catalog Python UDFs require Databricks Runtime 16.3 or above.

Create a Batch Unity Catalog Python UDF

Creating a Batch Unity Catalog Python UDF is similar to creating a regular Unity Catalog UDF, with the following additions:

  • PARAMETER STYLE PANDAS: This specifies that the UDF processes data in batches using pandas iterators.
  • HANDLER 'handler_function': This specifies the handler function that is called to process the batches.

The following example shows you how to create a Batch Unity Catalog Python UDF:

Python
%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for weight_series, height_series in batch_iter:
        yield weight_series / (height_series ** 2)
$$;

After registering the UDF, you can call it using SQL or Python:

SQL
SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
  SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
  SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);
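
You can also call the UDF from Python. The following is a minimal sketch, assuming spark is the active SparkSession in your Databricks notebook:

Python
# Call the registered UDF through spark.sql; `spark` is the
# SparkSession that Databricks notebooks provide.
df = spark.sql("""
  SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
  FROM (SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m)
""")
df.show()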

Batch UDF handler function

Batch Unity Catalog Python UDFs require a handler function that processes batches and yields results. You must specify the name of the handler function when you create the UDF by using the HANDLER keyword.

The handler function does the following:

  1. Accepts an iterator argument that iterates over one or more pandas.Series. Each pandas.Series contains the input parameters of the UDF.
  2. Iterates over the batches and processes the data.
  3. Returns a generator that yields one pandas.Series of results per input batch.

Batch Unity Catalog Python UDFs must return the same number of rows as the input. The handler function ensures this by yielding a pandas.Series with the same length as the input series for each batch.
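
The following minimal skeleton illustrates this contract (the function name and pass-through logic are illustrative only):

Python
import pandas as pd
from typing import Iterator

# Illustrative skeleton: one output row per input row.
def handler_skeleton(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for batch in batch_iter:
        result = batch  # replace with real per-batch processing
        assert len(result) == len(batch)  # length must match the input batch
        yield result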

Install custom dependencies

You can extend the functionality of Batch Unity Catalog Python UDFs beyond the Databricks Runtime environment by defining custom dependencies for external libraries.

See Extend UDFs using custom dependencies.
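
The linked article describes the full syntax. As a minimal sketch, assuming an ENVIRONMENT clause as described there (the json_echo function name, the simplejson package, and its version pin are illustrative only):

Python
%sql
CREATE OR REPLACE TEMPORARY FUNCTION json_echo(s STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
ENVIRONMENT (
  dependencies = '["simplejson==3.19.3"]',
  environment_version = 'None'
)
AS $$
import pandas as pd
import simplejson
from typing import Iterator

def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for batch in batch_iter:
        # Wrap each value using the custom dependency.
        yield batch.apply(lambda v: simplejson.dumps({"value": v}))
$$;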

Batch UDFs can accept single or multiple parameters

Single parameter: When the handler function uses a single input parameter, it receives an iterator that iterates over a pandas.Series for each batch.

Python
%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for value_batch in batch_iter:
        d = {"min": value_batch.min(), "max": value_batch.max()}
        yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) FROM range(0, 100000, 3, 8) GROUP BY ALL;

Multiple parameters: For multiple input parameters, the handler function receives an iterator that iterates over multiple pandas.Series. The values in the series are in the same order as the input parameters.

Python
%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for p1, p2 in batch_iter:  # same order as the UDF parameters above
        yield p1 + p2
$$;
SELECT two_parameter_udf(id, id + 1) FROM range(0, 100000, 3, 8);

Optimize performance by separating expensive operations

You can optimize computationally expensive operations by moving them out of the handler function into the top level of the UDF body. They are then executed only once, rather than on every iteration over batches of data.

The following example shows how to ensure that an expensive computation is performed only once:

Python
%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
    # expensive computation...
    return 1

expensive_value = compute_value()

def handler_func(batch_iter):
    for batch in batch_iter:
        yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) FROM range(0, 100000, 3, 8) GROUP BY ALL;

Service credentials in Batch Unity Catalog Python UDFs

Batch Unity Catalog Python UDFs can use Unity Catalog Service Credentials to access external cloud services. This is particularly useful for integrating cloud functions like security tokenizers into data processing workflows.

To create a service credential, see Manage access to external cloud services using service credentials.

Specify the service credential you want to use in the CREDENTIALS clause in the UDF definition:

SQL
CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
  `credential-name` DEFAULT,
  `complicated-credential-name` AS short_name,
  `simple-cred`,
  cred_no_quotes
)
AS $$
# Python code here
$$;

Service credentials permissions

The UDF creator must have ACCESS permission on the Unity Catalog service credential. However, for UDF callers it is sufficient to grant them EXECUTE permission on the UDF. In particular, UDF callers do not need access to the underlying service credential, because the UDF executes using the credential permissions of the UDF creator.

For temporary functions, the creator is always the invoker. UDFs that run in No-PE scope, also known as dedicated clusters, use the caller's permissions instead.
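
For example, a sketch of granting a caller the right to run the UDF (the function and principal names are illustrative):

SQL
-- Illustrative names: the caller needs only EXECUTE on the function,
-- not ACCESS on the underlying service credential.
GRANT EXECUTE ON FUNCTION main.test.call_lambda_func TO `analyst@example.com`;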

Default credentials and aliases

You can include multiple credentials in the CREDENTIALS clause, but only one can be marked as DEFAULT. You can alias non-default credentials using the AS keyword. Each credential must have a unique alias.

Patched cloud SDKs automatically pick up default credentials. The default credential takes precedence over any default specified in the compute's Spark configuration, and persists in the Unity Catalog UDF definition.

Python
from databricks.service_credentials import getServiceCredentialsProvider
import boto3

# Assuming credential definition: CREDENTIALS(`aws-cred` AS testcred)
boto3_session = boto3.Session(botocore_session=getServiceCredentialsProvider('testcred'))
s3 = boto3_session.client('s3')
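
When a credential is marked DEFAULT, you do not need to name it at all. A minimal sketch:

Python
import boto3

# Assuming credential definition: CREDENTIALS(`aws-cred` DEFAULT)
# The patched boto3 picks up the DEFAULT credential automatically.
boto3_session = boto3.Session()
s3 = boto3_session.client('s3')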

Service credential example - AWS Lambda function

The following example uses a service credential to call an AWS Lambda function from a Batch Unity Catalog Python UDF:

Python
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext


def batchhandler(it):
    # Automatically picks up DEFAULT credential:
    session = boto3.Session()

    client = session.client("lambda", region_name="us-west-2")

    # Propagate TaskContext information to the Lambda context:
    user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

    for vals, is_debug in it:
        payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})

        res = client.invoke(
            FunctionName="HashValuesFunction",
            InvocationType="RequestResponse",
            ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode("utf-8"),
            Payload=payload,
        )

        response_payload = json.loads(res["Payload"].read().decode("utf-8"))
        if "errorMessage" in response_payload:
            raise Exception(str(response_payload))

        yield pd.Series(response_payload["values"])
$$;

Call the UDF after it is registered:

SQL
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data);

Get task execution context

Use the TaskContext PySpark API to get context information, such as the user's identity, cluster tags, Spark job ID, and more. See Get task context in a UDF.
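
For example, inside a handler function you can read the invoking user's identity. A minimal sketch, using the same "user" local property as the Lambda example above:

Python
from pyspark.taskcontext import TaskContext

# Read the invoking user's identity from the task context.
user = TaskContext.get().getLocalProperty("user")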

Limitations

  • Python functions must handle NULL values independently, and all type mappings must follow Databricks SQL language mappings.
  • Batch Unity Catalog Python UDFs run in a secure, isolated environment and do not have access to a shared file system or internal services.
  • Multiple UDF invocations within a stage are serialized, and intermediate results are materialized and may spill to disk.
  • Service credentials are available only in Batch Unity Catalog Python UDFs. They are not supported in standard Unity Catalog Python UDFs or PySpark UDFs.
  • On dedicated clusters and for temporary functions, the function caller must have ACCESS permissions on the service credentials. See Grant permissions to use a service credential to access an external cloud service.