[Spark Core]: Shuffle Internals and Memory Usage - What's getting pulled into memory on the "reduce" side?

This post has NOT been accepted by the mailing list yet.
Hi all - I'm attempting to migrate from Cascading and MapReduce to Spark and have run into jobs running out of memory. I've read through a good deal of the Spark code trying to track down what allocates memory and what doesn't, and was hoping to get some insight from someone familiar with the internals of the shuffle process.

1) From my reading of the code, it appears that Spark necessarily allocates memory for the entire output of one "map" task when reading shuffle blocks on the "reduce" side. For example, if a single map task emits a very large number of values for a single key, the reduce side reads them all into memory at once, rather than streaming them from disk and processing them in a way that avoids allocating that memory all at once. I'm using custom grouping code built on repartitionAndSortWithinPartitions and mapPartitions rather than the usual groupBy() helper, so I assumed the memory would not need to be allocated all at once, but it appears to be allocated anyway.
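The grouping code itself isn't shown here. As a minimal sketch of that style of pattern, assuming the records are `(key, value)` pairs and using a hypothetical `stream_groups` function that computes a count and sum per key, the function handed to `mapPartitions` can fold each key's values one record at a time. Note that this only shows the user function does not have to build a per-key list; it says nothing about what Spark's shuffle reader allocates before it hands the iterator over, which is the actual question.

```python
from itertools import groupby
from operator import itemgetter
from typing import Iterable, Iterator, Tuple


def stream_groups(partition: Iterable[Tuple[str, int]]) -> Iterator[Tuple[str, int, int]]:
    """Yield (key, count, total) for each run of equal keys.

    Assumes `partition` is already sorted by key, as it would be after
    repartitionAndSortWithinPartitions. groupby() walks the underlying
    iterator once and does not build a list of each key's values.
    """
    for key, records in groupby(partition, key=itemgetter(0)):
        count = 0
        total = 0
        for _, value in records:  # consumed lazily, one record at a time
            count += 1
            total += value
        yield key, count, total


# Hypothetical PySpark wiring (not executed here):
#   rdd.repartitionAndSortWithinPartitions(200).mapPartitions(stream_groups)

if __name__ == "__main__":
    sorted_partition = iter([("a", 1), ("a", 2), ("b", 5), ("c", 1), ("c", 1), ("c", 1)])
    print(list(stream_groups(sorted_partition)))
    # [('a', 2, 3), ('b', 1, 5), ('c', 3, 3)]
```

The input must be sorted by key for this to be correct: `groupby` only merges adjacent equal keys, which is why the sort in repartitionAndSortWithinPartitions matters.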

2) Assuming the answer to question 1 is "yes", we tried various strategies to reduce the amount of data output by any single map task for any single reduce task, including dramatically increasing both the number of partitions of the incoming data and the number of partitions being shuffled into. This delayed the OOMs we were seeing but did not eliminate them. So my follow-up question is: is Spark holding ALL the data that goes into a given "reduce" task in memory? If so, then no matter how many incoming partitions there are, a single skewed key could cause a large increase in memory usage on a single executor.
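The skewed-key concern can be illustrated with a small model. Spark's default HashPartitioner assigns each key to a reduce partition by its hash code modulo the number of partitions, so every record for one key lands in the same partition however many partitions there are. The sketch is plain Python, uses `zlib.crc32` as a stand-in for the real hash, counts records rather than bytes, and says nothing about Spark's actual memory accounting; the helpers `reduce_partition_loads` and `skewed_keys` are hypothetical.

```python
import zlib
from collections import Counter
from itertools import repeat
from typing import Dict, Iterable, Iterator


def reduce_partition_loads(keys: Iterable[str], num_partitions: int) -> Dict[int, int]:
    """Count records per reduce partition under a hash partitioner.

    zlib.crc32 stands in for Spark's key hash (assumption); the exact hash
    does not matter here, only that equal keys always map to one partition.
    """
    loads: Counter = Counter()
    for key in keys:
        loads[zlib.crc32(key.encode("utf-8")) % num_partitions] += 1
    return dict(loads)


def skewed_keys(hot_count: int, cold_keys: int) -> Iterator[str]:
    # One hypothetical "hot" key repeated hot_count times,
    # followed by cold_keys distinct keys seen once each.
    yield from repeat("hot", hot_count)
    yield from (f"k{i}" for i in range(cold_keys))


if __name__ == "__main__":
    for n in (10, 100, 1000, 10000):
        loads = reduce_partition_loads(skewed_keys(100_000, 10_000), n)
        # The largest partition never holds fewer than the 100_000 hot records.
        print(n, max(loads.values()))
```

Raising the partition count spreads the distinct keys more thinly, but the largest partition never drops below the hot key's record count. If the reduce side does buffer a partition's data, that would be consistent with more partitions delaying the OOMs without curing them.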

Thanks to anyone who can address any of these questions! My hope is that I'm doing something silly that's using all this extra memory, but if it's a known feature of Spark internals, that would be great to know.