Pular para o conteúdo principal

Pandas funções definidas pelo usuário

Uma função definida pelo usuário Pandas (UDF) - também conhecida como UDFvetorizada - é uma função definida pelo usuário que usa a setaApache para transferir dados e Pandas para trabalhar com os dados. Pandas Os UDFs permitem operações vetorizadas que podem aumentar o desempenho em até 100 vezes em comparação com os Python UDFs row-at-a-time.

Para obter informações básicas, consulte a postagem no blog New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0.

O senhor define uma Pandas UDF usando a palavra-chave pandas_udf como um decorador e envolve a função com uma dica de tipoPython. Este artigo descreve os diferentes tipos de Pandas UDFs e mostra como usar Pandas UDFs com dicas de tipo.

UDF de série para série

O senhor usa um Series to Series Pandas UDF para vetorizar operações escalares. O senhor pode usá-las com APIs como select e withColumn.

A função Python deve receber uma série Pandas como entrada e retornar uma série Pandas do mesmo comprimento, e o senhor deve especificá-las nas dicas de tipo Python. Spark execução a Pandas UDF dividindo as colunas em lotes, chamando a função para cada lote como um subconjunto dos dados e, em seguida, concatenando os resultados.

O exemplo a seguir mostra como criar um Pandas UDF que calcula o produto de 2 colunas.

Python
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+

Iterador de série para Iterador de série UDF

Um iterador UDF é o mesmo que um escalar Pandas UDF , exceto:

  • A função Python

    • Recebe um iterador de lotes em vez de um único lote de entrada como entrada.
    • Retorna um iterador de lotes de saída em vez de um único lote de saída.
  • O comprimento de toda a saída no iterador deve ser igual ao comprimento de toda a entrada.

  • O wrapped Pandas UDF usa uma única coluna Spark como entrada.

O senhor deve especificar a dica de tipo Python como Iterator[pandas.Series] -> Iterator[pandas.Series].

Esse Pandas UDF é útil quando a execução do UDF requer a inicialização de algum estado, por exemplo, carregar um arquivo de modelo de aprendizado de máquina para aplicar inferência a cada lote de entrada.

O exemplo a seguir mostra como criar um Pandas UDF com suporte a iteradores.

Python
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
y = y_bc.value # initialize states
try:
for x in batch_iter:
yield x + y
finally:
pass # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+

Iterador de várias séries para Iterador de séries UDF

Um Iterator of multiple Series to Iterator of Series UDF tem características semelhantes e restrições como o Iterator of Series to Iterator of Series UDF. A função especificada recebe um iterador de lotes e gera um iterador de lotes. Também é útil quando a execução do UDF exige a inicialização de algum estado do site .

As diferenças são:

  • A função Python subjacente usa um iterador de uma tupla de Pandas Series.
  • O wrapped Pandas UDF usa várias colunas Spark como entrada.

Você especifica as dicas de tipo como Iterator[Tuple[pandas.Series, ...]] - > Iterator[pandas.Series].

Python
from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+

Série para UDF escalar

Séries para escalar Pandas UDFs são semelhantes às funções agregadas Spark. Uma Series to scalar Pandas UDF define uma agregação de uma ou mais Pandas Series a um valor escalar, em que cada Pandas Series representa uma coluna Spark. O senhor usa uma série para escalonar Pandas UDF com APIs, como select, withColumn, groupBy.agg e PySpark.sql.Window.

Você expressa a dica de tipo como pandas.Series, ... - > Any. O tipo de retorno deve ser um tipo de dados primitivo , e o escalar retornado pode ser um tipo primitivo Python, por exemplo, int ou float ou um tipo de dados NumPy, como numpy.int64 ou numpy.float64. Any deveria idealmente seja um tipo escalar específico.

Esse tipo de UDF não oferece suporte à agregação parcial e todos os dados de cada grupo são carregados na memória.

O exemplo a seguir mostra como usar esse tipo de UDF para compute média com select, groupBy e window operações:

Python
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+

w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+

Para obter detalhes sobre o uso, consulte PySpark.sql.functions.pandas_udf.

Uso

Definição do tamanho dos lotes Arrow

nota

Essa configuração não afeta o site compute configurado com o modo de acesso padrão e Databricks Runtime 13.3 LTS a 14.2.

As partições de dados no Spark são convertidas em lotes de registros Arrow, que podem levar temporariamente a um alto uso de memória na JVM. Para evitar possíveis exceções de falta de memória, o senhor pode ajustar o tamanho dos lotes de registros do Arrow definindo a configuração spark.sql.execution.arrow.maxRecordsPerBatch como um número inteiro que determina o número máximo de linhas para cada lote. O valor default é de 10.000 registros por lote. Se o número de colunas for grande, o o valor deve ser ajustado em conformidade. Usando esse limite, cada partição de dados é dividida em 1 ou mais lotes de registros para processamento.

Carimbo de data/hora com semântica de fuso horário

O Spark armazena internamente os carimbos de data/hora como valores UTC, e os dados de carimbo de data/hora trazidos sem um fuso horário especificado são convertidos como hora local para UTC com resolução de microssegundos.

Quando os dados de registro de data e hora são exportados ou exibidos no Spark, o fuso horário da sessão é usado para localizar os valores de registro de data e hora . O fuso horário da sessão é definido com a configuração spark.sql.session.timeZone e tem como padrão o fuso horário local do sistema JVM. Pandas usa um tipo datetime64 com resolução de nanossegundos, datetime64[ns], com fuso horário opcional por coluna.

Quando os dados de registro de data e hora são transferidos de Spark para Pandas, eles são convertidos em nanossegundos e cada coluna é convertida para o fuso horário da sessão Spark e, em seguida, localizada para esse fuso horário, o que remove o fuso horário e exibe os valores como hora local. Isso ocorre quando chamando toPandas() ou pandas_udf com colunas de carimbo de data/hora.

Quando os dados de registro de data e hora são transferidos de Pandas para Spark, eles são convertidos em microssegundos UTC. Isso ocorre ao chamar createDataFrame com um Pandas DataFrame ou ao retornar um registro de data e hora de um Pandas UDF. Essas conversões são feitas automaticamente para garantir que o Spark tenha dados no formato esperado, portanto, não é necessário fazer nenhuma dessas conversões por conta própria. Qualquer os valores de nanossegundos são truncados.

Um UDF padrão carrega dados de carimbo de data/hora como objetos Python datetime, o que é diferente de um carimbo de data/hora Pandas. Para obter o melhor desempenho, recomendamos que o senhor use a funcionalidade de série temporal do site Pandas ao trabalhar com carimbos de data/hora em um site Pandas UDF. Para obter detalhes, consulte Funcionalidade de séries temporais/datas.

Exemplo de notebook

O Notebook a seguir ilustra as melhorias de desempenho que o senhor pode obter com os UDFs do site Pandas:

Pandas Notebook de referência de UDFs

Open notebook in new tab