Funções escalares definidas pelo usuário - Scala
Este artigo contém Scala exemplos de funções definidas pelo usuário (UDF). Ele mostra como registrar UDFs, como invocar UDFs e advertências sobre a ordem de avaliação de subexpressões em Spark SQL. Consulte Funções escalares externas definidas pelo usuário (UDFs) para obter mais detalhes.
Requisitos
-
As UDFs Scala em recursos compute habilitados para o catálogo Unity com modo de acesso padrão exigem Databricks Runtime 14.2 ou superior.
-
O suporte a instâncias ARM para UDFs Scala em clusters com Unity Catalog habilitado requer Databricks Runtime 15.2 ou superior.
registrar uma função como um UDF
val squared = (s: Long) => {
s * s
}
spark.udf.register("square", squared)
Chamar o UDF no Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test
Usar UDF com DataFrames
import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))
Ordem de avaliação e verificação de nulos
Spark SQL (incluindo SQL e DataFrame e o conjunto de dados APIs) não garante a ordem de avaliação das subexpressões. Em particular, as entradas de um operador ou função não são necessariamente avaliadas da esquerda para a direita ou em qualquer outra ordem fixa. Por exemplo, as expressões lógicas AND e OR não têm semântica de “curto-circuito” da esquerda para a direita.
Portanto, é perigoso confiar nos efeitos colaterais ou na ordem de avaliação das expressões Boolean e na ordem das cláusulas WHERE e HAVING, pois essas expressões e cláusulas podem ser reordenadas durante a otimização e o planejamento da consulta. Especificamente, se um UDF depender da semântica de curto-circuito no SQL para verificação de nulidade, não há garantia de que a verificação de nulidade ocorrerá antes de invocar o UDF. Por exemplo,
spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee
Essa cláusula WHERE não garante que o UDF strlen seja invocado após a filtragem de nulos.
Para realizar uma verificação de nulo adequada, recomendamos que você faça o seguinte:
- Tornar o próprio UDF sensível a nulidade e fazer a verificação de nulidade dentro do próprio UDF
- Use as expressões
IFouCASE WHENpara fazer a verificação de nulidade e chamar o UDF em uma ramificação condicional
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Conjunto de dados digitados APIs
Esse recurso é suportado no clustering habilitado para o Unity Catalog com modo de acesso padrão em Databricks Runtime 15.4 e acima.
Conjunto de dados digitado APIs permite a execução de transformações como mapa, filtro e agregações no conjunto de dados resultante com uma função definida pelo usuário.
Por exemplo, o aplicativo Scala a seguir usa o map() API para modificar um número em uma coluna de resultados para uma cadeia de caracteres prefixada.
spark.range(3).map(f => s"row-$f").show()
Embora este exemplo use o map() API, isso também se aplica a outros conjuntos de dados digitados APIs, como filter(), mapPartitions(), foreach(), foreachPartition(), reduce() e flatMap().
Scala UDF Recurso e compatibilidade Databricks Runtime
Os seguintes recursos do Scala requerem versões mínimas do Databricks Runtime quando usados em clustering habilitado para Unity Catalog no modo de acesso padrão (compartilhado).
Recurso | Versão mínima do Databricks Runtime |
|---|---|
UDFs escalares | Databricks Runtime 14.2 |
| Databricks Runtime 15.4 |
| Databricks Runtime 15.4 |
(transmissão) | Databricks Runtime 15.4 |
(transmissão) | Databricks Runtime 16.1 |
(transmissão) | Databricks Runtime 16.2 |