pandas function APIs

pandas function APIs enable you to apply a native Python function that takes and outputs pandas instances directly to a PySpark DataFrame. Like pandas user-defined functions, function APIs use Apache Arrow to transfer data and pandas to work with the data; however, Python type hints are optional in pandas function APIs.

There are three types of pandas function APIs:

  • Grouped map

  • Map

  • Cogrouped map

pandas function APIs use the same internal execution logic as pandas UDFs. They share the same characteristics and restrictions, such as the use of PyArrow, the supported SQL types, and the relevant configurations.

For more information, see the blog post New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0.
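
The examples on this page assume an active SparkSession named spark, with pandas and PyArrow installed on the cluster; a minimal setup sketch (the application name is illustrative):

from pyspark.sql import SparkSession

# Create or reuse a SparkSession for the examples below
spark = SparkSession.builder.appName("pandas-function-apis").getOrCreate()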

Grouped map

You transform your grouped data using groupBy().applyInPandas() to implement the “split-apply-combine” pattern. Split-apply-combine consists of three steps:

  • Split the data into groups by using DataFrame.groupBy.

  • Apply a function on each group. The input and output of the function are both pandas.DataFrame. The input data contains all the rows and columns for each group.

  • Combine the results into a new DataFrame.

To use groupBy().applyInPandas(), you must define the following:

  • A Python function that defines the computation for each group

  • A StructType object or a string that defines the schema of the output DataFrame

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, for example, integer indices. See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.
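
For example, the output schema can be given as a DDL-formatted string or as an equivalent StructType object; a minimal sketch of both forms for the id/v result used in the example below:

from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# DDL string form of the output schema
schema_as_string = "id long, v double"

# Equivalent StructType form
schema_as_struct = StructType([
    StructField("id", LongType()),
    StructField("v", DoubleType()),
])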

All data for a group is loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied on groups and it is up to you to ensure that the grouped data fits into the available memory.

The following example shows how to use groupby().applyInPandas() to subtract the mean from each value in the group.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+
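
If the function accepts two arguments, applyInPandas also passes the grouping key as a tuple; a minimal sketch that returns one mean per group (the column name mean_v is illustrative):

import pandas as pd

def group_mean(key, pdf):
    # key is a tuple of the grouping values, for example (1,)
    return pd.DataFrame([{"id": key[0], "mean_v": pdf.v.mean()}])

df.groupby("id").applyInPandas(group_mean, schema="id long, mean_v double").show()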

For detailed usage, see pyspark.sql.GroupedData.applyInPandas.

Map

You perform map operations with pandas instances using DataFrame.mapInPandas(). It maps an iterator of pandas.DataFrame, which represents the current PySpark DataFrame, to another iterator of pandas.DataFrame and returns the result as a new PySpark DataFrame.

The underlying function takes and outputs an iterator of pandas.DataFrame. It can return output of arbitrary length, in contrast to some pandas UDFs such as the Series to Series pandas UDF.

The following example shows how to use mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        # pdf is a pandas.DataFrame; keep only the rows where id equals 1
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+
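
Because the function works on an iterator of batches, its output length does not have to match its input; a minimal sketch that emits one summary row per batch (the column name num_rows is illustrative):

import pandas as pd

def count_rows(iterator):
    for pdf in iterator:
        # One output row per input batch, regardless of the batch size
        yield pd.DataFrame({"num_rows": [len(pdf)]})

df.mapInPandas(count_rows, schema="num_rows long").show()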

For detailed usage, see pyspark.sql.DataFrame.mapInPandas.

Cogrouped map

For cogrouped map operations with pandas instances, use DataFrame.groupby().cogroup().applyInPandas() to cogroup two PySpark DataFrames by a common key and then apply a Python function to each cogroup, as follows:

  • Shuffle the data such that the groups of each DataFrame which share a key are cogrouped together.

  • Apply a function to each cogroup. The input of the function is two pandas.DataFrame (with an optional tuple representing the key). The output of the function is a pandas.DataFrame.

  • Combine the pandas.DataFrames from all groups into a new PySpark DataFrame.

To use groupBy().cogroup().applyInPandas(), you must define the following:

  • A Python function that defines the computation for each cogroup.

  • A StructType object or a string that defines the schema of the output PySpark DataFrame.

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, for example, integer indices. See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.

All data for a cogroup is loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied and it is up to you to ensure that the cogrouped data fits into the available memory.

The following example shows how to use groupby().cogroup().applyInPandas() to perform an asof join between two datasets.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    # l and r are the pandas.DataFrames for the same key from df1 and df2
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+
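
As with grouped map, the function can also accept the grouping key as its first argument; a minimal sketch of a key-aware variant of the same asof join:

def asof_join_with_key(key, l, r):
    # key is a tuple of the grouping values, for example (1,)
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join_with_key, schema="time int, id int, v1 double, v2 string").show()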

For detailed usage, see pyspark.sql.PandasCogroupedOps.applyInPandas.