Python user-defined table functions (UDTFs) in Unity Catalog
Registering Python UDTFs in Unity Catalog is in Public Preview.
A Unity Catalog user-defined table function (UDTF) is a function registered in Unity Catalog that returns a complete table instead of a scalar value. Unlike scalar functions, which return a single result value from each call, UDTFs are invoked in a SQL statement's FROM clause and can return multiple rows and columns.
UDTFs are particularly useful for:
- Transforming arrays or complex data structures into multiple rows
- Integrating external APIs or services into SQL workflows
- Implementing custom data generation or enrichment logic
- Processing data that requires stateful operations across rows
Each UDTF call accepts zero or more arguments. These arguments can be scalar expressions or table arguments representing entire input tables.
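For illustration, the two call styles look like the following sketch; parse_events, summarize_events, and raw_events are hypothetical names, not objects defined in this article:
-- Scalar argument: the UDTF receives one value per call
SELECT * FROM parse_events('2025-01-01');

-- Table argument: the UDTF receives every row of the input table
SELECT * FROM summarize_events(events => TABLE(raw_events));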
UDTFs can be registered in two ways:
- Unity Catalog: Register the UDTF as a governed object in Unity Catalog.
- Session-scoped: Register to the local SparkSession, isolated to the current notebook or job. See Python user-defined table functions (UDTFs).
Requirements
Unity Catalog Python UDTFs are supported on the following compute types:
- Classic compute with standard access mode (Databricks Runtime 17.1 and above)
- SQL warehouse (serverless or pro)
Create a UDTF in Unity Catalog
Use SQL DDL to create a governed UDTF in Unity Catalog. UDTFs are invoked using a SQL statement's FROM clause.
CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
    """
    Basic UDTF that computes a sequence of integers
    and includes the square of each number in the range.
    """
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)
$$;
SELECT * FROM square_numbers(1, 5);
+-----+---------+
| num | squared |
+-----+---------+
| 1 | 1 |
| 2 | 4 |
| 3 | 9 |
| 4 | 16 |
| 5 | 25 |
+-----+---------+
Databricks implements Python UDTFs as Python classes with a mandatory eval method that yields output rows.
Table arguments
TABLE arguments are supported in Databricks Runtime 17.2 and above.
UDTFs can accept entire tables as input arguments, enabling complex stateful transformations and aggregations.
eval() and terminate() lifecycle methods
Table arguments in UDTFs make use of the following functions to process each row:
- eval(): Called once for each row in the input table. This is the main processing method and is required.
- terminate(): Called once at the end of each partition, after all rows have been processed by eval(). Use this method to yield final aggregated results or perform cleanup operations. This method is optional but essential for stateful operations like aggregations, counting, or batch processing.
For more information about eval() and terminate() methods, see Apache Spark documentation: Python UDTF.
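The following minimal sketch shows the lifecycle with a hypothetical count_rows function that accumulates state in eval() and emits a single total from terminate():
CREATE OR REPLACE TEMPORARY FUNCTION count_rows(t TABLE)
RETURNS TABLE (row_count INT)
LANGUAGE PYTHON
HANDLER 'RowCounter'
AS $$
class RowCounter:
    def __init__(self):
        # State carried across rows within a partition
        self.count = 0
    def eval(self, row):
        # Called once per input row; accumulate state only
        self.count += 1
    def terminate(self):
        # Called once per partition after all rows are processed
        yield (self.count,)
$$;

-- Hypothetical call, assuming a table named some_table exists:
-- SELECT * FROM count_rows(t => TABLE(some_table));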
Row access patterns
eval() receives rows from TABLE arguments as pyspark.sql.Row objects. You can access values by column name (row['id'], row['name']) or by index (row[0], row[1]).
- Schema flexibility: Declare TABLE arguments without schema definitions (for example, data TABLE, t TABLE). The function accepts any table structure, so your code should validate that required columns exist.
See Example: Match IP addresses against CIDR network blocks and Example: Batch image captioning using Databricks vision endpoints.
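The following fragment is a minimal sketch of both access styles; the function name pick_columns and the id and name columns are illustrative assumptions:
CREATE OR REPLACE TEMPORARY FUNCTION pick_columns(t TABLE)
RETURNS TABLE (id INT, name STRING)
LANGUAGE PYTHON
HANDLER 'PickColumns'
AS $$
class PickColumns:
    def eval(self, row):
        # Validate that the expected columns exist before using them
        for col in ('id', 'name'):
            if col not in row:
                raise ValueError(f"Missing required column: {col}")
        # Access by column name; row[0] and row[1] are the positional equivalents
        yield (row['id'], row['name'])
$$;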
Environment isolation
Shared isolation environments require Databricks Runtime 17.2 and above. In earlier versions, all Unity Catalog Python UDTFs run in strict isolation mode.
Unity Catalog Python UDTFs with the same owner and session can share an isolation environment by default. This improves performance and reduces memory usage by reducing the number of separate environments that need to be launched.
Strict isolation
To ensure a UDTF always runs in its own, fully isolated environment, add the STRICT ISOLATION characteristic clause.
Most UDTFs don't need strict isolation. Standard data processing UDTFs benefit from the default shared isolation environment and run faster with lower memory consumption.
Add the STRICT ISOLATION characteristic clause to UDTFs that:
- Run input as code using eval(), exec(), or similar functions.
- Write files to the local file system.
- Modify global variables or system state.
- Access or modify environment variables.
The following UDTF example sets a custom environment variable, reads the variable back, and multiplies a set of numbers using the variable. Because the UDTF mutates the process environment, run it in STRICT ISOLATION. Otherwise, it could leak or override environment variables for other UDFs/UDTFs in the same environment, causing incorrect behavior.
CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os
class Multiplier:
    def eval(self, factor: str):
        # Save the factor as an environment variable
        os.environ["FACTOR"] = factor
        # Read it back and convert it to a number
        scale = int(os.getenv("FACTOR", "1"))
        # Multiply 0 through 4 by the factor
        for i in range(5):
            yield (i, i * scale)
$$;
SELECT * FROM multiply_numbers("3");
Set DETERMINISTIC if your function produces consistent results
Add DETERMINISTIC to your function definition if it produces the same outputs for the same inputs. This allows query optimizations to improve performance.
By default, Unity Catalog Python UDTFs are assumed to be non-deterministic unless explicitly declared otherwise. Examples of non-deterministic behavior include generating random values, accessing current times or dates, and making external API calls.
See CREATE FUNCTION (SQL and Python).
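For contrast, a function like the following sketch should omit DETERMINISTIC because it returns different rows on every call; the name random_ids is illustrative:
CREATE OR REPLACE TEMPORARY FUNCTION random_ids(n INT)
RETURNS TABLE (id INT)
LANGUAGE PYTHON
HANDLER 'RandomIds'
AS $$
import random

class RandomIds:
    def eval(self, n: int):
        # Output differs on every call, so the function is non-deterministic
        for _ in range(n):
            yield (random.randint(0, 1000),)
$$;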
Practical examples
The following examples demonstrate real-world use cases for Unity Catalog Python UDTFs, progressing from simple data transformations to complex external integrations.
Example: Re-implementing explode
While Spark provides a built-in explode function, creating your own version demonstrates the fundamental UDTF pattern of taking a single input and producing multiple output rows.
CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
    def eval(self, arr):
        if arr is None:
            return
        for element in arr:
            yield (element,)
$$;
Use the function directly in a SQL query:
SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
+---------+
| element |
+---------+
| apple |
| banana |
| cherry |
+---------+
Or apply it to existing table data with a LATERAL join:
SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;
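The my_items table is not defined above; for a quick test, a minimal stand-in with an items column of type ARRAY<STRING> could look like this:
CREATE OR REPLACE TEMPORARY VIEW my_items AS
SELECT * FROM VALUES
  (1, array('apple', 'banana')),
  (2, array('cherry'))
AS t(id, items);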
Example: IP address geolocation via REST API
This example demonstrates how UDTFs can integrate external APIs directly into your SQL workflow. Analysts can enrich data with real-time API calls using familiar SQL syntax, without requiring separate ETL processes.
CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
    def eval(self, ip_address):
        import requests
        api_url = f"https://api.ip-lookup.example.com/{ip_address}"
        try:
            response = requests.get(api_url)
            response.raise_for_status()
            data = response.json()
            yield (data.get('city'), data.get('country'))
        except requests.exceptions.RequestException as e:
            # Return nothing if the API request fails
            return
$$;
Python UDTFs allow TCP/UDP network traffic over ports 80, 443, and 53 when using serverless compute or compute configured with standard access mode.
Use the function to enrich web log data with geographic information:
SELECT
l.timestamp,
l.request_path,
geo.city,
geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;
This approach enables real-time geographic analysis without requiring pre-processed lookup tables or separate data pipelines. The UDTF handles HTTP requests, JSON parsing, and error handling, making external data sources accessible through standard SQL queries.
Example: Match IP addresses against CIDR network blocks
This example demonstrates matching IP addresses against CIDR network blocks, a common data engineering task that requires complex SQL logic.
First, create sample data with both IPv4 and IPv6 addresses:
-- Example IP logs with both IPv4 and IPv6 addresses
CREATE OR REPLACE TEMPORARY VIEW ip_logs AS
VALUES
('log1', '192.168.1.100'),
('log2', '10.0.0.5'),
('log3', '172.16.0.10'),
('log4', '8.8.8.8'),
('log5', '2001:db8::1'),
('log6', '2001:db8:85a3::8a2e:370:7334'),
('log7', 'fe80::1'),
('log8', '::1'),
('log9', '2001:db8:1234:5678::1')
t(log_id, ip_address);
Next, define and register the UDTF. Notice the Python class structure:
- The t TABLE parameter accepts an input table with any schema. The UDTF automatically adapts to process whatever columns are provided. This flexibility means you can use the same function across different tables without modifying the function signature. However, you must carefully check the schema of the rows to ensure compatibility.
- The __init__ method is used for heavy, one-time setup, like loading the large network list. This work takes place once per partition of the input table.
- The eval method processes each row and contains the core matching logic. This method executes exactly once for each row in the input partition, and each execution is performed by the corresponding instance of the IpMatcher UDTF class for that partition.
- The HANDLER clause specifies the name of the Python class that implements the UDTF logic.
CREATE OR REPLACE TEMPORARY FUNCTION ip_cidr_matcher(t TABLE)
RETURNS TABLE(log_id STRING, ip_address STRING, network STRING, ip_version INT)
LANGUAGE PYTHON
HANDLER 'IpMatcher'
COMMENT 'Match IP addresses against a list of network CIDR blocks'
AS $$
class IpMatcher:
    def __init__(self):
        import ipaddress
        # Heavy initialization - load networks once per partition
        self.nets = []
        cidrs = ['192.168.0.0/16', '10.0.0.0/8', '172.16.0.0/12',
                 '2001:db8::/32', 'fe80::/10', '::1/128']
        for cidr in cidrs:
            self.nets.append(ipaddress.ip_network(cidr))

    def eval(self, row):
        import ipaddress
        # Validate that required fields exist
        required_fields = ['log_id', 'ip_address']
        for field in required_fields:
            if field not in row:
                raise ValueError(f"Missing required field: {field}")
        try:
            ip = ipaddress.ip_address(row['ip_address'])
            for net in self.nets:
                if ip in net:
                    yield (row['log_id'], row['ip_address'], str(net), ip.version)
                    return
            yield (row['log_id'], row['ip_address'], None, ip.version)
        except ValueError:
            yield (row['log_id'], row['ip_address'], 'Invalid', None)
$$;
Now that ip_cidr_matcher is registered in Unity Catalog, call it directly from SQL using the TABLE() syntax:
-- Process all IP addresses
SELECT
*
FROM
ip_cidr_matcher(t => TABLE(ip_logs))
ORDER BY
log_id;
+--------+-------------------------------+-----------------+-------------+
| log_id | ip_address | network | ip_version |
+--------+-------------------------------+-----------------+-------------+
| log1 | 192.168.1.100 | 192.168.0.0/16 | 4 |
| log2 | 10.0.0.5 | 10.0.0.0/8 | 4 |
| log3 | 172.16.0.10 | 172.16.0.0/12 | 4 |
| log4 | 8.8.8.8 | null | 4 |
| log5 | 2001:db8::1 | 2001:db8::/32 | 6 |
| log6 | 2001:db8:85a3::8a2e:370:7334 | 2001:db8::/32 | 6 |
| log7 | fe80::1 | fe80::/10 | 6 |
| log8 | ::1 | ::1/128 | 6 |
| log9 | 2001:db8:1234:5678::1 | 2001:db8::/32 | 6 |
+--------+-------------------------------+-----------------+-------------+
Example: Batch image captioning using Databricks vision endpoints
This example demonstrates batch image captioning using a Databricks vision model serving endpoint. It showcases using terminate() for batch processing and partition-based execution.
- Create a table with public image URLs:
CREATE OR REPLACE TEMPORARY VIEW sample_images AS
VALUES
VALUES
('https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg', 'scenery'),
('https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Camponotus_flavomarginatus_ant.jpg/1024px-Camponotus_flavomarginatus_ant.jpg', 'animals'),
('https://upload.wikimedia.org/wikipedia/commons/thumb/1/15/Cat_August_2010-4.jpg/1200px-Cat_August_2010-4.jpg', 'animals'),
('https://upload.wikimedia.org/wikipedia/commons/thumb/c/c5/M101_hires_STScI-PRC2006-10a.jpg/1024px-M101_hires_STScI-PRC2006-10a.jpg', 'scenery')
images(image_url, category);
- Create a Unity Catalog Python UDTF to generate image captions:
- Initialize the UDTF with the configuration, including batch size, Databricks API token, vision model endpoint, and workspace URL.
- In the eval method, collect the image URLs into a buffer. When the buffer reaches the batch size, trigger batch processing. This ensures that multiple images are processed together in a single API call rather than individual calls per image.
- In the batch processing method, download all buffered images, encode them as base64, and send them in a single API request to the Databricks vision model endpoint. The model processes all images simultaneously and returns captions for the entire batch.
- The terminate method is executed exactly once at the end of each partition. In the terminate method, process any remaining images in the buffer and yield all collected captions as results.
Replace <workspace-url> with your actual Databricks workspace URL (https://your-workspace.cloud.databricks.com).
CREATE OR REPLACE TEMPORARY FUNCTION batch_inference_image_caption(data TABLE, api_token STRING)
RETURNS TABLE (caption STRING)
LANGUAGE PYTHON
HANDLER 'BatchInferenceImageCaption'
COMMENT 'batch image captioning by sending groups of image URLs to a Databricks vision endpoint and returning concise captions for each image.'
AS $$
class BatchInferenceImageCaption:
    def __init__(self):
        self.batch_size = 3
        self.vision_endpoint = "databricks-claude-3-7-sonnet"
        self.workspace_url = "<workspace-url>"
        self.image_buffer = []
        self.results = []

    def eval(self, row, api_token):
        self.image_buffer.append((str(row[0]), api_token))
        if len(self.image_buffer) >= self.batch_size:
            self._process_batch()

    def terminate(self):
        if self.image_buffer:
            self._process_batch()
        for caption in self.results:
            yield (caption,)

    def _process_batch(self):
        batch_data = self.image_buffer.copy()
        self.image_buffer.clear()

        import base64
        import httpx
        import requests

        # API request timeout in seconds
        api_timeout = 60
        # Maximum tokens for vision model response
        max_response_tokens = 300
        # Temperature controls randomness (lower = more deterministic)
        model_temperature = 0.3

        # Create a batch for the images
        batch_images = []
        api_token = batch_data[0][1] if batch_data else None
        for image_url, _ in batch_data:
            image_response = httpx.get(image_url, timeout=15)
            image_data = base64.standard_b64encode(image_response.content).decode("utf-8")
            batch_images.append(image_data)

        content_items = [{
            "type": "text",
            "text": "Provide brief captions for these images, one per line."
        }]
        for img_data in batch_images:
            content_items.append({
                "type": "image_url",
                "image_url": {
                    "url": "data:image/jpeg;base64," + img_data
                }
            })

        payload = {
            "messages": [{
                "role": "user",
                "content": content_items
            }],
            "max_tokens": max_response_tokens,
            "temperature": model_temperature
        }

        response = requests.post(
            self.workspace_url + "/serving-endpoints/" +
            self.vision_endpoint + "/invocations",
            headers={
                'Authorization': 'Bearer ' + api_token,
                'Content-Type': 'application/json'
            },
            json=payload,
            timeout=api_timeout
        )

        result = response.json()
        batch_response = result['choices'][0]['message']['content'].strip()
        lines = batch_response.split('\n')
        captions = [line.strip() for line in lines if line.strip()]
        while len(captions) < len(batch_data):
            captions.append(batch_response)
        self.results.extend(captions[:len(batch_data)])
$$;
To use the batch image caption UDTF, call it using the sample images table:
Replace your_secret_scope and api_token with the actual secret scope and key name for the Databricks API token.
SELECT
caption
FROM
batch_inference_image_caption(
data => TABLE(sample_images),
api_token => secret('your_secret_scope', 'api_token')
)
+---------------------------------------------------------------------------------------------------------------+
| caption |
+---------------------------------------------------------------------------------------------------------------+
| Wooden boardwalk cutting through vibrant wetland grasses under blue skies |
| Black ant in detailed macro photography standing on a textured surface |
| Tabby cat lounging comfortably on a white ledge against a white wall |
| Stunning spiral galaxy with bright central core and sweeping blue-white arms against the black void of space. |
+---------------------------------------------------------------------------------------------------------------+
You can also generate image captions category by category:
SELECT
*
FROM
batch_inference_image_caption(
TABLE(sample_images)
PARTITION BY category ORDER BY (category),
secret('your_secret_scope', 'api_token')
)
+------------------------------------------------------------------------------------------------------+
| caption |
+------------------------------------------------------------------------------------------------------+
| Black ant in detailed macro photography standing on a textured surface |
| Stunning spiral galaxy with bright center and sweeping blue-tinged arms against the black of space. |
| Tabby cat lounging comfortably on white ledge against white wall |
| Wooden boardwalk cutting through lush wetland grasses under blue skies |
+------------------------------------------------------------------------------------------------------+
Example: ROC curve and AUC computation for ML model evaluation
This example demonstrates computing receiver operating characteristic (ROC) curves and area under the curve (AUC) scores for binary classification model evaluation using scikit-learn.
This example showcases several important patterns:
- External library usage: Integrates scikit-learn for ROC curve computation
- Stateful aggregation: Accumulates predictions across all rows before computing metrics
- terminate() method usage: Processes the complete dataset and yields results only after all rows have been evaluated
- Error handling: Validates required columns exist in the input table
The UDTF accumulates all predictions in memory using the eval() method, then computes and yields the complete ROC curve in the terminate() method. This pattern is useful for metrics that require the complete dataset for calculation.
CREATE OR REPLACE TEMPORARY FUNCTION compute_roc_curve(t TABLE)
RETURNS TABLE (threshold DOUBLE, true_positive_rate DOUBLE, false_positive_rate DOUBLE, auc DOUBLE)
LANGUAGE PYTHON
HANDLER 'ROCCalculator'
COMMENT 'Compute ROC curve and AUC using scikit-learn'
AS $$
class ROCCalculator:
    def __init__(self):
        from sklearn import metrics
        self._roc_curve = metrics.roc_curve
        self._roc_auc_score = metrics.roc_auc_score
        self._true_labels = []
        self._predicted_scores = []

    def eval(self, row):
        if 'y_true' not in row or 'y_score' not in row:
            raise KeyError("Required columns 'y_true' and 'y_score' not found")
        true_label = row['y_true']
        predicted_score = row['y_score']
        label = float(true_label)
        self._true_labels.append(label)
        self._predicted_scores.append(float(predicted_score))

    def terminate(self):
        false_pos_rate, true_pos_rate, thresholds = self._roc_curve(
            self._true_labels,
            self._predicted_scores,
            drop_intermediate=False
        )
        auc_score = float(self._roc_auc_score(self._true_labels, self._predicted_scores))
        for threshold, tpr, fpr in zip(thresholds, true_pos_rate, false_pos_rate):
            yield float(threshold), float(tpr), float(fpr), auc_score
$$;
Create sample binary classification data with predictions:
CREATE OR REPLACE TEMPORARY VIEW binary_classification_data AS
SELECT *
FROM VALUES
( 1, 1.0, 0.95, 'high_confidence_positive'),
( 2, 1.0, 0.87, 'high_confidence_positive'),
( 3, 1.0, 0.82, 'medium_confidence_positive'),
( 4, 0.0, 0.78, 'false_positive'),
( 5, 1.0, 0.71, 'medium_confidence_positive'),
( 6, 0.0, 0.65, 'false_positive'),
( 7, 0.0, 0.58, 'true_negative'),
( 8, 1.0, 0.52, 'low_confidence_positive'),
( 9, 0.0, 0.45, 'true_negative'),
(10, 0.0, 0.38, 'true_negative'),
(11, 1.0, 0.31, 'low_confidence_positive'),
(12, 0.0, 0.15, 'true_negative'),
(13, 0.0, 0.08, 'high_confidence_negative'),
(14, 0.0, 0.03, 'high_confidence_negative')
AS data(sample_id, y_true, y_score, prediction_type);
Compute the ROC curve and AUC:
SELECT
threshold,
true_positive_rate,
false_positive_rate,
auc
FROM compute_roc_curve(
TABLE(
SELECT y_true, y_score
FROM binary_classification_data
WHERE y_true IS NOT NULL AND y_score IS NOT NULL
ORDER BY sample_id
)
)
ORDER BY threshold DESC;
+-----------+---------------------+----------------------+-------+
| threshold | true_positive_rate | false_positive_rate | auc |
+-----------+---------------------+----------------------+-------+
| 1.95 | 0.0 | 0.0 | 0.786 |
| 0.95 | 0.167 | 0.0 | 0.786 |
| 0.87 | 0.333 | 0.0 | 0.786 |
| 0.82 | 0.5 | 0.0 | 0.786 |
| 0.78 | 0.5 | 0.125 | 0.786 |
| 0.71 | 0.667 | 0.125 | 0.786 |
| 0.65 | 0.667 | 0.25 | 0.786 |
| 0.58 | 0.667 | 0.375 | 0.786 |
| 0.52 | 0.833 | 0.375 | 0.786 |
| 0.45 | 0.833 | 0.5 | 0.786 |
| 0.38 | 0.833 | 0.625 | 0.786 |
| 0.31 | 1.0 | 0.625 | 0.786 |
| 0.15 | 1.0 | 0.75 | 0.786 |
| 0.08 | 1.0 | 0.875 | 0.786 |
| 0.03 | 1.0 | 1.0 | 0.786 |
+-----------+---------------------+----------------------+-------+
Limitations
The following limitations apply to Unity Catalog Python UDTFs:
- Polymorphic table functions are not supported.
- Unity Catalog service credentials are not supported.
- Custom dependencies are not supported.