Funções escalares definidas pelo usuário - Python
Este artigo contém exemplos de funções definidas pelo usuário (UDF) do Python. Ele mostra como registrar UDFs, como invocar UDFs e fornece advertências sobre a ordem de avaliação de subexpressões no Spark SQL.
No Databricks Runtime 14.0 e acima, você pode usar funções de tabela definidas pelo usuário (UDTFs) do Python para funções de registro que retornam relações inteiras em vez de valores escalares. Consulte O que são funções de tabela definidas pelo usuário do Python?.
Observação
Em Databricks Runtime 12.2 LTS e abaixo, Python UDFs e Pandas UDFs não são suportados em Unity Catalog em compute que usa o modo de acesso compartilhado. Os UDFs Python escalares e os UDFs Pandas escalares são suportados em Databricks Runtime 13.3 LTS e acima para todos os modos de acesso.
Em Databricks Runtime 13.3 LTS e acima, é possível registrar UDFs escalares Python para Unity Catalog usando a sintaxe SQL. Consulte Funções definidas pelo usuário (UDFs) no Unity Catalog.
As instâncias Graviton não oferecem suporte a UDFs Python em clusters habilitados para Unity Catalog.
Importante
UDFs e UDAFs não são suportados em clusters Graviton configurados com modo de acesso compartilhado e Unity Catalog.
registrar uma função como um UDF
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
Você pode, opcionalmente, definir o tipo de retorno do seu UDF. O tipo de retorno default é StringType
.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Chame o UDF no Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Usar UDF com DataFrames
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Como alternativa, você pode declarar o mesmo UDF usando a sintaxe de anotação:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Ordem de avaliação e verificação nula
O Spark SQL (incluindo SQL e DataFrame e API dataset ) não garante a ordem de avaliação das subexpressões. Em particular, as entradas de um operador ou função não são necessariamente avaliadas da esquerda para a direita ou em qualquer outra ordem fixa. Por exemplo, as expressões lógicas AND
e OR
não têm semântica de "curto-circuito" da esquerda para a direita.
Portanto, é perigoso confiar nos efeitos colaterais ou na ordem de avaliação das expressões Boolean e na ordem das cláusulas WHERE
e HAVING
, pois essas expressões e cláusulas podem ser reordenadas durante a otimização e o planejamento query . Especificamente, se um UDF depende da semântica de curto-circuito no SQL para verificação nula, não há garantia de que a verificação nula ocorrerá antes de invocar o UDF. Por exemplo,
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Esta cláusula WHERE
não garante que o strlen
UDF seja invocado após a filtragem de nulos.
Para executar a verificação nula adequada, recomendamos que você faça um dos seguintes:
Torne o próprio UDF com reconhecimento nulo e faça a verificação nula dentro do próprio UDF
Use as expressões
IF
ouCASE WHEN
para fazer a verificação nula e invocar a UDF em uma ramificação condicional
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok