[Spark Optimization] Why is one node getting all the pressure?


[Spark Optimization] Why is one node getting all the pressure?

Aakash Basu-2
Hi,

I have submitted a job on a 4-node cluster, where I see most of the operations happening on one of the worker nodes while the other two are simply sitting idle.

The picture below sheds light on that -

How do I properly distribute the load?

My cluster conf (4 node cluster [1 driver; 3 slaves]) -

Cores - 6
RAM - 12 GB
HDD - 60 GB

My spark-submit command is as follows -

spark-submit --master spark://192.168.49.37:7077 --num-executors 3 --executor-cores 5 --executor-memory 4G /appdata/bblite-codebase/prima_diabetes_indians.py

What to do?

Thanks,
Aakash.

Re: [Spark Optimization] Why is one node getting all the pressure?

Jörn Franke
What is your code? Maybe it performs an operation that is bound to a single host, or your data volume is too small to be spread across multiple hosts.
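
As an illustrative aside (not from the thread): a minimal PySpark sketch, using a hypothetical file path, of the kind of operations that funnel all work onto a single task and hence a single host -

# Hypothetical sketch: operations that force Spark to use a single task (and hence a single node).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("single-task-demo").getOrCreate()
df = spark.read.csv("/appdata/some_small_file.csv", header=True, inferSchema=True)  # hypothetical path

df_one = df.coalesce(1)      # collapses the DataFrame into one partition, so one task does all the work
rows = df.collect()          # pulls every row back to the driver, bypassing cluster parallelism entirely
print(len(rows))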


Re: [Spark Optimization] Why is one node getting all the pressure?

akshay naidu
In reply to this post by Aakash Basu-2
try
 --num-executors 3 --executor-cores 4 --executor-memory 2G --conf spark.scheduler.mode=FAIR


Re: [Spark Optimization] Why is one node getting all the pressure?

Aakash Basu-2
Jörn - the code is a series of feature engineering and model-tuning operations; it is too big to share here. Yes, the data volume is very low - it is in KBs - as I just wanted to experiment with a small dataset before going for a large one.

Akshay - I ran with your suggested Spark configuration and I get this (the node changed, but the problem persists) -

<image.png>

Re: [Spark Optimization] Why is one node getting all the pressure?

Jörn Franke
If it is in KBs then Spark will always schedule it to one node. As soon as it gets bigger you will see more nodes being used.

Hence, increase your test dataset.
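
To see this concretely (an illustrative sketch with a hypothetical file path, not from the thread), you can check how many partitions - and therefore parallel tasks - a small input actually produces:

# Hypothetical sketch: a KB-sized file usually yields a single partition, hence a single task on one node.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("/appdata/bblite-data/small_input.csv", header=True, inferSchema=True)  # hypothetical path
print(df.rdd.getNumPartitions())   # typically 1 for a tiny file

df = df.repartition(12)            # force more partitions if you want to exercise all executors anyway
print(df.rdd.getNumPartitions())   # now 12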


Re: [Spark Optimization] Why is one node getting all the pressure?

Aakash Basu-2
In reply to this post by Aakash Basu-2
Hi,

When I run the same job on my local machine with two workers and one driver, the distribution is proper and hence the run is faster. Mine is an 8-core, 16 GB RAM machine.

Does this then have anything to do with the spark-submit configuration?

Any help?

Thanks,
Aakash.


Re: [Spark Optimization] Why is one node getting all the pressure?

Aakash Basu-2
In reply to this post by Jörn Franke
Hi Jörn/others,

Thanks for your help. Now the data is being distributed properly, but the challenge is that after a certain point I get this error, after which everything stops moving ahead -

2018-06-11 18:14:56 ERROR TaskSchedulerImpl:70 - Lost executor 0 on 192.168.49.39: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.



How to avoid this scenario?

Thanks,
Aakash.


Re: [Spark Optimization] Why is one node getting all the pressure?

Vamshi Talla
Aakash,

Like Jörn suggested, did you increase your test dataset? If so, did you also update your executor-memory setting? It seems like you might be exceeding the executor memory threshold.

Thanks 
Vamshi Talla

Sent from my iPhone


Re: [Spark Optimization] Why is one node getting all the pressure?

Aakash Basu-2
Yes, but when I did increase my executor memory, the Spark job halts after running a few steps, even though the executor isn't dying.

Data - 60,000 data points, 230 columns (60 MB of data).

Any input on why it behaves like that?


Re: [Spark Optimization] Why is one node getting all the pressure?

srinath
Hi Aakash,

Can you check the logs for Executor ID 0? It was restarted on worker 192.168.49.39, perhaps due to an OOM or something similar.

I also observed that the number of tasks is high and unevenly distributed across the workers.
Check if there are too many partitions in the RDD and tune them using spark.sql.shuffle.partitions.
If the uneven distribution still occurs, try repartitioning the dataset on appropriate fields.

Hope that helps.
Regards,
Srinath.
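
As an illustrative sketch of that last suggestion (not from the thread; the column name and paths are hypothetical):

# Hypothetical sketch: repartition a skewed DataFrame on a key column so tasks spread evenly across workers.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "24")              # tune to roughly the total executor cores

df = spark.read.parquet("/appdata/bblite-data/dataset.parquet")   # hypothetical path
balanced = df.repartition(24, "patient_id")                       # hash-partition on a hypothetical key column
balanced.write.mode("overwrite").parquet("/appdata/bblite-data/dataset_balanced.parquet")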



Re: [Spark Optimization] Why is one node getting all the pressure?

Aakash Basu-2
Hi Srinath,

Thanks for such an elaborate reply. How do I reduce the overall number of tasks?

I found that simply repartitioning the CSV file into 8 parts and converting it to Parquet with Snappy compression not only distributed the tasks evenly across all the nodes, but also brought the end-to-end job time down to roughly 0.8x of the prior run.

Query - regarding "check if there are too many partitions in the RDD and tune it using spark.sql.shuffle.partitions": how do I do this? I have a huge pipeline of memory- and CPU-intensive operations, which will involve innumerable Spark transformations. At which level should I apply the setting? The total task count for an average dataset comes to around 2 million (approx.) - is that a bad sign? How can I control it? Do I need to refactor my entire pipeline (series of codes)?

Below is the new executors view while the updated run is taking place -




Thanks,
Aakash.
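
For reference, a minimal sketch of the repartition-and-convert step Aakash describes above (the input and output paths are hypothetical):

# Hypothetical sketch: split the CSV into 8 partitions and write it as Snappy-compressed Parquet.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("/appdata/bblite-data/input.csv", header=True, inferSchema=True)   # hypothetical path
(df.repartition(8)                                     # 8 roughly equal partitions -> 8 parallel tasks
   .write.option("compression", "snappy")              # snappy is also Spark's default Parquet codec
   .mode("overwrite")
   .parquet("/appdata/bblite-data/input_parquet"))     # hypothetical output path

Reading the Parquet output back then gives Spark 8 input splits to schedule in parallel, which matches the even distribution described above.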


Re: [Spark Optimization] Why is one node getting all the pressure?

srinath
Hi Aakash,

Glad to know that repartition helped!

The overall number of tasks actually depends on the kind of operations you are performing and also on how the DF is partitioned.
I can't comment on the former, but I can provide some pointers on the latter.

The default value of spark.sql.shuffle.partitions is 200, so there will be 200 partitions of the DF or RDD after a shuffle.
This implies that 200 tasks will be required for each post-shuffle stage on that DF.
You can check this in the Stages tab of the Spark UI while the application is running.

Try setting "--conf spark.sql.shuffle.partitions=N" while submitting the application.
Try different values of N (start with N = the total number of executor cores) and see how it goes.
If you hit an OOM, try increasing the value.

Hope it helps.
Regards,
Srinath.
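
For completeness, a hedged sketch (not from the thread) of applying the same setting from inside the PySpark job rather than on the spark-submit command line; the value 15 here is just the 3 executors x 5 cores from the earlier submit command:

# Hypothetical sketch: set the shuffle partition count at runtime, before any shuffle is triggered.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "15")   # e.g. 3 executors x 5 cores = 15 total cores
# Every DataFrame shuffle from here on (join, groupBy, ...) produces 15 partitions, i.e. 15 tasks per stage.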




Re: [Spark Optimization] Why is one node getting all the pressure?

Aakash Basu-2
Thanks a lot Srinath!

Now, I have increased my cluster size from 4 nodes to 6 nodes (1 driver, 5 workers).

Driver: 12 GB RAM, 6 Cores
2 Workers: 30 GB RAM, 14 Cores
3 Workers: 12 GB RAM, 6 Cores

When I submit a job with this command (after calculating the ideal number of executors, executor cores and executor memory) -

spark-submit --master spark://192.168.49.37:7077 --num-executors 5 --executor-cores 5 --executor-memory 10G --conf spark.storage.memoryFraction=0.2 --conf spark.sql.shuffle.partitions=25 /appdata/bblite-codebase/prima_diabetes_indians.py

The entire job runs extremely slowly; the zipWithIndex step ideally takes less than 0.6 seconds, but here it takes much longer than expected (event timeline as follows) -

The job literally gets stalled after a few KBs of operation -

Where am I going wrong?

Thanks,
Aakash.


Re: [Spark Optimization] Why is one node getting all the pressure?

Prem Sure
Can you try 4 executors, each with 4 cores and 10 GB, and share the results?
Optionally, you can also set the driver memory and cores.
We need to consider OS and YARN overhead factors when calculating the memory of each executor - it looks like you are trying to use the full 12 GB.
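
As a sketch only (using the same script path; the driver memory value is a placeholder, not from the thread), Prem's suggestion would look roughly like:

spark-submit --master spark://192.168.49.37:7077 --num-executors 4 --executor-cores 4 --executor-memory 10G --driver-memory 4G /appdata/bblite-codebase/prima_diabetes_indians.py

The underlying idea is that the executor heap you request, plus off-heap overhead and the OS footprint, must fit within a worker's physical RAM, which is why asking for effectively the full 12 GB on the smaller workers leaves no headroom.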
