Spark 2.4 partitions and tasks


Spark 2.4 partitions and tasks

pedroT
Hi,
I am running a job in Spark (on AWS EMR), and some stages take much longer with Spark 2.4 than with Spark 2.3.1:

Spark 2.4:
image.png

Spark 2.3.1:
image.png

With Spark 2.4, the keyBy operation takes more than 10x as long as it did with Spark 2.3.1.
It seems to be related to the number of tasks/partitions.

Questions:
- Isn't the number of tasks of a job supposed to be related to the number of parts of the RDD left by the previous job? Did that change in version 2.4?
- Which tools or configuration can I try to reduce this abnormal performance degradation?

Thanks.
Pedro.

Re: Spark 2.4 partitions and tasks

pedroT
128 is the default parallelism defined for the cluster.
The question now is why the keyBy operation uses the default parallelism instead of the number of partitions of the RDD created by the previous step (5580).
Any clues?


Re: Spark 2.4 partitions and tasks

pedroT
I did a repartition to 10000 (hardcoded) before the keyBy, and it finishes in 1.2 minutes.
The questions remain open, because I don't want to hardcode the parallelism.


Re: Spark 2.4 partitions and tasks

Jacek Laskowski
Hi,

Can you show the plans with explain(extended=true) for both versions? That's where I'd start to pinpoint the issue. Perhaps the underlying execution engine changed in a way that affects keyBy? Dunno, just guessing...


Re: Spark 2.4 partitions and tasks

pedroT
Hi Jacek.
I'm not using Spark SQL; I'm using the RDD API directly.
I can confirm that the jobs and stages are the same in both executions.
In the Environment tab of the web UI, spark.default.parallelism=128 is shown when using Spark 2.4, while in 2.3.1 it is not.
But it should be the same in 2.3.1, because 128 is the number of cores in the cluster * 2, and that didn't change in the latest version.

In the example I gave, 5580 is the number of parts left by a previous job in S3, as Hadoop sequence files. So the initial RDD has 5580 partitions.
In 2.3.1, RDDs created by transformations of the initial RDD keep the same number of partitions, whereas in 2.4 the number of partitions resets to the default.
So RDD1, the product of the first mapToPair, prints 5580 when getPartitions() is called in 2.3.1, but prints 128 in 2.4.

Regards,
Pedro



Re: Spark 2.4 partitions and tasks

pedroT
And this is happening in every job I run; it is not just one case. If I add a forced repartition, it works fine, even better than before. But I run the same code for different inputs, so the number of partitions to use must be related to the input.
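
One way to tie the partition count to the input instead of hardcoding it is to derive it from the parent RDD's partition count. The following is a minimal sketch under assumptions: `choose_num_partitions` is a hypothetical helper (not a Spark API), and the floor of two tasks per core is only an illustrative default. In a real job the first argument would come from `getNumPartitions()` on the input RDD.

```python
def choose_num_partitions(parent_partitions, total_cores, min_tasks_per_core=2):
    """Hypothetical helper: pick a partition count that follows the input.

    Keeps the parent's partition count when it is already large, and
    otherwise falls back to a floor of min_tasks_per_core tasks per core.
    """
    if parent_partitions < 1 or total_cores < 1 or min_tasks_per_core < 1:
        raise ValueError("all arguments must be positive integers")
    floor = total_cores * min_tasks_per_core
    return max(parent_partitions, floor)


# Values from this thread: 5580 input partitions, 64 cores (128 / 2).
print(choose_num_partitions(5580, 64))  # 5580
# A small input (40 partitions, made up for illustration) hits the floor.
print(choose_num_partitions(40, 64))    # 128
```

The result could then be passed as the partition-count argument of a shuffle operation, or to a repartition call, in place of a fixed value such as 10000.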



Re: Spark 2.4 partitions and tasks

pedroT
* It is not getPartitions() but getNumPartitions().


Re: Spark 2.4 partitions and tasks

pedroT
I replaced the explicit repartitions with parallelism parameters passed to the PairRDDFunctions operations, and it works better, but I still have to come up with a magic number.
From https://spark.apache.org/docs/latest/tuning.html:
Clusters will not be fully utilized unless you set the level of parallelism for each operation high enough. Spark automatically sets the number of “map” tasks to run on each file according to its size (though you can control it through optional parameters to SparkContext.textFile, etc), and for distributed “reduce” operations, such as groupByKey and reduceByKey, it uses the largest parent RDD’s number of partitions. You can pass the level of parallelism as a second argument (see the spark.PairRDDFunctions documentation), or set the config property spark.default.parallelism to change the default. In general, we recommend 2-3 tasks per CPU core in your cluster.
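
The rule quoted above, combined with what the Environment tab showed earlier in this thread, can be modelled roughly as follows. This is a simplified sketch, not Spark's actual code: it assumes that an explicitly set `spark.default.parallelism` takes precedence over the parents' partition counts, it ignores any partitioner the parents may already have, and `default_shuffle_partitions` is a hypothetical name.

```python
def default_shuffle_partitions(parent_partition_counts, default_parallelism=None):
    """Simplified model, NOT Spark's actual code.

    Assumption: when spark.default.parallelism is set, a shuffle with no
    explicit partition count uses it; otherwise it uses the largest parent
    partition count. Existing partitioners on the parents are ignored here.
    """
    if not parent_partition_counts:
        raise ValueError("at least one parent RDD is required")
    if default_parallelism is not None:
        return default_parallelism
    return max(parent_partition_counts)


# Property absent (as in the 2.3.1 Environment tab): follows the parent.
print(default_shuffle_partitions([5580]))       # 5580
# Property set to 128 (as in the 2.4 Environment tab): uses the setting.
print(default_shuffle_partitions([5580], 128))  # 128
```

Under this model, the two results match the 5580 and 128 partition counts reported for the two versions, which would be consistent with the property being set explicitly only in the 2.4 environment.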

I wonder how it is possible that my jobs work fine with tens of tasks per core and work so badly (or do not work at all) with 2-3 tasks per core, as the Spark documentation recommends.
Since the number of tasks in previous versions depended on the input partitions, I can only assume that the jobs were working by chance, because the number of input partitions could be higher or lower depending on whether the input was left by another Spark job, a Hadoop job, or was just text.
It is possible that the jobs could run in much less time if I chose a better parallelism. I have a dozen jobs, running with different input sizes and partition counts, so it is very hard to find a calculation that fits all scenarios.
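
To put numbers on the tasks-per-core comparison, here is a small worked calculation using only figures from this thread. The core count of 64 is derived from the statement that 128 is the number of cluster cores * 2; `tasks_per_core` is a hypothetical helper name.

```python
def tasks_per_core(num_tasks, total_cores):
    """Average number of tasks each core runs in a stage."""
    return num_tasks / total_cores


total_cores = 128 // 2  # the thread states 128 = cluster cores * 2

# 128: default parallelism; 5580: input partitions; 10000: forced repartition.
for num_tasks in (128, 5580, 10000):
    print(num_tasks, tasks_per_core(num_tasks, total_cores))
# 128 2.0
# 5580 87.1875
# 10000 156.25
```

So the configurations that performed well in this thread ran roughly 87 to 156 tasks per core, against 2 per core with the default parallelism.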







