Pular para o conteúdo principal

Funções de tabela definidas pelo usuário (UDTFs) do Python

info

Visualização

Esse recurso está em Public Preview em Databricks Runtime 14.3 LTS e acima.

Uma função de tabela definida pelo usuário (UDTF) permite que o senhor registre funções que retornam tabelas em vez de valores escalares. Diferentemente das funções escalares que retornam um único valor de resultado de cada chamada, cada UDTF é chamada em uma cláusula FROM de uma instrução SQL e retorna uma tabela inteira como saída.

Cada chamada de UDTF pode aceitar zero ou mais argumentos. Esses argumentos podem ser expressões escalares ou argumentos de tabela representando tabelas de entrada inteiras.

Sintaxe básica de UDTF

O Apache Spark implementa UDTFs Python como classes Python com um método obrigatório eval que usa yield para emitir linhas de saída.

Para usar sua classe como um UDTF, o senhor deve importar a função PySpark udtf. A Databricks recomenda usar essa função como um decorador e especificar explicitamente os nomes e tipos de campo usando a opção returnType (a menos que a classe defina um método analyze, conforme descrito em uma seção posterior).

A UDTF a seguir cria uma tabela usando uma lista fixa de dois argumentos inteiros:

Python
from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
Output
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+

registro na UDTF

Os UDTFs são registrados no site local SparkSession e são isolados no nível do Notebook ou do Job.

Não é possível registrar UDTFs como objetos em Unity Catalog, e os UDTFs não podem ser usados com o depósito SQL.

O senhor pode registrar um UDTF no SparkSession atual para uso nas consultas do SQL com a função spark.udtf.register(). Forneça um nome para a função SQL e a classe UDTF do Python.

Python
spark.udtf.register("get_sum_diff", GetSumDiff)

Ligue para uma UDTF registrada

Depois de registrado, o senhor pode usar o UDTF no SQL usando o comando mágico %sql ou a função spark.sql() :

Python
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
Python
%sql
SELECT * FROM get_sum_diff(1,2);

Usar a seta do Apache

Se o seu UDTF receber uma pequena quantidade de dados como entrada, mas produzir uma tabela grande, a Databricks recomenda o uso do Apache Arrow. Você pode habilitá-lo especificando o parâmetro useArrow quando declarando a UDTF:

Python
@udtf(returnType="c1: int, c2: int", useArrow=True)

Listas de argumentos variáveis - *args e**kwargs

O senhor pode usar a sintaxe do Python *args ou **kwargs e implementar a lógica para lidar com um número não especificado de valores de entrada.

O exemplo a seguir retorna o mesmo resultado enquanto verifica explicitamente o tamanho e os tipos de entrada dos argumentos:

Python
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Aqui está o mesmo exemplo, mas usando argumentos de palavras-chave:

Python
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Defina um esquema estático no momento do registro

O UDTF retorna linhas com um esquema de saída que compreende uma sequência ordenada de nomes de colunas e tipos. Se o esquema UDTF sempre permanecer o mesmo para todas as consultas, você poderá especificar um esquema estático e fixo esquema após o decorador @udtf. Deve ser um StructType:

Python
StructType().add("c1", StringType())

Ou uma cadeia de caracteres DDL que representa um tipo de estrutura:

c1: string

computar um esquema dinâmico no momento da chamada da função

Os UDTFs também podem compute programar o esquema de saída para cada chamada, dependendo dos valores dos argumentos de entrada. Para fazer isso, defina um método estático chamado analyze que aceite zero ou mais parâmetros que correspondam aos argumentos fornecidos à chamada UDTF específica.

Cada argumento do método analyze é uma instância da classe AnalyzeArgument que contém os seguintes campos:

AnalyzeArgument campo de classe

Descrição

dataType

O tipo do argumento de entrada como DataType. Para argumentos da tabela de entrada, esse é um StructType representando as colunas da tabela.

value

O valor do argumento de entrada como Optional[Any]. Isso é None para argumentos de tabela ou argumentos escalares literais que não são constantes.

isTable

Se o argumento de entrada é uma tabela como BooleanType.

isConstantExpression

Se o argumento de entrada é uma expressão dobrável constante como BooleanType.

O método analyze retorna uma instância da classe AnalyzeResult, que inclui o esquema da tabela de resultados como StructType mais alguns campos opcionais. Se a UDTF aceitar um argumento de tabela de entrada, o AnalyzeResult também poderá incluir uma forma solicitada de particionar e ordenar as linhas da tabela de entrada em várias chamadas de UDTF, conforme descrito posteriormente.

AnalyzeResult campo de classe

Descrição

schema

O esquema da tabela de resultados como StructType.

withSinglePartition

Se todas as linhas de entrada devem ser enviadas para a mesma instância da classe UDTF de BooleanType.

partitionBy

Se definido como não vazio, todas as linhas com cada combinação exclusiva de valores das expressões de particionamento são consumidas por uma instância separada da classe UDTF.

orderBy

Se definido como não vazio, isso especifica uma ordem das linhas em cada partição.

select

Se definido como não vazio, essa é uma sequência de expressões que o UDTF está especificando para o Catalyst avaliar em relação às colunas no argumento TABLE de entrada. A UDTF recebe um atributo de entrada para cada nome na lista na ordem em que são listados.

Este exemplo analyze retorna uma coluna de saída para cada palavra no argumento strings de entrada.

Python
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)

def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
Output
['word_0', 'word_1']

Encaminhar o estado para futuras chamadas eval

O método analyze pode servir como um local conveniente para realizar a inicialização e, em seguida, encaminhar os resultados para futuras invocações do método eval para a mesma chamada UDTF.

Para fazer isso, crie uma subclasse de AnalyzeResult e retorne uma instância da subclasse do método analyze. Em seguida, adicione um argumento adicional ao método __init__ para aceitar essa instância.

Este exemplo de analyze retorna um esquema de saída constante, mas adiciona informações personalizadas nos metadados do resultado a serem consumidos por futuras chamadas do método __init__:

Python
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""

@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""

@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)

def eval(self, argument, row: Row):
self._total += 1

def terminate(self):
yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
Output
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+

Linhas de saída de rendimento

O método eval é executado uma vez para cada linha do argumento da tabela de entrada (ou apenas uma vez se nenhum argumento da tabela for fornecido), seguido por uma invocação do método terminate no final. Qualquer um dos métodos gera zero ou mais linhas que estão em conformidade com o esquema de resultados produzindo tuplas, listas ou objetos pyspark.sql.Row.

Esse exemplo retorna uma linha fornecendo uma tupla de três elementos:

Python
def eval(self, x, y, z):
yield (x, y, z)

Você também pode omitir os parênteses:

Python
def eval(self, x, y, z):
yield x, y, z

Adicione uma vírgula final para retornar uma linha com apenas uma coluna:

Python
def eval(self, x, y, z):
yield x,

Você também pode gerar um objeto pyspark.sql.Row.

Python
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)

Este exemplo produz linhas de saída do método terminate usando uma lista Python. Você pode armazenar o estado dentro da classe das etapas anteriores na avaliação da UDTF para essa finalidade.

Python
def terminate(self):
yield [self.x, self.y, self.z]

Passe argumentos escalares para uma UDTF

Você pode passar argumentos escalares para uma UDTF como expressões constantes que incluem valores literais ou funções baseadas neles. Por exemplo:

SQL
SELECT * FROM udtf(42, group => upper("finance_department"));

Passar argumentos da tabela para uma UDTF

Os UDTFs do Python podem aceitar uma tabela de entrada como argumento, além dos argumentos de entrada escalares. Uma única UDTF também pode aceitar um argumento de tabela e vários argumentos escalares.

Em seguida, qualquer consulta SQL pode fornecer uma tabela de entrada usando a palavra-chave TABLE seguida de parênteses envolvendo um identificador de tabela apropriado, como TABLE(t). Como alternativa, você pode passar uma mesa subconsulta, como TABLE(SELECT a, b, c FROM t) ou TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

O argumento da tabela de entrada é então representado como um argumento pyspark.sql.Row para o método eval, com uma chamada para o método eval para cada linha na tabela de entrada. O senhor pode usar a anotação padrão do campo de coluna PySpark para interagir com as colunas em cada linha. O exemplo a seguir demonstra o site importando explicitamente o tipo PySpark Row e, em seguida, filtrando a tabela passada no campo id:

Python
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

Para consultar a função, use a palavra-chave TABLE SQL:

SQL
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
Output
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+

Especifique um particionamento das linhas de entrada das chamadas de função

Ao chamar um UDTF com um argumento de tabela, qualquer consulta SQL pode particionar a tabela de entrada em várias chamadas de UDTF com base nos valores de uma ou mais colunas da tabela de entrada.

Para especificar uma partição, use a cláusula PARTITION BY na chamada da função após o argumento TABLE. Isso garante que todas as linhas de entrada com cada combinação exclusiva de valores do as colunas de particionamento serão consumidas por exatamente uma instância da classe UDTF.

Observe que, além de referências simples de coluna, a cláusula PARTITION BY também aceita referências arbitrárias expressões baseadas nas colunas da tabela de entrada. Por exemplo, o senhor pode especificar o LENGTH de uma cadeia de caracteres , extrair um mês de uma data ou concatenar dois valores.

Também é possível especificar WITH SINGLE PARTITION em vez de PARTITION BY somente para solicitar uma partição em que todas as linhas de entrada devem ser consumidas por exatamente uma instância da classe UDTF.

Em cada partição, você pode, opcionalmente, especificar uma ordem necessária das linhas de entrada como O método eval da UDTF os consome. Para isso, forneça uma cláusula ORDER BY após a cláusula PARTITION BY ou WITH SINGLE PARTITION descrita acima.

Por exemplo, considere a seguinte UDTF:

Python
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0

def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])

def terminate(self):
yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

Você pode especificar opções de particionamento ao chamar o UDTF pela tabela de entrada de várias maneiras:

SQL
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
Output
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
SQL
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
Output
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
SQL

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
Output
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
SQL
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
Output
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+

Especifique um particionamento das linhas de entrada do método analyze

Observe que, para cada uma das formas acima de particionar a tabela de entrada ao chamar UDTFs em consultas SQL, há uma forma correspondente de o método analyze do UDTF especificar automaticamente o mesmo método de particionamento.

  • Em vez de chamar uma UDTF como SELECT * FROM udtf(TABLE(t) PARTITION BY a), você pode atualizar o método analyze para definir o campo partitionBy=[PartitioningColumn("a")] e simplesmente chamar a função usando SELECT * FROM udtf(TABLE(t)).
  • Da mesma forma, em vez de especificar TABLE(t) WITH SINGLE PARTITION ORDER BY b na consulta SQL, o senhor pode fazer com que analyze defina os campos withSinglePartition=true e orderBy=[OrderingColumn("b")] e, em seguida, passe apenas TABLE(t).
  • Em vez de passar TABLE(SELECT a FROM t) na consulta SQL, o senhor pode fazer com que analyze defina select=[SelectedColumn("a")] e, em seguida, passe apenas TABLE(t).

No exemplo a seguir, analyze retorna um esquema de saída constante, seleciona um subconjunto de colunas da tabela de entrada e especifica que a tabela de entrada seja particionada em várias chamadas UDTF com base nos valores da coluna date:

Python
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)

assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])