Skip to main content

aggregate

Applies a binary operator to an initial state and all elements in the array, and reduces this to a single state. The final state is converted into the final result by applying a finish function. Supports Spark Connect.

For the corresponding Databricks SQL function, see aggregate function.

Syntax

Python
from pyspark.databricks.sql import functions as dbf

dbf.aggregate(col=<col>, initialValue=<initialValue>, merge=<merge>, finish=<finish>)

Parameters

Parameter

Type

Description

col

pyspark.sql.Column or str

Name of column or expression.

initialValue

pyspark.sql.Column or str

Initial value. Name of column or expression.

merge

function

A binary function that returns expression of the same type as initialValue.

finish

function, optional

An optional unary function used to convert accumulated value.

Returns

pyspark.sql.Column: final value after aggregate function is applied.

Examples

Example 1: Simple aggregation with sum

Python
from pyspark.databricks.sql import functions as dbf
df = spark.createDataFrame([(1, [20.0, 4.0, 2.0, 6.0, 10.0])], ("id", "values"))
df.select(dbf.aggregate("values", dbf.lit(0.0), lambda acc, x: acc + x).alias("sum")).show()
Output
+----+
| sum|
+----+
|42.0|
+----+

Example 2: Aggregation with finish function

Python
from pyspark.databricks.sql import functions as dbf
df = spark.createDataFrame([(1, [20.0, 4.0, 2.0, 6.0, 10.0])], ("id", "values"))
def merge(acc, x):
count = acc.count + 1
sum = acc.sum + x
return dbf.struct(count.alias("count"), sum.alias("sum"))
df.select(
dbf.aggregate(
"values",
dbf.struct(dbf.lit(0).alias("count"), dbf.lit(0.0).alias("sum")),
merge,
lambda acc: acc.sum / acc.count,
).alias("mean")
).show()
Output
+----+
|mean|
+----+
| 8.4|
+----+