Python user-defined table functions (UDTFs) in Unity Catalog

Preview

Registering Python UDTFs in Unity Catalog is in Public Preview.

A Unity Catalog user-defined table function (UDTF) is a registered function that returns a complete table instead of a scalar value. Unlike scalar functions, which return a single result value from each call, UDTFs are invoked in a SQL statement's FROM clause and can return multiple rows and columns.

UDTFs are particularly useful for:

  • Transforming arrays or complex data structures into multiple rows
  • Integrating external APIs or services into SQL workflows
  • Implementing custom data generation or enrichment logic
  • Processing data that requires stateful operations across rows

Each UDTF call accepts zero or more arguments. These arguments can be scalar expressions or table arguments representing entire input tables.

UDTFs can be registered in two ways:

  • Unity Catalog: Register the UDTF as a governed object in Unity Catalog.
  • Session-scoped: Register to the local SparkSession, isolated to the current notebook or job. See Python user-defined table functions (UDTFs).

Requirements

Unity Catalog Python UDTFs are supported on the following compute types:

  • Classic compute with standard access mode (Databricks Runtime 17.1 and above)
  • SQL warehouse (serverless or pro)

Create a UDTF in Unity Catalog

Use SQL DDL to create a governed UDTF in Unity Catalog. UDTFs are invoked using a SQL statement's FROM clause.

SQL
CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
    """
    Basic UDTF that computes a sequence of integers
    and includes the square of each number in the range.
    """
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)
$$;

SELECT * FROM square_numbers(1, 5);

Output
+-----+---------+
| num | squared |
+-----+---------+
|   1 |       1 |
|   2 |       4 |
|   3 |       9 |
|   4 |      16 |
|   5 |      25 |
+-----+---------+

Databricks implements Python UDTFs as Python classes with a mandatory eval method that yields output rows.

Table arguments

note

TABLE arguments are supported in Databricks Runtime 17.2 and above.

UDTFs can accept entire tables as input arguments, enabling complex stateful transformations and aggregations.
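
The following minimal sketch (the function name, class name, and table name are placeholders) shows how a table argument is declared in the function signature and passed at call time with the TABLE() syntax:

SQL
CREATE OR REPLACE TEMPORARY FUNCTION passthrough_ids(t TABLE)
RETURNS TABLE (id INT)
LANGUAGE PYTHON
HANDLER 'PassthroughIds'
AS $$
class PassthroughIds:
    def eval(self, row):
        # eval() is called once per row of the input table
        yield (row['id'],)
$$;

-- Pass any table or view that has an id column
SELECT * FROM passthrough_ids(t => TABLE(my_table));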

eval() and terminate() lifecycle methods

UDTFs with table arguments use the following lifecycle methods to process input rows:

  • eval(): Called once for each row in the input table. This is the main processing method and is required.
  • terminate(): Called once at the end of each partition, after all rows have been processed by eval(). Use this method to yield final aggregated results or perform cleanup operations. This method is optional but essential for stateful operations like aggregations, counting, or batch processing.

For more information about eval() and terminate() methods, see Apache Spark documentation: Python UDTF.
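
The following sketch (a hypothetical row-counting function; the table name is a placeholder) illustrates the lifecycle: eval() updates per-partition state for each input row, and terminate() yields the aggregated result after all rows in the partition have been processed:

SQL
CREATE OR REPLACE TEMPORARY FUNCTION count_rows(t TABLE)
RETURNS TABLE (row_count INT)
LANGUAGE PYTHON
HANDLER 'RowCounter'
AS $$
class RowCounter:
    def __init__(self):
        # State is initialized once per partition
        self.count = 0

    def eval(self, row):
        # Called once per input row; accumulate state here
        self.count += 1

    def terminate(self):
        # Called once after the last row; yield the aggregate
        yield (self.count,)
$$;

SELECT * FROM count_rows(t => TABLE(my_table));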

Row access patterns

eval() receives rows from TABLE arguments as pyspark.sql.Row objects. You can access values by column name (row['id'], row['name']) or by index (row[0], row[1]).

  • Schema flexibility: Declare TABLE arguments without schema definitions (for example, data TABLE, t TABLE). The function accepts any table structure, so your code should validate that required columns exist.

See Example: Match IP addresses against CIDR network blocks and Example: Batch image captioning using Databricks vision endpoints.
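
The following sketch shows access by column name together with a column-existence check (the column names id and name are hypothetical):

SQL
CREATE OR REPLACE TEMPORARY FUNCTION pick_columns(t TABLE)
RETURNS TABLE (id INT, name STRING)
LANGUAGE PYTHON
HANDLER 'PickColumns'
AS $$
class PickColumns:
    def eval(self, row):
        # Validate that the expected columns exist before using them
        for field in ['id', 'name']:
            if field not in row:
                raise ValueError(f"Missing required field: {field}")
        # Access values by column name; row[0], row[1] would work
        # by position if the column order is known
        yield (row['id'], row['name'])
$$;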

Environment isolation

note

Shared isolation environments require Databricks Runtime 17.2 and above. In earlier versions, all Unity Catalog Python UDTFs run in strict isolation mode.

Unity Catalog Python UDTFs with the same owner and session can share an isolation environment by default. This improves performance and reduces memory usage by reducing the number of separate environments that need to be launched.

Strict isolation

To ensure a UDTF always runs in its own, fully isolated environment, add the STRICT ISOLATION characteristic clause.

Most UDTFs don't need strict isolation. Standard data processing UDTFs benefit from the default shared isolation environment and run faster with lower memory consumption.

Add the STRICT ISOLATION characteristic clause to UDTFs that:

  • Run input as code using eval(), exec(), or similar functions.
  • Write files to the local file system.
  • Modify global variables or system state.
  • Access or modify environment variables.

The following UDTF example sets a custom environment variable, reads the variable back, and multiplies a set of numbers using the variable. Because the UDTF mutates the process environment, run it in STRICT ISOLATION. Otherwise, it could leak or override environment variables for other UDFs/UDTFs in the same environment, causing incorrect behavior.

SQL
CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os

class Multiplier:
    def eval(self, factor: str):
        # Save the factor as an environment variable
        os.environ["FACTOR"] = factor

        # Read it back and convert it to a number
        scale = int(os.getenv("FACTOR", "1"))

        # Multiply 0 through 4 by the factor
        for i in range(5):
            yield (i, i * scale)
$$;

SELECT * FROM multiply_numbers("3");

Set DETERMINISTIC if your function produces consistent results

Add DETERMINISTIC to your function definition if it produces the same outputs for the same inputs. This allows query optimizations to improve performance.

By default, Unity Catalog Python UDTFs are assumed to be non-deterministic unless explicitly declared otherwise. Examples of non-deterministic functions include those that generate random values, access the current time or date, or make external API calls.
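
For example, a function that generates random values should omit the DETERMINISTIC clause, as in this hypothetical sketch:

SQL
-- Omit DETERMINISTIC: output differs between calls with the same input
CREATE OR REPLACE FUNCTION random_samples(n INT)
RETURNS TABLE (sample DOUBLE)
LANGUAGE PYTHON
HANDLER 'RandomSamples'
AS $$
import random

class RandomSamples:
    def eval(self, n: int):
        for _ in range(n):
            yield (random.random(),)
$$;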

See CREATE FUNCTION (SQL and Python).

Practical examples

The following examples demonstrate real-world use cases for Unity Catalog Python UDTFs, progressing from simple data transformations to complex external integrations.

Example: Re-implementing explode

While Spark provides a built-in explode function, creating your own version demonstrates the fundamental UDTF pattern of taking a single input and producing multiple output rows.

SQL
CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
    def eval(self, arr):
        if arr is None:
            return
        for element in arr:
            yield (element,)
$$;

Use the function directly in a SQL query:

SQL
SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
Output
+---------+
| element |
+---------+
| apple   |
| banana  |
| cherry  |
+---------+

Or apply it to existing table data with a LATERAL join:

SQL
SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;

Example: IP address geolocation via REST API

This example demonstrates how UDTFs can integrate external APIs directly into your SQL workflow. Analysts can enrich data with real-time API calls using familiar SQL syntax, without requiring separate ETL processes.

SQL
CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
    def eval(self, ip_address):
        import requests
        api_url = f"https://api.ip-lookup.example.com/{ip_address}"
        try:
            response = requests.get(api_url)
            response.raise_for_status()
            data = response.json()
            yield (data.get('city'), data.get('country'))
        except requests.exceptions.RequestException:
            # Return nothing if the API request fails
            return
$$;
note

Python UDTFs allow TCP/UDP network traffic over ports 80, 443, and 53 when using serverless compute or compute configured with standard access mode.

Use the function to enrich web log data with geographic information:

SQL
SELECT
  l.timestamp,
  l.request_path,
  geo.city,
  geo.country
FROM web_logs AS l,
  LATERAL ip_to_location(l.ip_address) AS geo;

This approach enables real-time geographic analysis without requiring pre-processed lookup tables or separate data pipelines. The UDTF handles HTTP requests, JSON parsing, and error handling, making external data sources accessible through standard SQL queries.

Example: Match IP addresses against CIDR network blocks

This example demonstrates matching IP addresses against CIDR network blocks, a common data engineering task that requires complex SQL logic.

First, create sample data with both IPv4 and IPv6 addresses:

SQL
-- Example IP logs with both IPv4 and IPv6 addresses
CREATE OR REPLACE TEMPORARY VIEW ip_logs AS
VALUES
('log1', '192.168.1.100'),
('log2', '10.0.0.5'),
('log3', '172.16.0.10'),
('log4', '8.8.8.8'),
('log5', '2001:db8::1'),
('log6', '2001:db8:85a3::8a2e:370:7334'),
('log7', 'fe80::1'),
('log8', '::1'),
('log9', '2001:db8:1234:5678::1')
t(log_id, ip_address);

Next, define and register the UDTF. Notice the Python class structure:

  • The t TABLE parameter accepts an input table with any schema. The UDTF automatically adapts to process whatever columns are provided. This flexibility means you can use the same function across different tables without modifying the function signature. However, you must carefully check the schema of the rows to ensure compatibility.
  • The __init__ method is used for heavy, one-time setup, like loading the large network list. This work takes place once per partition of the input table.
  • The eval method processes each row and contains the core matching logic. This method executes exactly once for each row in the input partition, and each execution is performed by the corresponding instance of the IpMatcher UDTF class for that partition.
  • The HANDLER clause specifies the name of the Python class that implements the UDTF logic.
SQL
CREATE OR REPLACE TEMPORARY FUNCTION ip_cidr_matcher(t TABLE)
RETURNS TABLE(log_id STRING, ip_address STRING, network STRING, ip_version INT)
LANGUAGE PYTHON
HANDLER 'IpMatcher'
COMMENT 'Match IP addresses against a list of network CIDR blocks'
AS $$
class IpMatcher:
    def __init__(self):
        import ipaddress
        # Heavy initialization - load networks once per partition
        self.nets = []
        cidrs = ['192.168.0.0/16', '10.0.0.0/8', '172.16.0.0/12',
                 '2001:db8::/32', 'fe80::/10', '::1/128']
        for cidr in cidrs:
            self.nets.append(ipaddress.ip_network(cidr))

    def eval(self, row):
        import ipaddress
        # Validate that required fields exist
        required_fields = ['log_id', 'ip_address']
        for field in required_fields:
            if field not in row:
                raise ValueError(f"Missing required field: {field}")
        try:
            ip = ipaddress.ip_address(row['ip_address'])
            for net in self.nets:
                if ip in net:
                    yield (row['log_id'], row['ip_address'], str(net), ip.version)
                    return
            yield (row['log_id'], row['ip_address'], None, ip.version)
        except ValueError:
            yield (row['log_id'], row['ip_address'], 'Invalid', None)
$$;

Now that ip_cidr_matcher is registered in Unity Catalog, call it directly from SQL using the TABLE() syntax:

SQL
-- Process all IP addresses
SELECT
  *
FROM
  ip_cidr_matcher(t => TABLE(ip_logs))
ORDER BY
  log_id;
Output
+--------+------------------------------+----------------+------------+
| log_id | ip_address                   | network        | ip_version |
+--------+------------------------------+----------------+------------+
| log1   | 192.168.1.100                | 192.168.0.0/16 | 4          |
| log2   | 10.0.0.5                     | 10.0.0.0/8     | 4          |
| log3   | 172.16.0.10                  | 172.16.0.0/12  | 4          |
| log4   | 8.8.8.8                      | null           | 4          |
| log5   | 2001:db8::1                  | 2001:db8::/32  | 6          |
| log6   | 2001:db8:85a3::8a2e:370:7334 | 2001:db8::/32  | 6          |
| log7   | fe80::1                      | fe80::/10      | 6          |
| log8   | ::1                          | ::1/128        | 6          |
| log9   | 2001:db8:1234:5678::1        | 2001:db8::/32  | 6          |
+--------+------------------------------+----------------+------------+

Example: Batch image captioning using Databricks vision endpoints

This example demonstrates batch image captioning using a Databricks vision model serving endpoint. It showcases using terminate() for batch processing and partition-based execution.

  1. Create a table with public image URLs:

    SQL
    CREATE OR REPLACE TEMPORARY VIEW sample_images AS
    VALUES
    ('https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg', 'scenery'),
    ('https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Camponotus_flavomarginatus_ant.jpg/1024px-Camponotus_flavomarginatus_ant.jpg', 'animals'),
    ('https://upload.wikimedia.org/wikipedia/commons/thumb/1/15/Cat_August_2010-4.jpg/1200px-Cat_August_2010-4.jpg', 'animals'),
    ('https://upload.wikimedia.org/wikipedia/commons/thumb/c/c5/M101_hires_STScI-PRC2006-10a.jpg/1024px-M101_hires_STScI-PRC2006-10a.jpg', 'scenery')
    images(image_url, category);
  2. Create a Unity Catalog Python UDTF to generate image captions:

    1. Initialize the UDTF with the configuration, including batch size, Databricks API token, vision model endpoint, and workspace URL.
    2. In the eval method, collect the image URLs into a buffer. When the buffer reaches the batch size, trigger batch processing. This ensures that multiple images are processed together in a single API call rather than individual calls per image.
    3. In the batch processing method, download all buffered images, encode them as base64, and send them in a single API request to the Databricks vision model endpoint. The model processes all images simultaneously and returns captions for the entire batch.
    4. The terminate method is executed exactly once at the end of each partition. In the terminate method, process any remaining images in the buffer and yield all collected captions as results.
note

Replace <workspace-url> with your actual Databricks workspace URL (https://your-workspace.cloud.databricks.com).

SQL
CREATE OR REPLACE TEMPORARY FUNCTION batch_inference_image_caption(data TABLE, api_token STRING)
RETURNS TABLE (caption STRING)
LANGUAGE PYTHON
HANDLER 'BatchInferenceImageCaption'
COMMENT 'batch image captioning by sending groups of image URLs to a Databricks vision endpoint and returning concise captions for each image.'
AS $$
class BatchInferenceImageCaption:
    def __init__(self):
        self.batch_size = 3
        self.vision_endpoint = "databricks-claude-3-7-sonnet"
        self.workspace_url = "<workspace-url>"
        self.image_buffer = []
        self.results = []

    def eval(self, row, api_token):
        self.image_buffer.append((str(row[0]), api_token))
        if len(self.image_buffer) >= self.batch_size:
            self._process_batch()

    def terminate(self):
        if self.image_buffer:
            self._process_batch()
        for caption in self.results:
            yield (caption,)

    def _process_batch(self):
        batch_data = self.image_buffer.copy()
        self.image_buffer.clear()

        import base64
        import httpx
        import requests

        # API request timeout in seconds
        api_timeout = 60
        # Maximum tokens for vision model response
        max_response_tokens = 300
        # Temperature controls randomness (lower = more deterministic)
        model_temperature = 0.3

        # Create a batch for the images
        batch_images = []
        api_token = batch_data[0][1] if batch_data else None

        for image_url, _ in batch_data:
            image_response = httpx.get(image_url, timeout=15)
            image_data = base64.standard_b64encode(image_response.content).decode("utf-8")
            batch_images.append(image_data)

        content_items = [{
            "type": "text",
            "text": "Provide brief captions for these images, one per line."
        }]
        for img_data in batch_images:
            content_items.append({
                "type": "image_url",
                "image_url": {
                    "url": "data:image/jpeg;base64," + img_data
                }
            })

        payload = {
            "messages": [{
                "role": "user",
                "content": content_items
            }],
            "max_tokens": max_response_tokens,
            "temperature": model_temperature
        }

        response = requests.post(
            self.workspace_url + "/serving-endpoints/" +
            self.vision_endpoint + "/invocations",
            headers={
                'Authorization': 'Bearer ' + api_token,
                'Content-Type': 'application/json'
            },
            json=payload,
            timeout=api_timeout
        )

        result = response.json()
        batch_response = result['choices'][0]['message']['content'].strip()

        lines = batch_response.split('\n')
        captions = [line.strip() for line in lines if line.strip()]

        while len(captions) < len(batch_data):
            captions.append(batch_response)

        self.results.extend(captions[:len(batch_data)])
$$;

To use the batch image caption UDTF, call it using the sample images table:

note

Replace your_secret_scope and api_token with the actual secret scope and key name for the Databricks API token.

SQL
SELECT
  caption
FROM
  batch_inference_image_caption(
    data => TABLE(sample_images),
    api_token => secret('your_secret_scope', 'api_token')
  )
Output
+---------------------------------------------------------------------------------------------------------------+
| caption                                                                                                         |
+---------------------------------------------------------------------------------------------------------------+
| Wooden boardwalk cutting through vibrant wetland grasses under blue skies                                       |
| Black ant in detailed macro photography standing on a textured surface                                          |
| Tabby cat lounging comfortably on a white ledge against a white wall                                            |
| Stunning spiral galaxy with bright central core and sweeping blue-white arms against the black void of space.   |
+---------------------------------------------------------------------------------------------------------------+

You can also generate image captions category by category:

SQL
SELECT
  *
FROM
  batch_inference_image_caption(
    TABLE(sample_images)
      PARTITION BY category ORDER BY (category),
    secret('your_secret_scope', 'api_token')
  )
Output
+-----------------------------------------------------------------------------------------------------+
| caption                                                                                               |
+-----------------------------------------------------------------------------------------------------+
| Black ant in detailed macro photography standing on a textured surface                               |
| Stunning spiral galaxy with bright center and sweeping blue-tinged arms against the black of space.  |
| Tabby cat lounging comfortably on white ledge against white wall                                     |
| Wooden boardwalk cutting through lush wetland grasses under blue skies                               |
+-----------------------------------------------------------------------------------------------------+

Example: ROC curve and AUC computation for ML model evaluation

This example demonstrates computing receiver operating characteristic (ROC) curves and area under the curve (AUC) scores for binary classification model evaluation using scikit-learn.

This example showcases several important patterns:

  • External library usage: Integrates scikit-learn for ROC curve computation
  • Stateful aggregation: Accumulates predictions across all rows before computing metrics
  • terminate() method usage: Processes the complete dataset and yields results only after all rows have been evaluated
  • Error handling: Validates required columns exist in the input table

The UDTF accumulates all predictions in memory using the eval() method, then computes and yields the complete ROC curve in the terminate() method. This pattern is useful for metrics that require the complete dataset for calculation.

SQL
CREATE OR REPLACE TEMPORARY FUNCTION compute_roc_curve(t TABLE)
RETURNS TABLE (threshold DOUBLE, true_positive_rate DOUBLE, false_positive_rate DOUBLE, auc DOUBLE)
LANGUAGE PYTHON
HANDLER 'ROCCalculator'
COMMENT 'Compute ROC curve and AUC using scikit-learn'
AS $$
class ROCCalculator:
    def __init__(self):
        from sklearn import metrics
        self._roc_curve = metrics.roc_curve
        self._roc_auc_score = metrics.roc_auc_score

        self._true_labels = []
        self._predicted_scores = []

    def eval(self, row):
        if 'y_true' not in row or 'y_score' not in row:
            raise KeyError("Required columns 'y_true' and 'y_score' not found")

        true_label = row['y_true']
        predicted_score = row['y_score']

        label = float(true_label)
        self._true_labels.append(label)
        self._predicted_scores.append(float(predicted_score))

    def terminate(self):
        false_pos_rate, true_pos_rate, thresholds = self._roc_curve(
            self._true_labels,
            self._predicted_scores,
            drop_intermediate=False
        )

        auc_score = float(self._roc_auc_score(self._true_labels, self._predicted_scores))

        for threshold, tpr, fpr in zip(thresholds, true_pos_rate, false_pos_rate):
            yield float(threshold), float(tpr), float(fpr), auc_score
$$;

Create sample binary classification data with predictions:

SQL
CREATE OR REPLACE TEMPORARY VIEW binary_classification_data AS
SELECT *
FROM VALUES
( 1, 1.0, 0.95, 'high_confidence_positive'),
( 2, 1.0, 0.87, 'high_confidence_positive'),
( 3, 1.0, 0.82, 'medium_confidence_positive'),
( 4, 0.0, 0.78, 'false_positive'),
( 5, 1.0, 0.71, 'medium_confidence_positive'),
( 6, 0.0, 0.65, 'false_positive'),
( 7, 0.0, 0.58, 'true_negative'),
( 8, 1.0, 0.52, 'low_confidence_positive'),
( 9, 0.0, 0.45, 'true_negative'),
(10, 0.0, 0.38, 'true_negative'),
(11, 1.0, 0.31, 'low_confidence_positive'),
(12, 0.0, 0.15, 'true_negative'),
(13, 0.0, 0.08, 'high_confidence_negative'),
(14, 0.0, 0.03, 'high_confidence_negative')
AS data(sample_id, y_true, y_score, prediction_type);

Compute the ROC curve and AUC:

SQL
SELECT
  threshold,
  true_positive_rate,
  false_positive_rate,
  auc
FROM compute_roc_curve(
  TABLE(
    SELECT y_true, y_score
    FROM binary_classification_data
    WHERE y_true IS NOT NULL AND y_score IS NOT NULL
    ORDER BY sample_id
  )
)
ORDER BY threshold DESC;
Output
+-----------+--------------------+---------------------+-------+
| threshold | true_positive_rate | false_positive_rate | auc   |
+-----------+--------------------+---------------------+-------+
| 1.95      | 0.0                | 0.0                 | 0.786 |
| 0.95      | 0.167              | 0.0                 | 0.786 |
| 0.87      | 0.333              | 0.0                 | 0.786 |
| 0.82      | 0.5                | 0.0                 | 0.786 |
| 0.78      | 0.5                | 0.125               | 0.786 |
| 0.71      | 0.667              | 0.125               | 0.786 |
| 0.65      | 0.667              | 0.25                | 0.786 |
| 0.58      | 0.667              | 0.375               | 0.786 |
| 0.52      | 0.833              | 0.375               | 0.786 |
| 0.45      | 0.833              | 0.5                 | 0.786 |
| 0.38      | 0.833              | 0.625               | 0.786 |
| 0.31      | 1.0                | 0.625               | 0.786 |
| 0.15      | 1.0                | 0.75                | 0.786 |
| 0.08      | 1.0                | 0.875               | 0.786 |
| 0.03      | 1.0                | 1.0                 | 0.786 |
+-----------+--------------------+---------------------+-------+

Limitations

The following limitations apply to Unity Catalog Python UDTFs:

Next steps