[pyspark 2.4] broadcasting DataFrame throws error


[pyspark 2.4] broadcasting DataFrame throws error

rishishah.star
Hello All,

Hope this email finds you well. I have a dataframe of about 8TB (Parquet, Snappy compressed). When I group it by a column, I get a much smaller aggregated dataframe of only 700 rows (just two columns, key and count). When I use it as below to broadcast this aggregated result, it throws an error saying the dataframe cannot be broadcast.

from pyspark.sql.functions import broadcast

# Aggregate down to ~700 rows (key, count) and cache the result.
df_agg = df.groupBy('column1').count().cache()
# df_agg.count()  # optionally materialize the cache up front
# Broadcast the small aggregate and left-join it back onto the large input.
df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
df_join.write.parquet('PATH')

The same code works on an input df of 3TB without any modifications.

Any suggestions?

--
Regards,

Rishi Shah

Re: [pyspark 2.4] broadcasting DataFrame throws error

Amit Joshi
Hi,

I think the problem lies with driver memory. Broadcast in Spark works by collecting all the data to the driver, which then broadcasts it to all the executors. A different strategy could be employed for the transfer, like BitTorrent, though.

Please try increasing the driver memory and see if it works.
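For example, a minimal sketch of where these settings could go (the values below are placeholders, not recommendations):

# Sketch only: values are placeholders. Driver memory generally has to be set
# before the driver JVM starts, e.g. via
#   spark-submit --conf spark.driver.memory=16g --conf spark.driver.maxResultSize=8g
# or in spark-defaults.conf; setting it on an already-running session has no effect.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.driver.memory", "16g")        # heap available to the driver
    .config("spark.driver.maxResultSize", "8g")  # cap on results collected to the driver
    .getOrCreate()
)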

Regards,
Amit



Re: [pyspark 2.4] broadcasting DataFrame throws error

rishishah.star
Thanks Amit. I have tried increasing driver memory, and also tried increasing the max result size returned to the driver. Nothing works. I believe Spark is not able to determine that the result to be broadcast is small enough, because the input data is huge? When I tried this in two stages (write out the grouped data, then use that to join with broadcast, roughly as sketched below), Spark has no issue broadcasting it.
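For clarity, the two-stage workaround looks roughly like this (AGG_PATH is just a placeholder path):

from pyspark.sql.functions import broadcast

# Stage 1: materialize the small aggregate to storage.
df.groupBy('column1').count().write.mode('overwrite').parquet('AGG_PATH')

# Stage 2: read the tiny result back and broadcast it; read this way,
# Spark sees its real (small) size and broadcasts it without complaint.
df_agg = spark.read.parquet('AGG_PATH')
df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
df_join.write.parquet('PATH')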

When I was checking the Spark 3 documentation, it seems like this issue may have been addressed in Spark 3 but not in earlier versions?

--
Regards,

Rishi Shah

Re: [pyspark 2.4] broadcasting DataFrame throws error

Amit Joshi
Hi Rishi,

Maybe you have already done these steps.
Can you check the size of the dataframe you are trying to broadcast, using
logInfo(SizeEstimator.estimate(df))
and adjust the driver memory accordingly?
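From PySpark there is no direct binding for SizeEstimator, but a rough sketch via the JVM gateway could look like the following. This goes through internal py4j handles (so treat it as an assumption), and it estimates the Dataset object rather than the materialized rows; the Storage tab in the Spark UI shows the in-memory size once the dataframe is cached.

# Rough sketch: call the JVM-side SizeEstimator through the py4j gateway.
# Internal API, so this is an approximation, not an exact data size.
jvm = spark.sparkContext._jvm
est_bytes = jvm.org.apache.spark.util.SizeEstimator.estimate(df_agg._jdf)
print("estimated size (bytes):", est_bytes)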

There is one more issue which I found in Spark 2: broadcast does not work on cached data. It is possible this is not the issue here, but you can check the same at your end.
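A quick way to rule that out, assuming the same code as in your mail, is to drop the cache() and broadcast the uncached aggregate:

from pyspark.sql.functions import broadcast

# Same join as before, but without cache(), to check whether caching the
# broadcast candidate is what trips up Spark 2.4.
df_agg = df.groupBy('column1').count()
df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
df_join.write.parquet('PATH')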


And can you please tell which issue you are referring to that was solved in Spark 3?

Regards
Amit



Re: [pyspark 2.4] broadcasting DataFrame throws error

rishishah.star
Thanks Amit, I was referring to dynamic partition pruning (https://issues.apache.org/jira/browse/SPARK-11150) & adaptive query execution (https://issues.apache.org/jira/browse/SPARK-31412) in Spark 3, where it figures out the right partitions & pushes the filters down to the input before applying the join.
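For reference, these are controlled by the following Spark 3 settings (a sketch only; the defaults vary across 3.x releases):

# Spark 3 only: enable the features referenced above.
spark.conf.set("spark.sql.adaptive.enabled", "true")                           # adaptive query execution (SPARK-31412)
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")  # dynamic partition pruning (SPARK-11150)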

--
Regards,

Rishi Shah