[pyspark2.4+] A lot of tasks failed, but job eventually completes


rishishah.star
Hello All,

One of my jobs keeps getting into a situation where hundreds of tasks fail with the error below, but the job eventually completes.

org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 16384 bytes of memory

Could someone advise?

--
Regards,

Rishi Shah

Re: [pyspark2.4+] A lot of tasks failed, but job eventually completes

hemant singh
You can try increasing the executor memory; this error generally comes up when individual executors do not have enough memory.
The job probably completes because the failed tasks eventually go through once they are re-scheduled.
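Something along these lines (just a sketch; the sizes are placeholders to tune for your cluster, and they have to be set before the application starts, e.g. via spark-submit --conf or the cluster configuration):

from pyspark.sql import SparkSession

# Executor memory must be set before the executors launch; changing it on a
# running session has no effect. The values below are placeholders only.
spark = (SparkSession.builder
         .appName("my-job")
         .config("spark.executor.memory", "8g")           # per-executor heap
         .config("spark.executor.memoryOverhead", "2g")   # extra off-heap headroom
         .getOrCreate())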

Thanks.


Re: [pyspark2.4+] A lot of tasks failed, but job eventually completes

rishishah.star
Thanks Hemant. The underlying data volume increased from 550 GB to 690 GB, and now the same job doesn't succeed. I tried increasing the executor memory to 20 GB as well; it still fails. I am running this on Databricks and start the cluster with 20G assigned to the spark.executor.memory property.

Some more information on the job: I apply about 4 window functions to this dataset before it gets written out.

Any other ideas?

Thanks,
-Shraddha

--
Regards,

Rishi Shah

Re: [pyspark2.4+] A lot of tasks failed, but job eventually completes

hemant singh
You can try repartitioning the data; if the data is skewed, you may need to salt the keys to get a better partitioning.
Are you using coalesce or any other function that brings the data onto fewer nodes? Window functions also incur shuffling, which could be an issue.
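Just to illustrate what salting a skewed join key could look like in PySpark (the DataFrames, the key column and the bucket count below are made up for illustration, not taken from your job):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Placeholder data: big_df is the large, skewed side; small_df the small lookup side.
big_df = spark.range(1000000).withColumn("key", (F.col("id") % 5).cast("string"))
small_df = spark.createDataFrame([(str(k), "value_%d" % k) for k in range(5)], ["key", "attr"])

SALT_BUCKETS = 16  # number of salt values; tune to the degree of skew

# Large side: append a random salt 0..SALT_BUCKETS-1 to the join key.
big_salted = (big_df
    .withColumn("salt", (F.rand() * SALT_BUCKETS).cast("int"))
    .withColumn("salted_key", F.concat_ws("_", "key", F.col("salt").cast("string"))))

# Small side: replicate each row once per salt value so every salted key finds a match.
salts = spark.range(SALT_BUCKETS).select(F.col("id").cast("int").alias("salt"))
small_salted = (small_df
    .crossJoin(salts)
    .withColumn("salted_key", F.concat_ws("_", "key", F.col("salt").cast("string"))))

joined = big_salted.join(small_salted.drop("key", "salt"), "salted_key")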


Re: [pyspark2.4+] A lot of tasks failed, but job eventually completes

Enrico Minack
Note that repartitioning helps to increase the number of partitions (and hence reduce the partition size and the required executor memory), but subsequent transformations like join will repartition the data again with the configured number of partitions (spark.sql.shuffle.partitions), virtually undoing the repartitioning, e.g.:

data                    // may have any number of partitions
  .repartition(1000)    // has 1000 partitions
  .join(table)          // has spark.sql.shuffle.partitions partitions

If you use RDDs, you need to configure spark.default.parallelism rather than spark.sql.shuffle.partitions.

Given you have 700 GB of data, the default of 200 partitions means that each partition is 3.5 GB (of equivalent input data) in size. Since increasing executor memory is limited by the physically available memory, executor memory does not scale for big data. Increasing the number of partitions is the natural way of scaling in Spark land.
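For illustration only, a PySpark sketch of raising the parallelism (2800 is simply 700 GB at roughly 256 MB per partition, not a recommendation):

from pyspark.sql import SparkSession

# 700 GB / ~256 MB per partition ≈ 2800 partitions; adjust to your data and cluster.
spark = (SparkSession.builder
         .config("spark.sql.shuffle.partitions", "2800")   # Dataset/DataFrame shuffles
         .config("spark.default.parallelism", "2800")      # RDD shuffles, fixed at startup
         .getOrCreate())

# spark.sql.shuffle.partitions can also be changed on a running session:
spark.conf.set("spark.sql.shuffle.partitions", "2800")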

Having hundreds of tasks that fail is an indication that you do not suffer from skewed data but from large partitions. Skewed data usually has a few tasks that keep failing.

It is easy to check for skewed data in the Spark UI. Open a stage that has failing tasks and look at the Summary Metrics: if the Max of "Shuffle Read Size" is way higher than the 75th percentile, this indicates a poor distribution of the data (or, more precisely, of the partitioning key) in this stage.

You can also sort the tasks by the "Shuffle Read Size / Records" column and check whether the numbers are (ideally) evenly distributed.

I hope this helped.

Enrico




Re: [pyspark2.4+] A lot of tasks failed, but job eventually completes

rishishah.star
Thank you Hemant and Enrico. Much appreciated. 

Your input really got me closer to the issue. I realized that each task didn't get enough memory, and hence the tasks with large partitions kept failing. I increased the executor memory and at the same time increased the number of partitions as well; this made the job succeed with flying colors. Really appreciate the help here.

I do have one more question: when do you recommend using RDDs over DataFrames? At times using windows can get a bit complicated, but there is always some way or other to use windows on DataFrames, so I always get confused about when to fall back on the RDD approach. Is there any use case in your experience that warrants using RDDs for better performance?

Thanks,
Rishi 


--
Regards,

Rishi Shah

Re: [pyspark2.4+] When to choose RDD over Dataset, was: A lot of tasks failed, but job eventually completes

Enrico Minack
Hi Rishi,

Generally it is better to avoid RDDs if you can and use the Dataset API. With Datasets (formerly DataFrames), Spark can optimize your query / tree of transformations, whereas RDDs are opaque to it. Datasets also have an optimized memory footprint. Pure Dataset operations provide helpful information on the SQL tab of the Spark UI; for large transformations this makes it easier to identify the transformations that cause trouble. Switching from a Dataset to an RDD at some point hides all operations that happen before accessing the RDD, so you lose that query-debugging capability for that part.
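A small made-up sketch to illustrate the difference (columns are purely illustrative):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(1000).withColumn("bucket", F.col("id") % 10)

# Pure Dataset/DataFrame pipeline: the whole plan is optimized and visible on the SQL tab.
agg_df = df.groupBy("bucket").agg(F.sum("id").alias("total"))

# Dropping to the RDD API: the lambdas are opaque to the optimizer,
# and the steps before .rdd no longer show up as a SQL query plan.
agg_rdd = df.rdd.map(lambda row: (row["bucket"], row["id"])).reduceByKey(lambda a, b: a + b)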

That is my experience.

Enrico

