pandas_udf
Cria uma função definida pelo usuário Pandas .
As UDFs Pandas são funções definidas pelo usuário que são executadas pelo Spark usando o Arrow para transferir dados e Pandas para trabalhar com os dados, o que permite que Pandas realize operações. Uma UDF (Função Definida pelo Usuário) do Pandas é definida usando pandas_udf como um decorador ou para envolver a função, e nenhuma configuração adicional é necessária. Uma UDF (Função Definida pelo Usuário) do Pandas se comporta, em geral, como uma API de função PySpark comum.
Sintaxe
import pyspark.sql.functions as sf
# As a decorator
@sf.pandas_udf(returnType=<returnType>, functionType=<functionType>)
def function_name(col):
# function body
pass
# As a function wrapper
sf.pandas_udf(f=<function>, returnType=<returnType>, functionType=<functionType>)
Parâmetros
Parâmetro | Tipo | Descrição |
|---|---|---|
|
| Opcional. Função definida pelo usuário. Uma função Python, se usada como uma função independente. |
|
| Opcional. O tipo de retorno da função definida pelo usuário. O valor pode ser um objeto DataType ou uma cadeia de caracteres formatada em DDL. |
|
| Opcional. Um valor de enumeração em PandasUDFType. padrão: ESCALAR. Este parâmetro existe para fins de compatibilidade. É recomendável o uso de dicas de tipo em Python. |
Exemplos
Exemplo 1 : Série para Série - Converter strings para maiúsculas.
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
return s.str.upper()
df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(to_upper("name")).show()
+--------------+
|to_upper(name)|
+--------------+
| JOHN DOE|
+--------------+
Exemplo 2 : Série para Série com argumentos nomeados.
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as sf
@pandas_udf(returnType=IntegerType())
def calc(a: pd.Series, b: pd.Series) -> pd.Series:
return a + 10 * b
spark.range(2).select(calc(b=sf.col("id") * 10, a=sf.col("id"))).show()
+-----------------------------+
|calc(b => (id * 10), a => id)|
+-----------------------------+
| 0|
| 101|
+-----------------------------+
Exemplo 3 : Iterador de Série para Iterador de Série.
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import pandas_udf
@pandas_udf("long")
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
for s in iterator:
yield s + 1
df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
df.select(plus_one(df.v)).show()
+-----------+
|plus_one(v)|
+-----------+
| 2|
| 3|
| 4|
+-----------+
Exemplo 4 : Série para Escalar - Agregação agrupada.
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
df.groupby("id").agg(mean_udf(df['v'])).show()
+---+-----------+
| id|mean_udf(v)|
+---+-----------+
| 1| 1.5|
| 2| 6.0|
+---+-----------+
Exemplo 5 : Conversão de séries para escalares com funções de janela.
import pandas as pd
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
w = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)
df.withColumn('mean_v', mean_udf("v").over(w)).show()
+---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.0|
| 1| 2.0| 1.5|
| 2| 3.0| 3.0|
| 2| 5.0| 4.0|
| 2|10.0| 7.5|
+---+----+------+
Exemplo 6 : Iterador de Série para Escalar - Agregação agrupada com uso eficiente de memória.
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import pandas_udf
@pandas_udf("double")
def pandas_mean_iter(it: Iterator[pd.Series]) -> float:
sum_val = 0.0
cnt = 0
for v in it:
sum_val += v.sum()
cnt += len(v)
return sum_val / cnt
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
df.groupby("id").agg(pandas_mean_iter(df['v'])).show()
+---+-------------------+
| id|pandas_mean_iter(v)|
+---+-------------------+
| 1| 1.5|
| 2| 6.0|
+---+-------------------+