Pandas funções definidas pelo usuário
Uma função definida pelo usuário Pandas (UDF) - também conhecida como UDFvetorizada - é uma função definida pelo usuário que usa a setaApache para transferir dados e Pandas para trabalhar com os dados. Pandas Os UDFs permitem operações vetorizadas que podem aumentar o desempenho em até 100 vezes em comparação com os Python UDFs row-at-a-time.
Para obter informações básicas, consulte a postagem no blog New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0.
O senhor define uma Pandas UDF usando a palavra-chave pandas_udf
como um decorador e envolve a função com uma dica de tipoPython.
Este artigo descreve os diferentes tipos de Pandas UDFs e mostra como usar Pandas UDFs com dicas de tipo.
UDF de série para série
O senhor usa um Series to Series Pandas UDF para vetorizar operações escalares.
O senhor pode usá-las com APIs como select
e withColumn
.
A função Python deve receber uma série Pandas como entrada e retornar uma série Pandas do mesmo comprimento, e o senhor deve especificá-las nas dicas de tipo Python. Spark execução a Pandas UDF dividindo as colunas em lotes, chamando a função para cada lote como um subconjunto dos dados e, em seguida, concatenando os resultados.
O exemplo a seguir mostra como criar um Pandas UDF que calcula o produto de 2 colunas.
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
Iterador de série para Iterador de série UDF
Um iterador UDF é o mesmo que um escalar Pandas UDF , exceto:
-
A função Python
- Recebe um iterador de lotes em vez de um único lote de entrada como entrada.
- Retorna um iterador de lotes de saída em vez de um único lote de saída.
-
O comprimento de toda a saída no iterador deve ser igual ao comprimento de toda a entrada.
-
O wrapped Pandas UDF usa uma única coluna Spark como entrada.
O senhor deve especificar a dica de tipo Python como
Iterator[pandas.Series]
-> Iterator[pandas.Series]
.
Esse Pandas UDF é útil quando a execução do UDF requer a inicialização de algum estado, por exemplo, carregar um arquivo de modelo de aprendizado de máquina para aplicar inferência a cada lote de entrada.
O exemplo a seguir mostra como criar um Pandas UDF com suporte a iteradores.
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1
df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)
@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
y = y_bc.value # initialize states
try:
for x in batch_iter:
yield x + y
finally:
pass # release resources here, if any
df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+
Iterador de várias séries para Iterador de séries UDF
Um Iterator of multiple Series to Iterator of Series UDF tem características semelhantes e restrições como o Iterator of Series to Iterator of Series UDF. A função especificada recebe um iterador de lotes e gera um iterador de lotes. Também é útil quando a execução do UDF exige a inicialização de algum estado do site .
As diferenças são:
- A função Python subjacente usa um iterador de uma tupla de Pandas Series.
- O wrapped Pandas UDF usa várias colunas Spark como entrada.
Você especifica as dicas de tipo como Iterator[Tuple[pandas.Series, ...]]
- > Iterator[pandas.Series]
.
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
Série para UDF escalar
Séries para escalar Pandas UDFs são semelhantes às funções agregadas Spark.
Uma Series to scalar Pandas UDF define uma agregação de uma ou mais
Pandas Series a um valor escalar, em que cada Pandas Series representa uma coluna Spark.
O senhor usa uma série para escalonar Pandas UDF com APIs, como select
, withColumn
, groupBy.agg
e
PySpark.sql.Window.
Você expressa a dica de tipo como pandas.Series, ...
- > Any
. O tipo de retorno deve ser um tipo de dados primitivo
, e o escalar retornado pode ser um tipo primitivo Python, por exemplo,
int
ou float
ou um tipo de dados NumPy, como numpy.int64
ou numpy.float64
. Any
deveria idealmente
seja um tipo escalar específico.
Esse tipo de UDF não oferece suporte à agregação parcial e todos os dados de cada grupo são carregados na memória.
O exemplo a seguir mostra como usar esse tipo de UDF para compute média com select
, groupBy
e window
operações:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
Para obter detalhes sobre o uso, consulte PySpark.sql.functions.pandas_udf.
Uso
Definição do tamanho dos lotes Arrow
Essa configuração não afeta o site compute configurado com o modo de acesso padrão e Databricks Runtime 13.3 LTS a 14.2.
As partições de dados no Spark são convertidas em lotes de registros Arrow, que
podem levar temporariamente a um alto uso de memória na JVM. Para evitar possíveis
exceções de falta de memória, o senhor pode ajustar o tamanho dos lotes de registros do Arrow
definindo a configuração spark.sql.execution.arrow.maxRecordsPerBatch
como um número inteiro que
determina o número máximo de linhas para cada lote. O valor default é de 10.000 registros por lote. Se o número de colunas for grande, o
o valor deve ser ajustado em conformidade. Usando esse limite, cada partição de dados
é dividida em 1 ou mais lotes de registros para processamento.
Carimbo de data/hora com semântica de fuso horário
O Spark armazena internamente os carimbos de data/hora como valores UTC, e os dados de carimbo de data/hora trazidos sem um fuso horário especificado são convertidos como hora local para UTC com resolução de microssegundos.
Quando os dados de registro de data e hora são exportados ou exibidos no Spark,
o fuso horário da sessão é usado para localizar os valores de registro de data e hora
. O fuso horário da sessão é definido com a configuração
spark.sql.session.timeZone
e tem como padrão o fuso horário local do sistema JVM. Pandas usa um tipo datetime64
com resolução de nanossegundos, datetime64[ns]
, com fuso horário opcional por coluna.
Quando os dados de registro de data e hora são transferidos de Spark para Pandas, eles são convertidos em nanossegundos e cada coluna é convertida para o fuso horário da sessão Spark
e, em seguida, localizada para esse fuso horário, o que remove o fuso horário e exibe os valores como hora local. Isso ocorre quando
chamando toPandas()
ou pandas_udf
com colunas de carimbo de data/hora.
Quando os dados de registro de data e hora são transferidos de Pandas para Spark, eles são convertidos em microssegundos UTC. Isso ocorre ao chamar
createDataFrame
com um Pandas DataFrame ou ao retornar um registro de data e hora de um Pandas UDF. Essas conversões são feitas
automaticamente para garantir que o Spark tenha dados no formato esperado, portanto,
não é necessário fazer nenhuma dessas conversões por conta própria. Qualquer
os valores de nanossegundos são truncados.
Um UDF padrão carrega dados de carimbo de data/hora como objetos Python datetime, o que é diferente de um carimbo de data/hora Pandas. Para obter o melhor desempenho, recomendamos que o senhor use a funcionalidade de série temporal do site Pandas ao trabalhar com carimbos de data/hora em um site Pandas UDF. Para obter detalhes, consulte Funcionalidade de séries temporais/datas.
Exemplo de notebook
O Notebook a seguir ilustra as melhorias de desempenho que o senhor pode obter com os UDFs do site Pandas: