Funções escalares definidas pelo usuário - Python
Este artigo contém Python exemplos de funções definidas pelo usuário (UDF). Ele mostra como registrar UDFs, como invocar UDFs e fornece advertências sobre a ordem de avaliação de subexpressões em Spark SQL.
No Databricks Runtime 14.0 e acima, é possível usar as funções de tabela definidas pelo usuário (UDTFs) do Python para registrar funções que retornam relações inteiras em vez de valores escalares. Consulte Funções de tabela definidas pelo usuário (UDTFs) do Python.
Em Databricks Runtime 12.2 LTS e abaixo, Python UDFs e Pandas UDFs não são suportados em Unity Catalog compute que usa o modo de acesso padrão. Os UDFs escalares Python e Pandas são suportados em Databricks Runtime 13.3 LTS e acima para todos os modos de acesso.
Em Databricks Runtime 13.3 LTS e acima, é possível registrar UDFs escalares Python para Unity Catalog usando a sintaxe SQL. Consulte Funções definidas pelo usuário (UDFs) no Unity Catalog.
Graviton O suporte de instância para Python UDFs no clustering habilitado para o Unity Catalog está disponível em Databricks Runtime 15.2 e acima.
Os UDFs e UDAFs são compatíveis com o clustering Graviton configurado com o modo de acesso padrão e Unity Catalog em Databricks Runtime 15.2 e acima.
registrar uma função como um UDF
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
Opcionalmente, o senhor pode definir o tipo de retorno do seu UDF. O tipo de retorno do default é StringType
.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Chamar o UDF no Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Usar UDF com DataFrames
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Como alternativa, o senhor pode declarar o mesmo UDF usando a sintaxe de anotação:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Ordem de avaliação e verificação de nulos
Spark SQL (incluindo SQL e DataFrame e o conjunto de dados API) não garante a ordem de avaliação das subexpressões. Em particular, as entradas de um operador ou função não são
necessariamente avaliado da esquerda para a direita ou em qualquer outra ordem fixa. Por exemplo, lógico AND
e as expressões OR
não têm semântica de “curto-circuito” da esquerda para a direita.
Portanto, é perigoso confiar nos efeitos colaterais ou na ordem de avaliação das expressões Boolean
e na ordem das cláusulas WHERE
e HAVING
, pois essas expressões e cláusulas podem ser reordenadas durante a otimização e o planejamento da consulta. Especificamente, se um UDF depender da semântica de curto-circuito no SQL para verificação de nulidade, não há garantia de que a verificação de nulidade ocorrerá antes de invocar o UDF. Por exemplo,
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Essa cláusula WHERE
não garante que o UDF strlen
seja invocado após a filtragem de nulos.
Para realizar uma verificação de nulo adequada, recomendamos que você faça o seguinte:
- Tornar o próprio UDF sensível a nulidade e fazer a verificação de nulidade dentro do próprio UDF
- Use as expressões
IF
ouCASE WHEN
para fazer a verificação de nulidade e chamar o UDF em uma ramificação condicional
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Limitações
As seguintes limitações se aplicam às UDFs do PySpark:
-
Restrições de importação de módulos: PySpark Os UDFs em clustering de modo de acesso padrão e serverless compute não podem acessar pastas Git, arquivos workspace ou volumes Unity Catalog para importar módulos em Databricks Runtime 14.2 e abaixo.
-
Variáveis de difusão: PySpark UDFs em clustering de modo de acesso padrão e serverless compute não oferecem suporte a variáveis de broadcast.
-
perfil de instância: PySpark UDFs em clustering de modo de acesso padrão e serverless compute não são compatíveis com o perfil de instância.
-
Limite de memória : PySpark UDFs em serverless compute têm um limite de memória de 1 GB por PySpark UDF. Exceder esse limite resulta no seguinte erro:
[UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.