Spark executors exceeding heap space allocated

Akshay Mendole
We have been dealing with memory issues in our Spark containers for the past few days. We have tried various optimisations: tuning GC parameters, switching away from the default GC, and so on. We still cannot figure out what is going on inside the Spark containers that causes the issue. Here is a summary of what we are facing, the actions we took, and the questions we have.

Here is the DAG we are trying to process
[Attachment: Screen Shot 2018-12-21 at 8.36.08 PM.png]

The stats for the job stages:
[Attachment: Screen Shot 2018-12-21 at 8.37.55 PM.png]

Task-level failures:
[Attachment: Screen Shot 2018-12-21 at 8.40.06 PM.png]

As you can see, we are churning through a decent amount of data (around 1 TB of Hadoop sequence files), which generated ~10k tasks with an input size of ~100 MB each. The fan-out from stage 0 is roughly 1:30, i.e. for a single input record, stage 0 emits about 30 records that then get shuffled.
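For rough sizing, the numbers above imply each stage-0 task materialises a few GB of shuffle output. A back-of-the-envelope sketch (plain arithmetic; the assumption that output records are about the same size as input records is ours):

```python
# Back-of-the-envelope sizing from the numbers quoted above (all approximate).
input_total_gb = 1024            # ~1 TB of sequence files
num_stage0_tasks = 10_000        # ~10k tasks generated
input_per_task_mb = input_total_gb * 1024 / num_stage0_tasks  # ~105 MB, matches the UI
fanout = 30                      # each stage-0 record emits ~30 records

# Assuming output records are roughly the size of input records, each
# stage-0 task writes about input * fanout worth of shuffle data.
shuffle_write_per_task_mb = input_per_task_mb * fanout        # ~3 GB per task

print(round(input_per_task_mb), round(shuffle_write_per_task_mb))
```

So even though the per-task input looks small, each stage-0 task may be producing several GB of shuffle output, which is one place the memory pressure could come from.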

We can also see that in stage 1 there is ~1 GB of shuffle read per task.
Any executor memory below 7 GB leads to OOM failures. The job succeeded only when we increased the executor memory to 8 GB.
We looked into the GC logs and saw a lot of young-generation GC activity when the container size is below 7 GB. We tried several tuning parameters, including G1 GC, but nothing worked for containers smaller than 7 GB.
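For reference, GC options of this sort are passed to the executors via spark-submit; the exact values below are illustrative of what we experimented with, not a recommendation:

```shell
# Illustrative only: switching the executors to G1 GC and enabling GC logging
# (values varied across our experiments).
spark-submit \
  --conf spark.executor.memory=7g \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  ...
```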
We want to understand why the Spark executors need so much memory. Today our job runs fine with an 8 GB container, but if the input size grows in the future, we know the executor memory will have to grow with it or we will hit the same issues again. That sounds more like vertical scaling than horizontal. We know we can repartition the input so that the memory load on each executor drops, but the current input size per task (~100 MB in stage 0) and shuffle read per task (~1 GB in stage 1) already seem quite small to us.
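The repartitioning option amounts to spreading the same total shuffle volume over more tasks. A minimal sketch of that trade-off (plain Python; the assumption that stage 1 also has ~10k tasks is ours, for illustration; in Spark this corresponds to calling rdd.repartition(n) or raising the shuffle partition count):

```python
# Per-task shuffle read shrinks as the number of shuffle partitions grows.
def shuffle_read_per_task_gb(total_shuffle_gb, num_partitions):
    return total_shuffle_gb / num_partitions

# Assuming ~10k stage-1 tasks at ~1 GB shuffle read each -> ~10 TB total.
total_shuffle_gb = 1 * 10_000

print(shuffle_read_per_task_gb(total_shuffle_gb, 10_000))  # 1.0 GB per task today
print(shuffle_read_per_task_gb(total_shuffle_gb, 40_000))  # 0.25 GB with 4x partitions
```

This is why repartitioning works, but it only postpones the question of why ~1 GB per task already needs a 7+ GB heap.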
We could not capture a JFR profile because we do not know in advance which of the ~10k tasks is going to fail, and the problem does not reproduce when we reduce the input size.
Is there a way to inspect the intermediate shuffle data? We believe there is no skew and all partitions are roughly the same size, but we would like to confirm this.
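One way we could check the skew hypothesis ourselves is to count records per partition. A small sketch of the bookkeeping (plain Python over a toy list of per-partition counts; the PySpark call in the comment is how the counts would actually be collected):

```python
# In PySpark, per-partition record counts can be gathered with:
#   sizes = rdd.mapPartitionsWithIndex(lambda i, it: [(i, sum(1 for _ in it))]).collect()
# Below, the same skew summary over a toy list of per-partition counts.
def skew_report(partition_counts):
    """Return (max, mean, skew_ratio) for a list of per-partition record counts."""
    mean = sum(partition_counts) / len(partition_counts)
    biggest = max(partition_counts)
    return biggest, mean, biggest / mean

# Example: one partition roughly 3x larger than the rest shows up clearly.
counts = [100, 98, 102, 300, 101]
biggest, mean, ratio = skew_report(counts)
print(biggest, round(mean, 1), round(ratio, 2))
```

A skew ratio close to 1.0 would confirm the partitions are evenly sized; a ratio well above 1 would point at a few hot keys instead.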

Can you please guide us through this? Is this a common scenario in Spark? How should we deal with such problems, given that we tried almost everything and finally resorted to increasing the executor memory?

Let me know if you need more information.