PySpark aggregation w/pandas_udf


Andrew Melo
Hi all,

For our use case, we would like to perform an aggregation using a
pandas_udf on DataFrames that have O(100M) rows and a few tens of
columns. Conceptually, this looks a bit like pyspark.RDD.aggregate,
where the user provides:

* A "seqOp", which accepts pandas Series (*) and produces an intermediate output
* A "combOp", which combines the intermediate outputs into a final output
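
To make the seqOp/combOp split concrete, here is a minimal pandas-only sketch, not our actual code. It assumes the aggregate is a mean carried as a (sum, count) pair; the names seq_op, comb_op and aggregate are illustrative.

```python
from functools import reduce
from typing import Iterable, Tuple

import pandas as pd

Partial = Tuple[float, int]  # intermediate output: (sum, count)


def seq_op(values: pd.Series) -> Partial:
    """Reduce one batch of a column to an intermediate (sum, count) pair."""
    return float(values.sum()), int(values.count())


def comb_op(a: Partial, b: Partial) -> Partial:
    """Merge two intermediate outputs into one."""
    return a[0] + b[0], a[1] + b[1]


def aggregate(batches: Iterable[pd.Series], zero: Partial = (0.0, 0)) -> Partial:
    """Apply seq_op to every batch, then fold the partials with comb_op."""
    return reduce(comb_op, (seq_op(batch) for batch in batches), zero)


# Three batches standing in for three chunks of a much larger column.
batches = [pd.Series([1.0, 2.0]), pd.Series([3.0]), pd.Series([4.0, 5.0, 6.0])]
total, count = aggregate(batches)
mean = total / count
```

The point of the split is that no single call ever needs to see more than one batch, so memory use is bounded by the batch size rather than by the size of the full dataset.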

There's no direct DataFrame equivalent to RDD.aggregate(), but you can
somewhat emulate the functionality with groupBy() called without any
columns, followed by applyInPandas().
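
A minimal sketch of that emulation follows. It is an illustration only: the column name "x", the output schema, and the helper names are assumptions, and the aggregate is again a (sum, count) pair.

```python
import pandas as pd


def seq_op(pdf: pd.DataFrame) -> pd.DataFrame:
    """Reduce all rows handed to the UDF to a single (total, n) row."""
    # "x" is a hypothetical numeric column used only for illustration.
    return pd.DataFrame(
        {"total": [float(pdf["x"].sum())], "n": [int(pdf["x"].count())]}
    )


def aggregate_globally(df):
    """Run seq_op over an entire Spark DataFrame as one global group."""
    # groupBy() with no columns produces a single group, so Spark passes
    # every row of df to one seq_op call as one pandas DataFrame.
    return df.groupBy().applyInPandas(seq_op, schema="total double, n long")
```

Calling aggregate_globally(df).collect() on a Spark DataFrame with a numeric column "x" should return a single row. Because there is only one group, there is nothing left for a combOp to combine.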


However, it seems like using groupBy() w/o any columns isn't the
intended use. The docs for groupBy().applyInPandas() have the following
note:

> Note: This function requires a full shuffle. All the data of a group will be loaded into
> memory, so the user should be aware of the potential OOM risk if data is skewed
> and certain groups are too large to fit in memory.

The Spark SQL guide has the following note as well:

> The configuration for maxRecordsPerBatch is not applied on groups and it is up to
> the user to ensure that the grouped data will fit into the available memory.

Since we want to perform this aggregation over the entire DataFrame,
we end up with one group that is loaded entirely into memory, which
immediately OOMs (requiring a shuffle probably doesn't help either).

To work around this, we make smaller groups by repartitioning, adding
a new column with the partition ID, and then doing the groupBy against
that column, but that's not great -- it's a lot of extra data
movement.
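
Roughly, the workaround looks like the sketch below. The column names "x" and "pid", the partition count, and the helper names are illustrative assumptions, not our production code.

```python
import pandas as pd


def partial_sums(pdf: pd.DataFrame) -> pd.DataFrame:
    """seqOp: reduce one partition-sized group to a single (total, n) row."""
    # "x" is a hypothetical numeric column used only for illustration.
    return pd.DataFrame(
        {"total": [float(pdf["x"].sum())], "n": [int(pdf["x"].count())]}
    )


def combine_partials(partials: pd.DataFrame) -> tuple:
    """combOp: merge the per-group rows into the final (total, n)."""
    return float(partials["total"].sum()), int(partials["n"].sum())


def aggregate_by_partition(df, num_partitions: int = 200):
    """Group a Spark DataFrame by partition ID so each group stays small."""
    # Imported here so the pandas helpers above can be used without Spark.
    from pyspark.sql import functions as F

    partials = (
        df.repartition(num_partitions)
        .withColumn("pid", F.spark_partition_id())  # tag rows with their partition
        .groupBy("pid")  # one group per partition instead of one global group
        .applyInPandas(partial_sums, schema="total double, n long")
        .toPandas()  # at most num_partitions small rows reach the driver
    )
    return combine_partials(partials)
```

Each group now holds roughly 1/num_partitions of the rows, but both the repartition and the groupBy move data around, which is the overhead we'd like to avoid.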

Am I missing something obvious? Or is this simply a part of the API
that's not fleshed out yet?


(*) Unfortunately, the pandas data model is less rich than that of
Spark/Arrow/our code, so the JVM composes an Arrow stream and transmits
it to the Python worker, which then converts it to pandas before passing
it to the UDF. We then have to undo the conversion to get the original data
back. It'd be nice to have more control over that intermediate
