O que são funções definidas pelo usuário (UDFs)?
As funções definidas pelo usuário (UDFs) permitem que o senhor reutilize e compartilhe códigos que ampliam a funcionalidade integrada em Databricks. Use UDFs para executar tarefas específicas, como cálculos complexos, transformações ou manipulações de dados personalizadas.
Quando usar um UDF em vez de uma função do Apache Spark?
Use UDFs para lógica que seja difícil de expressar com funções integradas Apache Spark. integrada Apache Spark funções são otimizadas para processamento distribuído e oferecem melhor desempenho em escala. Para obter mais informações, consulte Funções.
Databricks recomenda UDFs para consultas ad hoc, limpeza manual de dados, análise exploratória de dados e operações em conjuntos de dados de pequeno e médio porte. Os casos de uso comuns de UDFs incluem criptografia, descriptografia, hashing, análise de JSON e validação de dados.
Use os métodos Apache Spark para operações em conjuntos de dados muito grandes e quaisquer cargas de trabalho executadas regularmente ou continuamente, incluindo ETL Job e operações de transmissão.
Compreender os tipos de UDF
Selecione um tipo de UDF na guia a seguir para ver uma descrição, um exemplo e um link para saber mais.
- Scalar UDF
- Batch Scalar UDFs
- Non-Scalar UDFs
- UDAF
- UDTFs
As UDFs escalares operam em uma única linha e retornam um único valor de resultado para cada linha. Eles podem ser controlados pelo Unity Catalog ou com escopo de sessão.
O exemplo a seguir usa um UDF escalar para calcular o comprimento de cada nome em uma coluna name
e adicionar o valor em uma nova coluna name_length
.
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
Para implementar isso em um notebook Databricks usando PySpark:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
Consulte Funções definidas pelo usuário (UDFs) no Unity Catalog e Funções escalares definidas pelo usuário - Python.
Processar dados em lotes, mantendo a paridade de linha de entrada/saída de 1:1. Isso reduz a sobrecarga das operações linha a linha para o processamento de dados em grande escala. Os UDFs de lotes também mantêm o estado entre os lotes para execução mais eficiente, reutilizam recursos e lidam com cálculos complexos que precisam de contexto em blocos de dados.
Eles podem ser controlados pelo Unity Catalog ou com escopo de sessão.
Os seguintes lotes Unity Catalog Python UDF calculam o IMC enquanto processam lotes de linhas:
+-------------+-------------+
| weight_kg | height_m |
+-------------+-------------+
| 90 | 1.8 |
| 77 | 1.6 |
| 50 | 1.5 |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
select main.test.calculate_bmi_pandas(cast(70 as double), cast(1.8 as double));
+--------+
| BMI |
+--------+
| 27.8 |
| 30.1 |
| 22.2 |
+--------+
Consulte Funções definidas pelo usuário (UDFs) em Unity Catalog e lotes Python Funções definidas pelo usuário (UDFs) em Unity Catalog.
Os UDFs não escalares operam em conjuntos de dados/colunas inteiros com relações de entrada/saída flexíveis (1 ou muitos).
Lotes com escopo de sessão Pandas Os UDFs podem ser dos seguintes tipos:
- Série para série
- Iterador de série para iterador de série
- Iterador de várias séries para iterador de séries
- Série em escalar
A seguir, um exemplo de uma série para série Pandas UDF.
from pyspark.sql.functions import pandas_udf
import pandas as pd
df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])
@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
return weight / (height ** 2)
df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()
Consulte Pandas funções definidas pelo usuário.
Os UDAFs operam em várias linhas e retornam um único resultado agregado. Os UDAFs têm escopo de sessão apenas.
O exemplo de UDAF a seguir agrega pontuações por tamanho de nome.
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
Consulte Pandas funções definidas pelo usuário para Python e Funções agregadas definidas pelo usuário - Scala.
Uma UDTF usa um ou mais argumentos de entrada e retorna várias linhas (e possivelmente várias colunas) para cada linha de entrada. Os UDTFs têm apenas o escopo da sessão.
No exemplo a seguir, cada valor na coluna de pontuação corresponde a uma lista de categorias. A UDTF divide a lista separada por vírgulas em várias linhas.
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf
@udtf(returnType="name: string, score: int, category: string")
class ScoreCategoriesUDTF:
def eval(self, name: str, score: int, categories: str):
category_list = categories.split(',')
for category in category_list:
yield (name, score, category)
ScoreCategoriesUDTF(lit("Alice"), lit(85), lit("Math,Science,English")).display()
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
Consulte Funções de tabela definidas pelo usuário (UDTFs) do Python.
UDFs governados pelo Unity Catalog vs. UDFs com escopo de sessão
Unity Catalog Python UDFs e lotes Unity Catalog Python Os UDFs são mantidos em Unity Catalog para melhorar a governança, a reutilização e a capacidade de descoberta. Todos os outros UDFs são baseados em sessão, o que significa que são definidos em um Notebook ou Job e têm escopo para o site atual SparkSession. O senhor pode definir e acessar UDFs com escopo de sessão usando Scala ou Python.
Folha de dicas de UDFs governados pelo Unity Catalog
As UDFs governadas pelo Unity Catalog permitem que funções personalizadas sejam definidas, usadas, compartilhadas com segurança e governadas em ambientes de computação. Consulte Funções definidas pelo usuário (UDFs) no Unity Catalog.
Tipo de UDF | Com suporte compute | Descrição |
---|---|---|
Unity Catalog Python UDF |
| Defina um UDF em Python e registre-o em Unity Catalog para governança. As UDFs escalares operam em uma única linha e retornam um único valor de resultado para cada linha. |
lotes Unity Catalog Python UDFs |
| Defina um UDF em Python e registre-o em Unity Catalog para governança. lotes operações em vários valores e retornam vários valores. Reduz a sobrecarga de operações linha a linha para processamento de dados em grande escala. |
Folha de dicas de UDFs com escopo de sessão para usuários isolados compute
Os UDFs com escopo de sessão são definidos em um Notebook ou Job e têm escopo para o site atual SparkSession. O senhor pode definir e acessar UDFs com escopo de sessão usando Scala ou Python.
Tipo de UDF | Versão do Databricks Runtime | Com suporte compute | Descrição |
---|---|---|---|
Escalar Python |
| As UDFs escalares operam em uma única linha e retornam um único valor de resultado para cada linha. | |
Pandas UDF (vetorizado) |
| Pandas Os UDFs usam o Apache Arrow para transferir dados e o Pandas para trabalhar com os dados. Os UDFs do Pandas oferecem suporte a operações vetorizadas que podem aumentar muito o desempenho em relação aos UDFs escalonados linha a linha.length. | |
Python UDTFs |
| Uma UDTF usa um ou mais argumentos de entrada e retorna várias linhas (e possivelmente várias colunas) para cada linha de entrada. | |
UDFs escalares em Scala |
| As UDFs escalares operam em uma única linha e retornam um único valor de resultado para cada linha. | |
UDAFs em Scala |
| Os UDAFs operam em várias linhas e retornam um único resultado agregado. |
Considerações sobre o desempenho
-
As funções integradas e os UDFs do siteSQL são as opções mais eficientes.
-
As UDFs do Scala geralmente são mais rápidas do que as UDFs do Python.
- Unisolated Scala Os UDFs são executados na máquina virtual Java (JVM), portanto, evitam a sobrecarga de mover dados para dentro e para fora do JVM.
- Os UDFs isolados do Scala precisam mover os dados para dentro e para fora da JVM, mas ainda assim podem ser mais rápidos do que os UDFs do Python porque lidam com a memória de forma mais eficiente.
-
Python Os UDFs e osPandas UDFs tendem a ser mais lentos do que os Scala UDFs porque precisam serializar os dados e transferi-los do JVM para o interpretador Python.
- As UDFs do Pandas são até 100 vezes mais rápidas do que as UDFs do Python porque usam o Apache Arrow para reduzir os custos de serialização.