How to address seemingly low core utilization on a spark workload?

How to address seemingly low core utilization on a spark workload?

Vitaliy Pisarev
I have a workload that runs on a cluster of 300 cores.
Below is a plot of the number of active tasks over time during the execution of this workload:

image.png

What I deduce is that there are substantial intervals where the cores are heavily under-utilised. 

What actions can I take to:
  • Increase the efficiency (== core utilisation) of the cluster?
  • Understand the root causes behind the drops in core utilisation?
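
One way to put a number on the under-utilisation is to compare the time tasks actually spent running with the core time that was available. The following is only a minimal sketch: it assumes the task start and end times have already been exported from somewhere (for example the Spark event log), that each task occupies exactly one core, and the `core_utilisation` helper and sample intervals are hypothetical.

```python
def core_utilisation(task_intervals, total_cores):
    """Return the fraction of available core time spent running tasks.

    task_intervals: list of (start_seconds, end_seconds), one per task.
    total_cores: number of task slots in the cluster (300 in this thread).
    Assumes each task occupies exactly one core for its whole duration.
    """
    if not task_intervals:
        return 0.0
    wall_start = min(start for start, _ in task_intervals)
    wall_end = max(end for _, end in task_intervals)
    wall_time = wall_end - wall_start
    if wall_time <= 0:
        return 0.0
    busy_core_seconds = sum(end - start for start, end in task_intervals)
    return busy_core_seconds / (wall_time * total_cores)


# Hypothetical sample: 4 cores, three tasks over a 10-second window.
sample = [(0, 10), (0, 5), (5, 10)]
print(core_utilisation(sample, total_cores=4))  # 20 busy core-seconds / 40 available = 0.5
```

Computing this per stage rather than for the whole run would show which stages account for the idle intervals.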

Re: How to address seemingly low core utilization on a spark workload?

Brandon Geise

I recently came across this (haven’t tried it out yet), but maybe it can help you identify the root cause.

https://github.com/groupon/sparklint

 

 

From: Vitaliy Pisarev <[hidden email]>
Date: Thursday, November 15, 2018 at 10:08 AM
To: user <[hidden email]>
Cc: David Markovitz <[hidden email]>
Subject: How to address seemingly low core utilization on a spark workload?

 


Re: How to address seemingly low core utilization on a spark workload?

Thakrar, Jayesh
In reply to this post by Vitaliy Pisarev

Can you shed more light on what kind of processing you are doing?

One common pattern I have seen, where active core/executor utilization drops to zero, occurs while reading ORC data: the driver seems (I think) to be doing schema validation.

In my case I would have hundreds of thousands of ORC data files, and there would be dead silence for about 1-2 hours.

I have tried providing a schema and disabling schema validation while reading the ORC data, but that does not seem to help (Spark 2.2.1).

And as you know, in most cases there is a linear relationship between the number of partitions in your data and the number of concurrently active executors.

Another thing I would suggest is to use the following two API calls. They annotate the Spark stages and jobs in the Spark UI with what is being executed.

SparkContext.setJobGroup(….)

SparkContext.setJobDescription(….)
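
A minimal sketch of how these two calls might be wired into a job, written against the PySpark method names `setJobGroup(groupId, description)` and `setJobDescription(value)`. The `RecordingContext` class is a hypothetical stand-in so the sketch runs without a cluster, and `run_steps`, the group id and the step names are all illustrative assumptions.

```python
class RecordingContext:
    """Stand-in for a SparkContext so the sketch runs without a cluster.

    It only records calls; a real SparkContext exposes methods with the
    same names.
    """

    def __init__(self):
        self.calls = []

    def setJobGroup(self, groupId, description, interruptOnCancel=False):
        self.calls.append(("setJobGroup", groupId, description))

    def setJobDescription(self, value):
        self.calls.append(("setJobDescription", value))


def run_steps(sc, group_id, group_description, steps):
    """Run (name, action) pairs, labelling the jobs each action triggers."""
    # One group for the whole pipeline phase.
    sc.setJobGroup(group_id, group_description)
    results = []
    for name, action in steps:
        # The description set here is attached to jobs started afterwards
        # from this thread, so each step is identifiable in the UI.
        sc.setJobDescription(name)
        results.append(action())
    return results


sc = RecordingContext()  # replace with the real SparkContext in a job
steps = [("read input", lambda: 1), ("group and aggregate", lambda: 2)]
results = run_steps(sc, "nightly-etl", "Nightly ETL run", steps)
print(sc.calls)
```

In a real job each `action` would need to trigger a Spark action (a count, a write, and so on), since only then is a job submitted and labelled.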

 


Re: How to address seemingly low core utilization on a spark workload?

Vitaliy Pisarev
I am working with Parquet files, and reading the metadata is quite fast as there are at most 16 files (a couple of gigs each).

I find it very hard to answer the question "how many partitions do you have?": many Spark operations do not preserve partitioning, and I have a lot of filtering and grouping going on.
What I can say is that I set spark.sql.shuffle.partitions to 30,000.

I am not worried that there are not enough partitions to keep the cores working. Having said that, I do see that high utilisation correlates heavily with shuffle read/write, whereas low utilisation correlates with no shuffling.
This leads me to the conclusion that, compared to the amount of shuffling, the cluster is doing very little work.

The question is what I can do about it.


Re: How to address seemingly low core utilization on a spark workload?

Thakrar, Jayesh

For that little data, I find spark.sql.shuffle.partitions = 30000 to be very high.

Any reason for that high value?

 

Do you have a baseline observation with the default value?

 

Also, enabling the job group and job description through the API and observing them in the UI will help you identify which parts of the code are running when utilization is low.

Finally, high utilization does not equate to high efficiency.

It's very likely that, for your workload, you only need 16-128 executors.

I would suggest getting the partition count for the various datasets/dataframes/rdds in your code by using

 

dataset.rdd.getNumPartitions
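
As a rough sketch of how that check could be applied across a pipeline, the helper below collects the partition count of several intermediate results in one place. It uses the PySpark spelling `df.rdd.getNumPartitions()`; the `FakeDataFrame` stand-in, the `partition_report` helper, the dataset names and the counts are all hypothetical and exist only so the example runs without Spark.

```python
class _FakeRdd:
    """Minimal stand-in exposing getNumPartitions() like an RDD."""

    def __init__(self, num_partitions):
        self._num_partitions = num_partitions

    def getNumPartitions(self):
        return self._num_partitions


class FakeDataFrame:
    """Minimal stand-in exposing .rdd like a DataFrame."""

    def __init__(self, num_partitions):
        self.rdd = _FakeRdd(num_partitions)


def partition_report(named_dataframes):
    """Return {name: partition count} for each DataFrame-like object."""
    return {
        name: df.rdd.getNumPartitions()
        for name, df in named_dataframes.items()
    }


# Made-up counts, purely for illustration.
report = partition_report({
    "raw_input": FakeDataFrame(128),
    "after_group_by": FakeDataFrame(30000),
})
for name, count in report.items():
    print(f"{name}: {count} partitions")
```

Logging such a report at a few points in the job gives a concrete answer to the "how many partitions do you have?" question.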

 

I would also suggest running a number of tests with different numbers of executors.

But coming back to the objective behind your quest: are you trying to maximize utilization in the hope that high parallelism will reduce your total runtime?

 

 


Re: How to address seemingly low core utilization on a spark workload?

Vitaliy Pisarev
The quest is dual:

  • Increase utilisation, because cores cost money and I want to make sure that I fully utilise what I pay for. This is very blunt of course, because there is always I/O and at least some degree of skew. The bottom line is to do the same thing in the same time but with fewer (but better utilised) resources.
  • Reduce runtime by increasing parallelism.
While not the same, I am looking at these as two sides of the same coin.






Re: How to address seemingly low core utilization on a spark workload?

Vitaliy Pisarev
Oh, regarding shuffle.partitions being 30k: I don't know. I inherited the workload from an engineer who is no longer around and am trying to make sense of things in general.


Re: How to address seemingly low core utilization on a spark workload?

Shahbaz-2
30k SQL shuffle partitions is extremely high. Each core processes one partition at a time (core to partition is 1 to 1), and the default value of SQL shuffle partitions is 200. Set it to 300 or leave it at the default, see which one gives the best performance, and after you do that, see how the cores are being used.
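
To make the core-to-partition arithmetic concrete, here is a small back-of-the-envelope sketch. It assumes one core per task, tasks of equal duration within a stage, and the 300 cores mentioned at the start of the thread; `task_waves` and `slot_fill` are hypothetical helper names.

```python
import math


def task_waves(num_partitions, total_cores):
    """Number of scheduling rounds needed to run one stage's tasks."""
    return math.ceil(num_partitions / total_cores)


def slot_fill(num_partitions, total_cores):
    """Fraction of task slots used across all waves of the stage.

    Assumes every task takes the same time, so a partly filled last
    wave leaves the remaining cores idle.
    """
    waves = task_waves(num_partitions, total_cores)
    return num_partitions / (waves * total_cores)


for partitions in (200, 300, 30000):
    waves = task_waves(partitions, 300)
    fill = slot_fill(partitions, 300)
    print(f"{partitions} partitions: {waves} wave(s), {fill:.0%} of slots used")
```

Under these assumptions the default of 200 can keep at most two thirds of 300 cores busy in a shuffle stage, 300 fills them in a single wave, and 30,000 needs 100 waves of very small tasks, where per-task scheduling overhead is likely to dominate.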

Regards,
Shahbaz


Re: How to address seemingly low core utilization on a spark workload?

Vitaliy Pisarev
Agreed, and I will try it. One clarification though: the number of partitions also affects their in-memory size, so fewer partitions may result in higher memory pressure and OOMs. I think this was the original intention.

So the motivation for partitioning is also to break down volumes to fit the machines.

Is this premise wrong?
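
The trade-off in this premise can be sketched numerically. The figure of 64 GB is an assumption (16 files at roughly 4 GB each); the volume actually shuffled may be quite different, since compressed Parquet expands in memory and filtering shrinks it. `avg_partition_mb` is a hypothetical helper and it assumes evenly sized partitions with no skew.

```python
def avg_partition_mb(total_gb, num_partitions):
    """Average partition size in MB, assuming perfectly even partitions."""
    return total_gb * 1024 / num_partitions


ASSUMED_TOTAL_GB = 64  # assumption: 16 files x ~4 GB, not a measured value

for partitions in (200, 300, 30000):
    size_mb = avg_partition_mb(ASSUMED_TOTAL_GB, partitions)
    print(f"{partitions} partitions -> about {size_mb:.1f} MB each")
```

With these assumed numbers, 30,000 partitions come to only a couple of megabytes each, while 200 or 300 partitions come to a few hundred megabytes each. Whether that fits depends on the executor memory available per core.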


Re: How to address seemingly low core utilization on a spark workload?

Thakrar, Jayesh

While there is some merit to that thought process, I would steer away from premature JVM GC optimization of this kind.

What are the memory, CPU and other settings (e.g. any JVM/GC settings) for the executors and driver?

So assuming that you are reading about 16 files of, say, 2-4 GB each, that’s about 32-64 GB of (compressed?) data in Parquet files.

Do you have access to the Spark UI – what is the peak memory that you see for the executors?

The UI will also give you the time spent on GC by each executor.

So even if you completely eliminated all GC, that’s the max time you can potentially save.
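
A short sketch of that upper bound, assuming the per-executor task time and GC time have been read off the Spark UI by hand. The `max_gc_saving` helper and the sample figures are hypothetical, not measurements from this workload.

```python
def max_gc_saving(executor_stats):
    """Return (total GC seconds, GC share of total task time).

    executor_stats: list of (task_time_seconds, gc_time_seconds),
    one pair per executor.
    """
    total_task = sum(task for task, _ in executor_stats)
    total_gc = sum(gc for _, gc in executor_stats)
    share = total_gc / total_task if total_task else 0.0
    return total_gc, share


# Hypothetical figures for two executors.
stats = [(3600, 90), (3400, 110)]
gc_seconds, gc_share = max_gc_saving(stats)
print(f"GC: {gc_seconds} s, {gc_share:.1%} of task time")
```

If the share is only a few percent, as in this made-up sample, tuning partition counts for GC reasons cannot recover much runtime.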

 

 

From: Vitaliy Pisarev <[hidden email]>
Date: Thursday, November 15, 2018 at 1:03 PM
To: Shahbaz <[hidden email]>
Cc: "Thakrar, Jayesh" <[hidden email]>, user <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: How to address seemingly low core utilization on a spark workload?

 

Agree, and I will try it. One clarification though: the amount of partitions also affects their in memory size. So fewer partitions may result in higher memory preassure and Ooms. I think this was the original intention.

 

So the motivation for partitioning is also to break down volumes yo fit the machines.

 

Is this premise wrong?

 

On Thu, Nov 15, 2018, 19:49 Shahbaz <[hidden email] wrote:

30k Sql shuffle partitions is extremely high.Core to Partition is 1 to  1 ,default value of Sql shuffle partitions is  200 ,set it to 300 or leave it to default ,see which one gives best performance,after you do that ,see how cores are being used?

 

Regards,

Shahbaz

 

On Thu, Nov 15, 2018 at 10:58 PM Vitaliy Pisarev <[hidden email]> wrote:

Oh, regarding and shuffle.partitions being 30k, don't know. I inherited the workload from an engineer that is no longer around and am trying to make sense of things in general. 

 

On Thu, Nov 15, 2018 at 7:26 PM Vitaliy Pisarev <[hidden email]> wrote:

The quest is dual:

 

  • Increase utilisation- because cores cost money and I want to make sure that if I fully utilise what I pay for. This is very blunt of corse, because there is always i/o and at least some degree of skew. Bottom line is do the same thing over the same time but with fewer (but better utilised) resources.
  • Reduce runtime by increasing parallelism. 

While not the same, I am looking at these as two sides of the same coin. 

 

 

 

 

 

On Thu, Nov 15, 2018 at 6:58 PM Thakrar, Jayesh <[hidden email]> wrote:

For that little data, I find spark.sql.shuffle.partitions = 30000 to be very high.

Any reason for that high value?

 

Do you have a baseline observation with the default value?

 

Also, enabling the jobgroup and job info through the API and observing through the UI will help you understand the code snippets when you have low utilization.

 

Finally, high utilization does not equate to high efficiency.

Its very likely that for your workload, you may only need 16-128 executors.

I would suggest getting the partition count for the various datasets/dataframes/rdds in your code by using

 

dataset.rdd. getNumPartitions

 

I would also suggest doing a number of tests with different number of executors too.

 

But coming back to the objective behind your quest – are you trying to maximize utilization hoping that by having high parallelism will reduce your total runtime?

 

 

From: Vitaliy Pisarev <[hidden email]>
Date: Thursday, November 15, 2018 at 10:07 AM
To: <[hidden email]>
Cc: user <[hidden email]>, David Markovitz <[hidden email]>
Subject: Re: How to address seemingly low core utilization on a spark workload?

 

I am working with parquets and the metadata reading there is quite fast as there are at most 16 files (a couple of gigs each).

 

I find it very hard to answer the question "how many partitions do you have?". Many Spark operations do not preserve partitioning, and I have a lot of filtering and grouping going on.

What I can say is that I set spark.sql.shuffle.partitions to 30,000. 

 

I am not worried that there are not enough partitions to keep the cores working. Having said that, I do see that high utilisation correlates heavily with shuffle read/write, whereas low utilisation correlates with no shuffling. 

This leads me to the conclusion that compared to the amount of shuffling, the cluster is doing very little work.

 

Question is what can I do about it.

 

On Thu, Nov 15, 2018 at 5:29 PM Thakrar, Jayesh <[hidden email]> wrote:

Can you shed more light on what kind of processing you are doing?

 

One common pattern I have seen where active core/executor utilization drops to zero is while reading ORC data, when the driver seems (I think) to be doing schema validation.

In my case I would have hundreds of thousands of ORC data files and there is dead silence for about 1-2 hours.

I have tried providing a schema and disabling schema validation while reading the ORC data, but that does not seem to help (Spark 2.2.1).

 

And as you know, in most cases there is a linear relationship between the number of partitions in your data and the number of concurrently active executors.

 

Another thing I would suggest is to use the following two API calls – they annotate the Spark stages and jobs in the Spark UI with what is being executed.

SparkContext.setJobGroup(….)

SparkContext.setJobDescription(….)
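
A small sketch of how the labelling might be wrapped in PySpark, assuming `sc` is the active SparkContext. The helper name `run_labelled` is hypothetical; it uses only `setJobGroup`, whose second argument already sets the description shown in the UI.

```python
def run_labelled(sc, group_id, description, action):
    """Label the jobs triggered by `action` so they are easy to spot in the Spark UI.

    `sc` is expected to behave like a SparkContext and `action` is a
    zero-argument callable that triggers the Spark work.
    """
    # Jobs started from this thread after this call carry the group and description.
    sc.setJobGroup(group_id, description)
    return action()
```

Usage could look like `run_labelled(spark.sparkContext, "aggregate", "group events by user", lambda: grouped_df.count())`, where `grouped_df` is a placeholder for one of your own DataFrames. Periods of low utilisation in the task plot can then be matched against the labelled jobs.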

 

From: Vitaliy Pisarev <[hidden email]>
Date: Thursday, November 15, 2018 at 8:51 AM
To: user <[hidden email]>
Cc: David Markovitz <[hidden email]>
Subject: How to address seemingly low core utilization on a spark workload?

 

I have a workload that runs on a cluster of 300 cores. 

Below is a plot of the amount of active tasks over time during the execution of this workload:

 

[image.png: plot of the number of active tasks over time]

 

What I deduce is that there are substantial intervals where the cores are heavily under-utilised. 

 

What actions can I take to:

  • Increase the efficiency (== core utilisation) of the cluster?
  • Understand the root causes behind the drops in core utilisation?

Re: How to address seemingly low core utilization on a spark workload?

Vitaliy Pisarev
Small update: my initial estimate was incorrect. I have one location with 16 * 4G = 64G of parquet files (in snappy) + 20 * 5G = 100G of parquet files. So a total of 164G.

I am running on Databricks.
Here are some settings:

spark.executor.extraJavaOptions=-XX:ReservedCodeCacheSize=256m -XX:+UseCodeCacheFlushing -Ddatabricks.serviceName=spark-executor-1 -javaagent:/databricks/DatabricksAgent.jar -XX:+PrintFlagsFinal -XX:+PrintGCDateStamps -verbose:gc -XX:+PrintGCDetails -Xss4m -Djavax.xml.datatype.DatatypeFactory=com.sun.org.apache.xerces.internal.jaxp.datatype.DatatypeFactoryImpl -Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl -Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl -Djavax.xml.validation.SchemaFactory:http://www.w3.org/2001/XMLSchema=com.sun.org.apache.xerces.internal.jaxp.validation.XMLSchemaFactory -Dorg.xml.sax.driver=com.sun.org.apache.xerces.internal.parsers.SAXParser -Dorg.w3c.dom.DOMImplementationSourceList=com.sun.org.apache.xerces.internal.dom.DOMXSImplementationSourceImpl
spark.executor.memory=107407m
spark.executor.tempDirectory=/local_disk0/tmp

These are the only relevant settings that I see set when looking at the logs. I am guessing this means that the others are simply left at their defaults. 
Are there any settings I should pay special attention to? (A reference is also good.)

My assumption is that the Databricks runtime is already preconfigured with known best practices (like cores per executor...). Now that I think of it, I need to validate this assumption. 

On Thu, Nov 15, 2018 at 9:14 PM Thakrar, Jayesh <[hidden email]> wrote:

While there is some merit to that thought process, I would steer away from premature JVM GC optimization of this kind.

What are the memory, cpu and other settings (e.g. any JVM/GC settings) for the executors and driver?

So assuming that you are reading about 16 files of say 2-4 GB each, that’s about 32-64 GB of (compressed?) data in parquet files.

Do you have access to the Spark UI – what is the peak memory that you see for the executors?

The UI will also give you the time spent on GC by each executor.

So even if you completely eliminated all GC, that’s the max time you can potentially save.
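
To put a number on that bound, here is a back-of-the-envelope sketch. It assumes you copy the per-executor task time and GC time from the Executors tab of the Spark UI; the figures below are invented purely for illustration.

```python
def max_gc_saving(task_time_s, gc_time_s):
    """Upper bound on the fraction of task time saved if GC cost nothing."""
    total_task = sum(task_time_s)
    total_gc = sum(gc_time_s)
    if total_task == 0:
        return 0.0
    return total_gc / total_task


# Hypothetical per-executor totals in seconds (not measured values).
task_time = [3600.0, 3400.0, 3500.0]
gc_time = [120.0, 90.0, 150.0]

print(f"GC accounts for at most {max_gc_saving(task_time, gc_time):.1%} of task time")
```

If the ratio is only a few percent, GC tuning cannot explain long stretches of idle cores.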

 

 

From: Vitaliy Pisarev <[hidden email]>
Date: Thursday, November 15, 2018 at 1:03 PM
To: Shahbaz <[hidden email]>
Cc: "Thakrar, Jayesh" <[hidden email]>, user <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: How to address seemingly low core utilization on a spark workload?

 

Agreed, and I will try it. One clarification though: the number of partitions also affects their in-memory size, so fewer partitions may result in higher memory pressure and OOMs. I think this was the original intention.

 

So the motivation for partitioning is also to break down volumes to fit the machines.

 

Is this premise wrong?

 


Re: How to address seemingly low core utilization on a spark workload?

Thakrar, Jayesh

So 164 GB of parquet data can potentially explode to up to 1000 GB of data if the data is compressed (in practice it would be more like 400-600 GB).

Your executors have about 96 GB data.

With that kind of volume, 100-300 executors is ok (I would do tests with 100-300), but 30k shuffle partitions are still excessive.
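
A rough sketch of what those volumes mean per shuffle partition. It assumes the whole uncompressed dataset passes through a single shuffle and is spread evenly, which real jobs with filtering and skew will not match exactly; the function name is made up for this example.

```python
def avg_partition_mb(total_gb: float, num_partitions: int) -> float:
    """Average partition size in MB for an evenly spread dataset."""
    return total_gb * 1024 / num_partitions


# 400-600 GB is the practical uncompressed estimate given above.
for total_gb in (400, 600):
    for partitions in (200, 300, 30000):
        size = avg_partition_mb(total_gb, partitions)
        print(f"{total_gb} GB over {partitions} partitions: about {size:.0f} MB each")
```

Under these assumptions, 30,000 partitions come out at roughly 14-20 MB each, while 300 partitions come out at roughly 1.3-2 GB each, so values between the two extremes are worth testing if memory pressure is a concern.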

 
