Thanks for the lead. I went through the failure logs on the executor.
In my DAG, there is a union operation after the read from HDFS. The DAG showed the data being read twice, so to reduce it to a single read I added df.persist() between the read and the union. That made the executor memory shoot up and caused the BufferHolder error. Removing the persist() made the error go away, but I have follow-up questions:
1. Is this union operation efficient even though it reads from HDFS twice (as I see in the DAG)? Assume the two DataFrames being unioned are mutually exclusive and together split the input DataFrame in two. Will Spark optimize this by reading the raw data once and then applying the filters, or does it perform the I/O twice?
2. What are the alternatives to optimize this?