pandas function APIs

pandas function APIs enable you to directly apply a Python native function, which takes and outputs pandas instances, to a PySpark DataFrame. Similar to pandas user-defined functions, function APIs also use Apache Arrow to transfer data and pandas to work with the data; however, Python type hints are optional in pandas function APIs.
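
For example, a function intended for one of these APIs (here mapInPandas, covered below) can be written with or without type hints. This minimal sketch shows the hinted form of the filter function used later on this page:

from typing import Iterator
import pandas as pd

# Type hints are allowed but not required for pandas function APIs
def filter_func(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in iterator:
        yield pdf[pdf.id == 1]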

There are three types of pandas function APIs:

  • Grouped map
  • Map
  • Cogrouped map

pandas function APIs leverage the same internal logic that pandas UDF executions use. Therefore, they share the same characteristics as pandas UDFs, such as the use of PyArrow, the supported SQL types, and the configurations.

For more information, see the blog post New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0.

Grouped map

You transform your grouped data via groupBy().applyInPandas() to implement the “split-apply-combine” pattern. Split-apply-combine consists of three steps:

  • Split the data into groups by using DataFrame.groupBy.
  • Apply a function on each group. The input and output of the function are both pandas.DataFrame. The input data contains all the rows and columns for each group.
  • Combine the results into a new DataFrame.

To use groupBy().applyInPandas(), you must define the following:

  • A Python function that defines the computation for each group
  • A StructType object or a string that defines the schema of the output DataFrame

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema (if the labels are strings), or match the field data types by position (if the labels are not strings, for example, integer indices). See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.
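
As a minimal sketch with made-up values, a returned pandas.DataFrame with string column labels is matched to a schema such as "id long, v double" by field name, while one built with default integer labels is matched by position and data type:

import pandas as pd

# String labels: matched to the output schema by field name
pdf_named = pd.DataFrame({"id": [1, 1], "v": [-0.5, 0.5]})

# Default integer labels: matched to the output schema by position and data type
pdf_positional = pd.DataFrame([[1, -0.5], [1, 0.5]])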

All data for a group is loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied on groups and it is up to you to ensure that the grouped data fits into the available memory.
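
One way to spot skewed groups before applying the function is to inspect group sizes first; this sketch assumes the same df and id grouping column as the example that follows:

# Count rows per group to check for skew before calling applyInPandas
df.groupBy("id").count().orderBy("count", ascending=False).show()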

The following example shows how to use groupby().applyInPandas() to subtract the mean from each value in the group.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+
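
If the computation needs the grouping key itself, applyInPandas also accepts a function that takes the key as its first argument, passed as a tuple. The following sketch reuses the same df; mean_func is an illustrative name.

import pandas as pd

def mean_func(key, pdf):
    # key is a tuple holding the value of "id" for the current group
    return pd.DataFrame([key + (pdf.v.mean(),)])

# Shows one row per group with the mean of v (1.5 for id 1, 6.0 for id 2)
df.groupby("id").applyInPandas(mean_func, schema="id long, v double").show()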

For detailed usage, see pyspark.sql.GroupedData.applyInPandas.

Map

You perform map operations with pandas instances by using DataFrame.mapInPandas(), which transforms an iterator of pandas.DataFrame representing the current PySpark DataFrame into another iterator of pandas.DataFrame, and returns the result as a new PySpark DataFrame.

The underlying function takes and outputs an iterator of pandas.DataFrame. It can return output of arbitrary length, in contrast to some pandas UDFs such as the Series to Series pandas UDF.

The following example shows how to use mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+
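
Because the output length is not tied to the input, the function can also return a completely different shape. As a minimal sketch (count_func and rows_in_batch are illustrative names), this emits one summary row per input batch; the number of batches depends on how Spark splits the data:

import pandas as pd

def count_func(iterator):
    for pdf in iterator:
        # Each input batch can yield output of any length and shape
        yield pd.DataFrame({"rows_in_batch": [len(pdf)]})

df.mapInPandas(count_func, schema="rows_in_batch long").show()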

For detailed usage, see pyspark.sql.DataFrame.mapInPandas.

Cogrouped map

For cogrouped map operations with pandas instances, use DataFrame.groupby().cogroup().applyInPandas() to cogroup two PySpark DataFrames by a common key and then apply a Python function to each cogroup. It consists of the following steps:

  • Shuffle the data such that the groups of each DataFrame which share a key are cogrouped together.
  • Apply a function to each cogroup. The input of the function is two pandas.DataFrame (with an optional tuple representing the key). The output of the function is a pandas.DataFrame.
  • Combine the pandas.DataFrames from all groups into a new PySpark DataFrame.

To use groupBy().cogroup().applyInPandas(), you must define the following:

  • A Python function that defines the computation for each cogroup.
  • A StructType object or a string that defines the schema of the output PySpark DataFrame.

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema (if the labels are strings), or match the field data types by position (if the labels are not strings, for example, integer indices). See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.

All data for a cogroup is loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied and it is up to you to ensure that the cogrouped data fits into the available memory.

The following example shows how to use groupby().cogroup().applyInPandas() to perform an asof join between two datasets.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+
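
The key mentioned in the steps above can also be passed to the function explicitly as its first argument. The following sketch reuses df1 and df2 from the example and only joins the cogroup whose id is 1 (asof_join_keyed is an illustrative name):

def asof_join_keyed(key, l, r):
    # key is a tuple holding the value of "id" for the current cogroup
    if key == (1,):
        return pd.merge_asof(l, r, on="time", by="id")
    return pd.DataFrame(columns=["time", "id", "v1", "v2"])

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join_keyed, schema="time int, id int, v1 double, v2 string").show()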

For detailed usage, see pyspark.sql.PandasCogroupedOps.applyInPandas.