What are Python user-defined table functions?

Preview

This feature is in Public Preview.

A user-defined table function (UDTF) allows you to register functions that return tables instead of scalar values. When referenced in a SQL query, a UDTF behaves similarly to a common table expression (CTE): you reference it in the FROM clause of the statement, and you can chain additional Spark SQL operators to the results.

UDTFs are registered to the local SparkSession and are isolated at the notebook or job level.

UDTFs are supported on compute configured with assigned or no-isolation shared access modes. You cannot use UDTFs with shared access mode.

You cannot register UDTFs as objects in Unity Catalog, and UDTFs cannot be used with SQL warehouses.

What is the basic syntax for a UDTF?

Apache Spark implements Python UDTFs as Python classes with a mandatory eval method that emits result rows using yield.

For Apache Spark to use your class as a UDTF, you must import the PySpark udtf function. Databricks recommends using this function as a decorator and always explicitly specifying field names and types using the returnType option.

The following example creates a simple table from scalar inputs using a UDTF:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, x: int, y: int):
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

You can use Python *args syntax and implement logic to handle an unspecified number of input values. The following example returns the same result while explicitly checking the number and types of the input arguments:

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, *args):
        assert len(args) == 2
        assert all(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

Register a UDTF

You can register a UDTF to the current SparkSession for use in SQL queries using the following syntax:

spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)

The following example registers a Python UDTF to SQL:

spark.udtf.register("simple_udtf", SimpleUDTF)

Once registered, you can use the UDTF in SQL with either the %sql magic command or the spark.sql() function, as in the following examples:

%sql
SELECT * FROM simple_udtf(1,2);

spark.sql("SELECT * FROM simple_udtf(1,2);")
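Because a registered UDTF returns a table, you can also chain additional SQL operators to its results. The following is a minimal sketch, not part of the original examples; the expected output follows from the logic of SimpleUDTF above:

spark.sql("SELECT sum, diff FROM simple_udtf(5, 3) WHERE diff > 0").show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   8|    2|
# +----+-----+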

Yielding results

Python UDTFs are implemented with yield to return results. Results are always returned as a table containing 0 or more rows with the specified schema.

When you pass scalar arguments, the logic in the eval method runs exactly once with that set of scalar arguments. For table arguments, the eval method runs once for each row in the input table.

Logic can be written to return 0, 1, or many rows per input.

The following UDTF demonstrates returning 0 or more rows for each input by splitting a comma-separated list into separate entries:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item
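As a quick illustration (a sketch that is not part of the original example; the output follows from the splitting logic above), calling the UDTF with scalar inputs shows how a single input can produce multiple output rows while empty items produce none:

from pyspark.sql.functions import lit

Itemize(lit(1), lit("pots,,pans")).show()
# +---+----+
# | id|item|
# +---+----+
# |  1|pots|
# |  1|pans|
# +---+----+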

Pass a table argument to a UDTF

You can use the SQL keyword TABLE() to pass a table argument to a UDTF. You can use a table name or a query, as in the following examples:

TABLE(table_name);
TABLE(SELECT * FROM table_name);

Table arguments are processed one row at a time. You can use standard PySpark column field annotations to interact with columns in each row. The following example demonstrates explicitly importing the PySpark Row type and then filtering the passed table on the id field:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# |  6|
# |  7|
# |  8|
# |  9|
# +---+

Pass scalar arguments to a UDTF

You can pass scalar arguments to a UDTF using any combination of the following values:

  • Scalar constants

  • Scalar functions

  • Fields in a relation

To pass fields in a relation, you must register the UDTF and use the SQL LATERAL keyword.

Note

You can use in-line table aliases to disambiguate columns.

The following example demonstrates using LATERAL to pass fields from a table to a UDTF:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

spark.udtf.register("itemize", Itemize)

spark.sql("""
    SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
    (2, 'spoons,'),
    (3, ''),
    (4, 'knives,cups') t(id, item_list),
    LATERAL itemize(id, item_list) b
""").show()

Set default values for UDTFs

You can optionally implement an __init__ method to set default values for class variables you can reference in your Python logic.

The __init__ method does not accept any arguments and has no access to variables or state information in the SparkSession.
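The following is a minimal sketch of this pattern (the class name, the factor variable, and the values are illustrative and not part of the Databricks examples):

from pyspark.sql.functions import lit, udtf

@udtf(returnType="value: int, scaled: int")
class ScaleUDTF:
    def __init__(self):
        # Default value for use in eval; __init__ accepts no arguments.
        self.factor = 10

    def eval(self, x: int):
        yield x, x * self.factor

ScaleUDTF(lit(3)).show()
# +-----+------+
# |value|scaled|
# +-----+------+
# |    3|    30|
# +-----+------+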

Use Apache Arrow with UDTFs

Databricks recommends using Apache Arrow for UDTFs that receive a small amount of data as input but output a large table.

You can enable Arrow by specifying the useArrow parameter when declaring the UDTF, as in the following example:

from pyspark.sql.functions import udtf

@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1
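You call an Arrow-enabled UDTF the same way as any other UDTF. For example (a sketch; the output follows from the logic of PlusOne above):

from pyspark.sql.functions import lit

PlusOne(lit(1)).show()
# +---+---+
# | c1| c2|
# +---+---+
# |  1|  2|
# +---+---+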