Query on Spark Dataframe Aggregations

Query on Spark Dataframe Aggregations

Subash Prabakar
Hi,

My job performs aggregations over some dimensions, and I use the Spark DataFrame APIs heavily. When I execute it, I occasionally get the error below:

[Attachment: Screenshot 2020-05-06 at 7.42.44 PM.png]

Spark Version: 2.3.0 (CDH) 2.6.5
Data format: ORC
Aggregations performed: countDistinct and sum (5 aggregations in total)


Does anyone have recommendations, or any idea why this happens?

Thanks,
Subash
Re: Query on Spark Dataframe Aggregations

Jeff Evans
I believe this is what you're running into: https://stackoverflow.com/a/55435329/375670

Re: Query on Spark Dataframe Aggregations

Subash Prabakar
Hi Jeff,

Thanks for the lead - I went through the failure logs on the executor.

In my DAG there is a union operation after the read from HDFS, so the DAG showed the data being read twice. To optimize that into a single read, I added df.persist() between the read and the union. That made the executor memory shoot up and caused this BufferHolder error. Removing the persist() made the error go away, but I have a follow-up question.

To Everyone,

1. Is it efficient to perform this union even though it reads from HDFS twice (as I see in the DAG)? Assume the two DataFrames being unioned are mutually exclusive and split the input DataFrame in two. Will Spark optimize this to read the raw data once and then apply the filters, or does it do the IO twice?

2. What are the alternatives to optimize this?


Thanks,
Subash.



