Coalesce vs reduce operation parameter

Coalesce vs reduce operation parameter

pedroT
I was reviewing a Spark Java application running on AWS EMR.

The code was like:
RDD.reduceByKey(func).coalesce(number).saveAsTextFile()

That stage took hours to complete.
I changed to:
RDD.reduceByKey(func, number).saveAsTextFile()
And it now takes less than 2 minutes, and the final output is the same.

So, is it a bug or a feature?
Why doesn't Spark treat a coalesce after a reduce like a reduce with the number of output partitions parameterized?

Just for understanding,
Thanks,
Pedro.
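
[Editor's note: for reference, a minimal runnable sketch of the two variants Pedro describes. The RDD contents, partition counts, and output paths below are made up for illustration.]

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class ReduceByKeyVsCoalesce {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("reduceByKey-vs-coalesce").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Hypothetical input: 4 partitions of (word, count) pairs.
        JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(
                Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3)), 4);

        int number = 2; // target number of output partitions

        // Original version: reduce first, then coalesce the result.
        pairs.reduceByKey(Integer::sum)
             .coalesce(number)
             .saveAsTextFile("/tmp/out-coalesce"); // hypothetical path

        // Changed version: let reduceByKey produce `number` partitions directly.
        pairs.reduceByKey(Integer::sum, number)
             .saveAsTextFile("/tmp/out-direct"); // hypothetical path

        sc.stop();
    }
}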




Re: Coalesce vs reduce operation parameter

vaquar khan
Hi Pedro,

What is your use case? Why did you use coalesce()? coalesce() is a very expensive operation, as it shuffles the data across many partitions, so try to minimize repartitioning as much as possible.

Regards,
Vaquar khan






Re: Coalesce vs reduce operation parameter

Attila Zsolt Piros
Hi!

Actually, coalesce() is usually a cheap operation, as it only moves some existing partitions from one node to another. So it is not a (full) shuffle.

See the documentation, especially:

This results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.

repartition() is the expensive method.
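
[Editor's note: a quick sketch of the difference. The partition counts are arbitrary, chosen to mirror the documentation example quoted above.]

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Collections;

public class CoalesceVsRepartition {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("coalesce-vs-repartition").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 1000 input partitions, as in the documentation example.
        JavaRDD<Integer> big = sc.parallelize(Collections.nCopies(10000, 1), 1000);

        // Narrow dependency: each of the 100 new partitions claims ~10
        // existing ones, so no shuffle is performed.
        JavaRDD<Integer> narrowed = big.coalesce(100);

        // repartition(n) is coalesce(n, shuffle = true): always a full shuffle.
        JavaRDD<Integer> shuffled = big.repartition(100);

        System.out.println(narrowed.getNumPartitions()); // 100
        System.out.println(shuffled.getNumPartitions()); // 100

        sc.stop();
    }
}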

Regarding Pedro's problem: RDD.reduceByKey(func, number).saveAsTextFile() is certainly expected to perform better, but hours vs. 2 minutes sounds really bad.
How many partitions are you going from, and what is the target number of partitions (the number in your example)?

You should probably compare the Stages tab and the stage details on the UI. If you need the community's help, please share the event logs of the two runs; the application logs might be needed too (the event log and application log must come from the same run in both cases).
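
[Editor's note: one quick way to see how the coalesce attaches to the reduce stage, before digging into the event logs, is to print the lineage of both variants. This reuses the hypothetical pairs and number from the sketch above.]

// toDebugString() prints the RDD lineage, including the ShuffledRDD and
// CoalescedRDD boundaries, so the stage structure of both plans is visible.
System.out.println(pairs.reduceByKey(Integer::sum).coalesce(number).toDebugString());
System.out.println(pairs.reduceByKey(Integer::sum, number).toDebugString());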

Best Regards,
Attila 
