APIs de funções do pandas

As APIs de função do pandas permitem que você aplique diretamente uma função nativa do Python que obtém e gera instâncias do pandas para um PySpark DataFrame. Semelhante às funções definidas pelo usuário dos pandas, as APIs de funções também usam o Apache Arrow para transferir dados e os pandas para trabalhar com os dados; no entanto, as dicas de tipo do Python são opcionais nas APIs de funções do pandas.

Existem três tipos de APIs de função pandas:

  • mapa agrupado

  • Mapa

  • mapa coagrupado

As APIs de funções do pandas utilizam a mesma lógica interna que a execução UDF do pandas usa. Eles compartilham características como PyArrow, tipos de SQL suportados e as configurações.

Para obter mais informações, consulte a postagem nos blogs New Pandas UDFs e Python Type Hints in the Next Release of Apache Spark 3.0.

mapa agrupado

Você transforma seu uso de dados agrupado groupBy().applyInPandas() para implementar o padrão “split-apply-combine”. Split-apply-combine consiste em três passos:

  • Divida os dados em grupos usando DataFrame.groupBy.

  • Aplique uma função em cada grupo. A entrada e a saída da função são pandas.DataFrame. Os dados de entrada contêm todas as linhas e colunas de cada grupo.

  • Combine os resultados em um novo DataFrame.

Para usar groupBy().applyInPandas(), você deve definir o seguinte:

  • Uma função Python que define a computação para cada grupo

  • Um objeto StructType ou strings que definem o esquema da saída DataFrame

Os rótulos de coluna do pandas.DataFrame retornado devem corresponder aos nomes de campo no esquema de saída definido, se especificado como strings, ou corresponder aos tipos de dados do campo por posição, se não strings, por exemplo, índices inteiros. Consulte pandas.DataFrame para saber como rotular colunas ao construir um pandas.DataFrame.

Todos os dados de um grupo são carregados na memória antes que a função seja aplicada. Isso pode levar a exceções de falta de memória, especialmente se os tamanhos dos grupos estiverem distorcidos. A configuração de maxRecordsPerBatch não é aplicada em grupos e cabe a você garantir que os dados agrupados caibam na memória disponível.

O exemplo a seguir mostra como usar groupby().apply() para subtrair a média de cada valor no grupo.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Para uso detalhado, consulte PySpark.sql.GroupedData.applyInPandas.

Mapa

Você executa operações de mapa com instâncias de pandas por DataFrame.mapInPandas() para transformar um iterador de pandas.DataFrame em outro iterador de pandas.DataFrame que representa o PySpark DataFrame atual e retorna o resultado como um PySpark DataFrame.

A função subjacente recebe e gera um iterador de pandas.DataFrame. Ele pode retornar uma saída de comprimento arbitrário em contraste com alguns UDFs pandas, como Series to Series.

O exemplo a seguir mostra como usar mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Para uso detalhado, consulte PySpark.sql.DataFrame.mapInPandas.

mapa coagrupado

Para operações de mapa coagrupadas com instâncias de pandas, use DataFrame.groupby().cogroup().applyInPandas() para agrupar dois PySpark DataFrames por uma key comum e, em seguida, aplique uma função Python a cada cogrupo, conforme mostrado:

  • Embaralhe os dados de forma que os grupos de cada DataFrame que compartilham uma key sejam agrupados.

  • Aplique uma função a cada cogrupo. A entrada da função é dois pandas.DataFrame (com uma tupla opcional representando a key). A saída da função é um pandas.DataFrame.

  • Combine os pandas.DataFrames de todos os grupos em um novo PySpark DataFrame.

Para usar groupBy().cogroup().applyInPandas(), você deve definir o seguinte:

  • Uma função Python que define a computação para cada cogrupo.

  • Um objeto StructType ou strings que definem o esquema da saída PySpark DataFrame.

Os rótulos de coluna do pandas.DataFrame retornado devem corresponder aos nomes de campo no esquema de saída definido, se especificado como strings, ou corresponder aos tipos de dados do campo por posição, se não strings, por exemplo, índices inteiros. Consulte pandas.DataFrame para saber como rotular colunas ao construir um pandas.DataFrame.

Todos os dados de um cogrupo são carregados na memória antes que a função seja aplicada. Isso pode levar a exceções de falta de memória, especialmente se os tamanhos dos grupos estiverem distorcidos. A configuração para maxRecordsPerBatch não é aplicada e cabe a você garantir que os dados coagrupados caibam na memória disponível.

O exemplo a seguir mostra como usar groupby().cogroup().applyInPandas() para executar um asof join entre dois dataset.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Para uso detalhado, consulte PySpark.sql.PandasCogroupedOps.applyInPandas.