Funções escalares definidas pelo usuário - Python

Este artigo contém exemplos de funções definidas pelo usuário (UDF) do Python. Ele mostra como registrar UDFs, como invocar UDFs e fornece advertências sobre a ordem de avaliação de subexpressões no Spark SQL.

No Databricks Runtime 14.0 e acima, é possível usar as funções de tabela definidas pelo usuário (UDTFs) do Python para registrar funções que retornam relações inteiras em vez de valores escalares. Consulte Funções de tabela definidas pelo usuário (UDTFs) do Python.

Observação

Em Databricks Runtime 12.2 LTS e abaixo, Python UDFs e Pandas UDFs não são suportados em Unity Catalog em compute que usa o modo de acesso compartilhado. Os UDFs Python escalares e os UDFs Pandas escalares são suportados em Databricks Runtime 13.3 LTS e acima para todos os modos de acesso.

Em Databricks Runtime 13.3 LTS e acima, é possível registrar UDFs escalares Python para Unity Catalog usando a sintaxe SQL. Consulte Funções definidas pelo usuário (UDFs) no Unity Catalog.

Graviton O suporte de instância para Python UDFs no Unity Catalog habilitado clusters está disponível em Databricks Runtime 15.2 e acima.

Importante

UDFs e UDAFs são compatíveis com Graviton clusters configurado com o modo de acesso compartilhado e Unity Catalog em Databricks Runtime 15.2 e acima.

registrar uma função como um UDF

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

Você pode, opcionalmente, definir o tipo de retorno do seu UDF. O tipo de retorno default é StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Chame o UDF no Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Usar UDF com DataFrames

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Como alternativa, você pode declarar o mesmo UDF usando a sintaxe de anotação:

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Ordem de avaliação e verificação nula

O Spark SQL (incluindo SQL e DataFrame e API dataset ) não garante a ordem de avaliação das subexpressões. Em particular, as entradas de um operador ou função não são necessariamente avaliadas da esquerda para a direita ou em qualquer outra ordem fixa. Por exemplo, as expressões lógicas AND e OR não têm semântica de "curto-circuito" da esquerda para a direita.

Portanto, é perigoso confiar nos efeitos colaterais ou na ordem de avaliação das expressões Boolean e na ordem das cláusulas WHERE e HAVING, pois essas expressões e cláusulas podem ser reordenadas durante a otimização e o planejamento query . Especificamente, se um UDF depende da semântica de curto-circuito no SQL para verificação nula, não há garantia de que a verificação nula ocorrerá antes de invocar o UDF. Por exemplo,

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Esta cláusula WHERE não garante que o strlen UDF seja invocado após a filtragem de nulos.

Para executar a verificação nula adequada, recomendamos que você faça um dos seguintes:

  • Torne o próprio UDF com reconhecimento nulo e faça a verificação nula dentro do próprio UDF

  • Use as expressões IF ou CASE WHEN para fazer a verificação nula e invocar a UDF em uma ramificação condicional

spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Limitações

  • PySpark Os UDFs em clusters ou serverless compute compartilhados não podem acessar Git pastas, workspace arquivos ou UC Volumes para importar módulos em Databricks Runtime 14.2 e abaixo.