O que são funções de tabela definidas pelo usuário em Python?
Visualização
Esse recurso está na Prévia Pública.
Uma função de tabela definida pelo usuário (UDTF) permite registrar funções que retornam tabelas em vez de valores escalares. Os UDTFs funcionam de forma semelhante às expressões de tabela comuns (CTEs) quando referenciados em query SQL. Você faz referência a UDTFs na cláusula FROM
de uma instrução SQL e pode encadear operadores Spark SQL adicionais aos resultados.
Os UDTFs são registrados no SparkSession local e são isolados no nível do Notebook ou Job .
Os UDTFs são suportados em compute configurada com modos de acesso compartilhado atribuídos ou sem isolamento. Você não pode usar UDTFs no modo de acesso compartilhado.
Você não pode registrar UDTFs como objetos no Unity Catalog e UDTFs não podem ser usados com SQL warehouse.
Qual é a sintaxe básica de um UDTF?
O Apache Spark implementa UDTFs Python como classes Python com um método eval
obrigatório.
Você emite resultados como linhas usando yield
.
Para que o Apache Spark use sua classe como UDTF, você deve importar a função PySpark udtf
.
A Databricks recomenda usar essa função como decorador e sempre especificar explicitamente nomes e tipos de campos usando a opção returnType
.
O exemplo a seguir cria uma tabela simples a partir de entradas escalares usando um UDTF:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
def eval(self, x: int, y: int):
yield x + y, x - y
SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# | 3| -1|
# +----+-----+
Você pode usar a sintaxe *args
do Python e implementar lógica para lidar com um número não especificado de valores de entrada. O exemplo a seguir retorna o mesmo resultado enquanto verifica explicitamente o comprimento e os tipos de entrada dos argumentos:
@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# | 3| -1|
# +----+-----+
registrar-se em UDTF
Você pode registrar um UDTF no SparkSession atual para uso na query SQL usando a seguinte sintaxe:
spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)
O exemplo a seguir registra um UDTF do Python para SQL:
spark.udtf.register("simple_udtf", SimpleUDTF)
Depois de registrado, você poderá usar o UDTF em SQL usando o comando mágico %sql
ou a função spark.sql()
, como nos exemplos a seguir:
%sql
SELECT * FROM simple_udtf(1,2);
spark.sql("SELECT * FROM simple_udtf(1,2);")
Gerando resultados
UDTFs Python são implementados com yield
para retornar resultados. Os resultados são sempre retornados como uma tabela contendo 0 ou mais linhas com o esquema especificado.
Ao passar argumentos escalares, a lógica na execução do método eval
exatamente uma vez com o conjunto de argumentos escalares passados. Para argumentos de tabela, o método eval
é executado uma vez para cada linha da tabela de entrada.
A lógica pode ser escrita para retornar 0, 1 ou muitas linhas por entrada.
O UDTF a seguir demonstra o retorno de 0 ou mais linhas para cada entrada separando itens de uma lista separada por vírgula em entradas separadas:
from pyspark.sql.functions import udtf
@udtf(returnType="id: int, item: string")
class Itemize:
def eval(self, id: int, item_list: str):
items = item_list.split(",")
for item in items:
if item != "":
yield id, item
Passar um argumento de tabela para um UDTF
Você pode usar a palavra-chave SQL TABLE()
para transmitir um argumento de tabela para um UDTF. Você pode usar um nome de tabela ou uma query, como nos exemplos a seguir:
TABLE(table_name);
TABLE(SELECT * FROM table_name);
Os argumentos da tabela são processados uma linha por vez. Você pode usar anotação de campo de coluna padrão do PySpark para interagir com colunas em cada linha. O exemplo a seguir demonstra a importação explícita do tipo Row
do PySpark e a filtragem da tabela transmitida no campo id
:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# | 6|
# | 7|
# | 8|
# | 9|
# +---+
Passar argumentos escalares para um UDTF
É possível transmitir argumentos escalares para um UDTF utilizando qualquer combinação dos seguintes valores:
Constantes escalares
Funções escalares
Campos em uma relação
Para passar campos em uma relação, é necessário registrar o UDTF e utilizar a palavra-chave SQL LATERAL
.
Observação
Você pode usar aliases de tabela em linha para desambiguar colunas.
O exemplo a seguir demonstra o uso LATERAL
para transmitir campos de uma tabela para um UDTF:
from pyspark.sql.functions import udtf
@udtf(returnType="id: int, item: string")
class Itemize:
def eval(self, id: int, item_list: str):
items = item_list.split(",")
for item in items:
if item != "":
yield id, item
spark.udtf.register("itemize", Itemize)
spark.sql("""
SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
(2, 'spoons,'),
(3, ''),
(4, 'knives,cups') t(id, item_list),
LATERAL itemize(id, item_list) b
""").show()
Definir valores padrão para UDTFs
Opcionalmente, você pode implementar um método __init__
para definir valores default para variáveis de classe que podem ser referenciadas na lógica do Python.
O método __init__
não aceita argumentos e não tem acesso a variáveis ou informações de estado no SparkSession.
Use Apache Arrow com UDTFs
A Databricks recomenda a utilização do Apache Arrow para UDTFs que recebem uma pequena quantidade de dados como entrada, mas produzem uma tabela grande.
Você pode ativar a seta especificando o parâmetro useArrow
ao declarar o UDTF, como no exemplo a seguir:
from pyspark.sql.functions import udtf
@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
def eval(self, x: int):
yield x, x + 1