APIs de funções do pandas
As APIs de função do pandas permitem que você aplique diretamente uma função nativa do Python que obtém e gera instâncias do pandas para um PySpark DataFrame. Semelhante às funções definidas pelo usuário dos pandas, as APIs de funções também usam o Apache Arrow para transferir dados e os pandas para trabalhar com os dados; no entanto, as dicas de tipo do Python são opcionais nas APIs de funções do pandas.
Existem três tipos de APIs de função pandas:
mapa agrupado
Mapa
mapa coagrupado
As APIs de funções do pandas utilizam a mesma lógica interna que a execução UDF do pandas usa. Eles compartilham características como PyArrow, tipos de SQL suportados e as configurações.
Para obter mais informações, consulte a postagem nos blogs New Pandas UDFs e Python Type Hints in the Next Release of Apache Spark 3.0.
mapa agrupado
Você transforma seu uso de dados agrupado groupBy().applyInPandas()
para implementar o padrão “split-apply-combine”. Split-apply-combine consiste em três passos:
Divida os dados em grupos usando
DataFrame.groupBy
.Aplique uma função em cada grupo. A entrada e a saída da função são
pandas.DataFrame
. Os dados de entrada contêm todas as linhas e colunas de cada grupo.Combine os resultados em um novo
DataFrame
.
Para usar groupBy().applyInPandas()
, você deve definir o seguinte:
Uma função Python que define a computação para cada grupo
Um objeto
StructType
ou strings que definem o esquema da saídaDataFrame
Os rótulos de coluna do pandas.DataFrame
retornado devem corresponder aos nomes de campo no esquema de saída definido, se especificado como strings, ou corresponder aos tipos de dados do campo por posição, se não strings, por exemplo, índices inteiros. Consulte pandas.DataFrame para saber como rotular colunas ao construir um pandas.DataFrame
.
Todos os dados de um grupo são carregados na memória antes que a função seja aplicada. Isso pode levar a exceções de falta de memória, especialmente se os tamanhos dos grupos estiverem distorcidos. A configuração de maxRecordsPerBatch não é aplicada em grupos e cabe a você garantir que os dados agrupados caibam na memória disponível.
O exemplo a seguir mostra como usar groupby().apply()
para subtrair a média de cada valor no grupo.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Para uso detalhado, consulte PySpark.sql.GroupedData.applyInPandas.
Mapa
Você executa operações de mapa com instâncias de pandas por DataFrame.mapInPandas()
para transformar um iterador de pandas.DataFrame
em outro iterador de pandas.DataFrame
que representa o PySpark DataFrame atual e retorna o resultado como um PySpark DataFrame.
A função subjacente recebe e gera um iterador de pandas.DataFrame
. Ele pode retornar uma saída de comprimento arbitrário em contraste com alguns UDFs pandas, como Series to Series.
O exemplo a seguir mostra como usar mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Para uso detalhado, consulte PySpark.sql.DataFrame.mapInPandas.
mapa coagrupado
Para operações de mapa coagrupadas com instâncias de pandas, use DataFrame.groupby().cogroup().applyInPandas()
para agrupar dois PySpark DataFrame
s por uma key comum e, em seguida, aplique uma função Python a cada cogrupo, conforme mostrado:
Embaralhe os dados de forma que os grupos de cada DataFrame que compartilham uma key sejam agrupados.
Aplique uma função a cada cogrupo. A entrada da função é dois
pandas.DataFrame
(com uma tupla opcional representando a key). A saída da função é umpandas.DataFrame
.Combine os
pandas.DataFrame
s de todos os grupos em um novo PySparkDataFrame
.
Para usar groupBy().cogroup().applyInPandas()
, você deve definir o seguinte:
Uma função Python que define a computação para cada cogrupo.
Um objeto
StructType
ou strings que definem o esquema da saída PySparkDataFrame
.
Os rótulos de coluna do pandas.DataFrame
retornado devem corresponder aos nomes de campo no esquema de saída definido, se especificado como strings, ou corresponder aos tipos de dados do campo por posição, se não strings, por exemplo, índices inteiros. Consulte pandas.DataFrame para saber como rotular colunas ao construir um pandas.DataFrame
.
Todos os dados de um cogrupo são carregados na memória antes que a função seja aplicada. Isso pode levar a exceções de falta de memória, especialmente se os tamanhos dos grupos estiverem distorcidos. A configuração para maxRecordsPerBatch não é aplicada e cabe a você garantir que os dados coagrupados caibam na memória disponível.
O exemplo a seguir mostra como usar groupby().cogroup().applyInPandas()
para executar um asof join
entre dois dataset.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Para uso detalhado, consulte PySpark.sql.PandasCogroupedOps.applyInPandas.