Pular para o conteúdo principal

Pandas função APIs

Pandas A função APIs permite que o senhor aplique diretamente uma função nativa Python que recebe e envia instâncias Pandas para um PySpark DataFrame. Semelhante às funções definidas pelo usuárioPandas, a função APIs também usa o Apache Arrow para transferir dados e o Pandas para trabalhar com os dados; no entanto, as dicas de tipo Python são opcionais na função Pandas APIs.

Há três tipos de função Pandas APIs:

  • Mapa agrupado
  • Mapa
  • Mapa coagrupado

Pandas A função APIs aproveita a mesma lógica interna que a execução do Pandas UDF usa. Eles compartilham características como o PyArrow, os tipos de SQL compatíveis e as configurações.

Para obter mais informações, consulte a postagem no blog New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0.

Mapa agrupado

O senhor transforma seu uso agrupado de dados groupBy().applyInPandas() para implementar o padrão "dividir-aplicar-combinar". Split-Apply-Combine consiste em três etapas:

  • Divida os dados em grupos usando DataFrame.groupBy.
  • Aplique uma função em cada grupo. A entrada e a saída da função são pandas.DataFrame. Os dados de entrada contêm todas as linhas e colunas de cada grupo.
  • Combine os resultados em um novo DataFrame.

Para usar groupBy().applyInPandas(), você deve definir o seguinte:

  • Uma função Python que define o cálculo para cada grupo
  • Um objeto StructType ou uma cadeia de caracteres que define o esquema da saída DataFrame

O rótulo da coluna do pandas.DataFrame retornado deve corresponder aos nomes de campo no esquema de saída definido, se especificado como strings, ou corresponder aos tipos de dados de campo por posição, se não for strings, por exemplo, índices inteiros. Consulte pandas.DataFrame para saber como rotular colunas ao construir um pandas.DataFrame.

Todos os dados de um grupo são carregados na memória antes da aplicação da função. Isso pode levar a exceções de falta de memória, especialmente se os tamanhos dos grupos estiverem distorcidos. A configuração de MaxRecordSperBatch não é aplicada em grupos e cabe a você garantir que os dados agrupados caibam na memória disponível.

O exemplo a seguir mostra como usar groupby().apply() para subtrair a média de cada valor no grupo.

Python
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))

def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+

Para obter detalhes sobre o uso, consulte PySpark.sql.GroupedData.applyInPandas.

Mapa

O senhor realiza operações de mapa com instâncias de Pandas por DataFrame.mapInPandas() para transformar um iterador de pandas.DataFrame em outro iterador de pandas.DataFrame que representa o PySpark DataFrame atual e retorna o resultado como um PySpark DataFrame.

A função subjacente recebe e gera um iterador de pandas.DataFrame. Ele pode retornar uma saída de comprimento arbitrário, ao contrário de alguns UDFs do site Pandas, como Series to Series.

O exemplo a seguir mostra como usar mapInPandas():

Python
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+

Para obter detalhes sobre o uso, consulte PySpark.sql.DataFrame.mapInPandas.

Mapa coagrupado

Para operações de mapas agrupados com instâncias de Pandas, use DataFrame.groupby().cogroup().applyInPandas() para agrupar dois PySpark DataFrames por um key comum e, em seguida, aplique uma função Python a cada cogroup, conforme mostrado:

  • Embaralhe os dados de modo que os grupos de cada DataFrame que compartilham um key sejam agrupados.
  • Aplique uma função a cada cogrupo. A entrada da função é dois pandas.DataFrame (com uma tupla opcional representando o key). A saída da função é pandas.DataFrame.
  • Combine os pandas.DataFrames de todos os grupos em um novo PySpark DataFrame.

Para usar groupBy().cogroup().applyInPandas(), você deve definir o seguinte:

  • Uma função Python que define o cálculo para cada cogroup.
  • Um objeto StructType ou uma cadeia de caracteres que define o esquema da saída PySpark DataFrame.

O rótulo da coluna do pandas.DataFrame retornado deve corresponder aos nomes de campo no esquema de saída definido, se especificado como strings, ou corresponder aos tipos de dados de campo por posição, se não for strings, por exemplo, índices inteiros. Consulte pandas.DataFrame para saber como rotular colunas ao construir um pandas.DataFrame.

Todos os dados de um cogrupo são carregados na memória antes da aplicação da função. Isso pode levar a exceções de falta de memória, especialmente se os tamanhos dos grupos estiverem distorcidos. A configuração de MaxRecordSperBatch não é aplicada e cabe a você garantir que os dados coagrupados caibam na memória disponível.

O exemplo a seguir mostra como usar o site groupby().cogroup().applyInPandas() para executar um asof join entre dois conjuntos de dados.

Python
import pandas as pd

df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))

df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))

def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+

Para obter detalhes sobre o uso, consulte PySpark.sql.PandasCogroupedOps.applyInPandas.