Dynamic Allocation Backlog Property in Spark on Kubernetes


Ranju Jain

Hi All,

 

I have enabled dynamic allocation while running Spark on Kubernetes. New executors are requested when pending tasks have been backlogged for longer than the duration configured in the property “spark.dynamicAllocation.schedulerBacklogTimeout”.

 

My Use Case is:

 

There are a number of jobs which may or may not run together at a given point in time. For example, only one Spark job may be running at a time, or two Spark jobs may run simultaneously, depending on the need.

I configured spark.dynamicAllocation.minExecutors as 3 and spark.dynamicAllocation.maxExecutors as 8.

 

Steps:

  1. The SparkContext is initialized with 3 executors and the first job is submitted.
  2. If a second job is submitted a few minutes later (e.g. 15 mins), I would like to benefit from dynamic allocation so that executors scale up to handle the second job's tasks.

For this I think “spark.dynamicAllocation.schedulerBacklogTimeout” needs to be set, after which new executors would be requested.

Problem: There is a chance that the second job is never submitted, or that it is submitted only after 10 or 20 minutes. How can I set a constant value for the property “spark.dynamicAllocation.schedulerBacklogTimeout” to scale the executors, when the task backlog depends on the number of jobs submitted?
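
For reference, a minimal sketch of the configuration described above. The property names are the standard Spark ones and the values mirror this thread (min 3, max 8); the shuffle-tracking flag is an assumption needed for dynamic allocation on Kubernetes (Spark 3.0+), where no external shuffle service is available.

    import org.apache.spark.sql.SparkSession

    // Sketch only: adjust values to your cluster.
    val spark = SparkSession.builder()
      .appName("dynamic-allocation-sketch")
      .config("spark.dynamicAllocation.enabled", "true")
      .config("spark.dynamicAllocation.minExecutors", "3")
      .config("spark.dynamicAllocation.maxExecutors", "8")
      .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")   // default value
      // On Kubernetes there is no external shuffle service, so shuffle tracking
      // is typically required for dynamic allocation (Spark 3.0+):
      .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
      .getOrCreate()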

 

Regards

Ranju


Re: Dynamic Allocation Backlog Property in Spark on Kubernetes

Attila Zsolt Piros
Hi!

For dynamic allocation you do not need to run the Spark jobs in parallel.
Dynamic allocation simply means that Spark scales up by requesting more executors when there are pending tasks (which roughly corresponds to the number of available partitions) and scales down when executors are idle (within a single job the number of partitions can fluctuate).

But if you are optimizing for run time, you can start those jobs in parallel from the beginning.
In that case you will use a higher number of executors right from the start.

The "spark.dynamicAllocation.schedulerBacklogTimeout" is not for to schedule/synchronize different Spark jobs but it is about tasks. 

Best regards,
Attila 
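
A small illustration of the tasks-vs-partitions relationship mentioned above (a sketch assuming a SparkSession named spark, e.g. in spark-shell; the dataset and sizes are made up): each partition of a stage becomes one task, so the size of the pending-task backlog is bounded by the partition count.

    // Number of tasks in a stage == number of partitions of the data being processed.
    val df = spark.range(0L, 1000000L)        // hypothetical input
    println(df.rdd.getNumPartitions)          // tasks per stage with the current layout

    val repartitioned = df.repartition(200)   // 200 partitions -> 200 tasks in the next stage
    repartitioned.count()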


RE: Dynamic Allocation Backlog Property in Spark on Kubernetes

Ranju Jain

Hi Attila,

 

Thanks for your reply.

 

Let me talk about a single job which starts to run with minExecutors set to 3. Suppose this job [which reads the full data from the backend, processes it, and writes it to a location] takes around 2 hours to complete.

 

What I understood is: since the default value of spark.dynamicAllocation.schedulerBacklogTimeout is 1 second, executors will scale from 3 to 4 and then up to 8, with further requests every second while tasks are pending at the scheduler backend. So if I don’t want it to be 1 second, I might set it to 1 hour [3600 sec] for a 2-hour Spark job.
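
For context, the two backlog-related timeouts (a sketch; both property names are real and the values shown are the documented defaults): after schedulerBacklogTimeout expires with tasks still pending, Spark requests a first round of executors, and further rounds follow every sustainedSchedulerBacklogTimeout while the backlog persists, with the request size growing exponentially up to maxExecutors.

    import org.apache.spark.SparkConf

    // Both properties are real Spark settings; values shown are the defaults.
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")           // delay before the first request
      .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1s")  // interval between subsequent requests
                                                                              // (defaults to schedulerBacklogTimeout)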

 

So this is all about when I want to scale executors dynamically for a Spark job. Is that understanding correct?

 

In the statement below, I don’t understand much about the available partitions :-(

pending tasks (which roughly corresponds to the number of available partitions)

 

 

Regards

Ranju

 

 


Re: Dynamic Allocation Backlog Property in Spark on Kubernetes

Attila Zsolt Piros
You should not set "spark.dynamicAllocation.schedulerBacklogTimeout" so high; the purpose of this config is very different from the one you would like to use it for.

I guess the confusion comes from the fact that you are still thinking in terms of multiple Spark jobs.

But dynamic allocation is useful in the case of a single Spark job, too.
With dynamic allocation, if there are pending tasks then new resources should be allocated to speed up the calculation.
If you do not have enough partitions then you do not have enough tasks to run in parallel; that is what my earlier comment was about.

So let's focus on your first job:
- With 3 executors it takes 2 hours to complete, right?
- And what about 8 executors?  I hope significantly less time. 

So if you have more than 3 partitions, the tasks are long enough that extra resources get requested (schedulerBacklogTimeout), and the number of running executors is lower than the maximum you set (maxExecutors), then why wouldn't you want to use those extra resources?
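
One way to observe that scale-up from inside the application (a sketch; SparkStatusTracker.getExecutorInfos is a real API, while the helper function and the driver-entry note are assumptions):

    import org.apache.spark.SparkContext

    // Prints how many executors are currently registered with the driver.
    def logExecutorCount(sc: SparkContext): Unit = {
      // getExecutorInfos may include an entry for the driver itself; adjust if needed.
      val registered = sc.statusTracker.getExecutorInfos.length
      println(s"registered executors (incl. driver entry, if any): $registered")
    }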

 


Re: Dynamic Allocation Backlog Property in Spark on Kubernetes

ranju goel

Hi Attila,


I understood what you mean: use the extra resources, if available, for running the Spark job via schedulerBacklogTimeout (dynamic allocation).

This will speed up the job. But if there are no extra resources available, then go for static allocation rather than dynamic. Is that correct?


Please validate the few scenarios below for effective use of dynamic allocation.


1. The screenshot below shows that the tasks are tiny and each task executes quickly, but the total task count is high (3241).

Dynamic allocation advantage for this scenario:

If the reserved Spark quota has more resources available while the min executors are running, then by setting schedulerBacklogTimeout to a small value [say 15 min] those available quota resources can be used and the 3241 tasks can be finished faster. Is this understanding correct?

[screenshot attachment: stage summary with 3241 tasks]



2. The report below has a smaller total task count (192) and a parallel running task count of 24, but each task took around 7 minutes to complete.

So here again, if resources are available in the quota, more parallelism can be achieved using schedulerBacklogTimeout (say 15 mins), which speeds up the job.


[screenshot attachment: stage summary with 192 tasks, 24 running in parallel]
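
A back-of-the-envelope check for scenario 2 (hypothetical numbers: spark.executor.cores is assumed to be 8, which would explain the observed 24 parallel tasks with 3 executors):

    // Parallel task slots = executors * cores per executor (assumed values).
    val executorCores = 8                                               // assumption: spark.executor.cores = 8
    val minExecutors  = 3
    val maxExecutors  = 8
    println(s"slots at minExecutors: ${minExecutors * executorCores}")  // 24, matching the report
    println(s"slots at maxExecutors: ${maxExecutors * executorCores}")  // 64, if dynamic allocation scales up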

Best Regards



 


Re: Dynamic Allocation Backlog Property in Spark on Kubernetes

Attila Zsolt Piros
Hi Ranju!

> But if there are no extra resources available, then go for static allocation rather than dynamic. Is that correct?

I think there is no such rule. If there are no new resources available for Spark, then the existing ones will be used (and even the minimum number of executors is not guaranteed to be reached if no resources are available).

But I suggest always setting the max executors to a meaningful value (the default is far too high: Int.MaxValue).
This way you can avoid excessive costs for a small/medium-sized job where the number of tasks is high but the tasks themselves are small.

Regarding your questions: in both cases, as I see it, the extra resources help and the jobs will finish faster.

Best Regards,
Attila
  


Re: Dynamic Allocation Backlog Property in Spark on Kubernetes

ranju goel
Hi Attila,

Thanks for your guidance on how to use dynamic allocation effectively for a Spark job. Now I am a bit more confident about setting schedulerBacklogTimeout wisely.

Regarding your statement "If there are no new resources available for Spark, then the existing ones will be used (and even the minimum number of executors is not guaranteed to be reached if no resources are available)":

We discussed this earlier on a Spark JIRA, where I wanted to kill the job if no resources are available. I finally succeeded in killing the job with the help of the Spark event listener bus: I use spark.extraListeners and get notified when executors are added during SparkContext initialization.
If I am not notified within roughly 10 seconds of SparkContext initialization, I assume the resources are unavailable and subsequently kill the job.
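
A minimal sketch of such a listener (the class name, the 10-second wait, and the way the job is aborted are assumptions; onExecutorAdded and SparkContext.addSparkListener are real APIs, whereas the approach described above wires the listener up via spark.extraListeners instead):

    import java.util.concurrent.atomic.AtomicBoolean
    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}

    // Hypothetical listener: flips a flag once any executor registers.
    class ExecutorWatchListener extends SparkListener {
      val executorSeen = new AtomicBoolean(false)
      override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit =
        executorSeen.set(true)
    }

    // Assumed driver-side flow: add the listener, wait ~10s, give up if nothing registered.
    def failFastIfNoExecutors(sc: SparkContext, waitMs: Long = 10000L): Unit = {
      val listener = new ExecutorWatchListener
      sc.addSparkListener(listener)
      Thread.sleep(waitMs)
      if (!listener.executorSeen.get()) {
        sc.stop()   // assumption: stopping the context is how the job is "killed"
      }
    }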

I saw a similar property in the Spark documentation:

spark.scheduler.maxRegisteredResourcesWaitingTime (default 30s): maximum amount of time to wait for resources to register before scheduling begins.

Does this property mean that the scheduler will wait only 30 seconds, and if resources are still not registered, the scheduler will forget this job while the executors remain in the Pending state? Or does this property do more?

Best Regards


