How to Scale Streaming Application to Multiple Workers


ArtemisDev
Hi,

We have a streaming application that reads micro-batched CSV files and uses a foreachBatch call.  Each microbatch can be processed independently.  I noticed that only one worker node is being utilized.  Is there any way, or any explicit method, to distribute the batch workload to multiple workers?  I would have thought Spark would execute the foreachBatch method on different workers, since each batch can be treated as atomic.

Thanks!

ND




Re: How to Scale Streaming Application to Multiple Workers

Lalwani, Jayesh
Parallelism of streaming depends on the input source. If you are getting one small file per microbatch, then Spark will read it in one worker. You can always repartition your data frame after reading it to increase the parallelism.
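
For example, something along these lines might do it (an untested PySpark sketch; the schema, paths, checkpoint location, and partition count are made-up placeholders, not taken from the original post):

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    spark = SparkSession.builder.appName("csv-stream").getOrCreate()

    # File sources need an explicit schema for streaming reads
    schema = StructType([
        StructField("id", StringType()),
        StructField("value", DoubleType()),
    ])

    stream_df = (spark.readStream
                 .schema(schema)
                 .csv("/data/incoming"))          # hypothetical input directory

    def process_batch(batch_df, batch_id):
        # A single small file usually arrives as one partition, so only one core
        # works on it; repartition to spread the work across executor cores.
        batch_df.repartition(8).write.mode("append").parquet("/data/out")

    query = (stream_df.writeStream
             .foreachBatch(process_batch)
             .option("checkpointLocation", "/tmp/chk-ingest")  # hypothetical path
             .start())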


Re: How to Scale Streaming Application to Multiple Workers

ArtemisDev
Thanks for the input.  What I am interested in is how to have multiple workers read and process the small files in parallel, ideally one file per worker at a time.  Repartitioning the data frame doesn't make sense since the data frame is already small.


Re: How to Scale Streaming Application to Multiple Workers

Mich Talebzadeh
Hi,

This generally depends on how many topics you want to process at the same time and whether this is done on-premises running Spark in cluster mode.

Have you looked at the Spark GUI to see if one worker (one JVM) is adequate for the task?

Also, how are these small files read and processed? Is it the same data microbatched? Spark streaming does not process one event at a time, which is in general what people mean by "streaming." Instead it processes groups of events; each group is a microbatch that gets processed together.

What parameters (batch interval, window length, sliding interval) are you using?

Parallelism helps when you have reasonably large data and your cores are running on different sections of the data in parallel.  Roughly how much data is in each CSV file?
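
For reference, in Structured Streaming those settings roughly correspond to the trigger interval and the window/slide durations of a windowed aggregation. A minimal sketch (the event-time column name, durations, and trigger interval are illustrative assumptions; stream_df is a streaming DataFrame read as in the earlier sketch):

    from pyspark.sql import functions as F

    windowed = (stream_df
                .withWatermark("ts", "1 minute")                      # "ts" is an assumed event-time column
                .groupBy(F.window("ts", "30 seconds", "10 seconds"))  # window length, sliding interval
                .count())

    query = (windowed.writeStream
             .outputMode("update")
             .format("console")
             .trigger(processingTime="5 seconds")   # closest analogue of the batch interval
             .start())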


HTH,


Mich



Re: How to Scale Streaming Application to Multiple Workers

mmuru
For file streaming in Structured Streaming, you can try setting the "maxFilesPerTrigger" option to control each batch.  foreachBatch is an action, and the output is written to one or more sinks.  Are you doing any post-transformation in foreachBatch?
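
Roughly like this (an untested sketch; the schema, directory paths, checkpoint location, and file-count value are placeholders):

    files_df = (spark.readStream
                .schema(schema)                     # the CSV schema, as in the earlier sketch
                .option("maxFilesPerTrigger", 16)   # cap on new files picked up per micro-batch
                .csv("/data/incoming"))

    def write_batch(batch_df, batch_id):
        # foreachBatch behaves like an action: each micro-batch DataFrame is handed
        # to this function, which can write it out to one or more sinks.
        batch_df.write.mode("append").parquet("/data/out/parquet")

    (files_df.writeStream
     .foreachBatch(write_batch)
     .option("checkpointLocation", "/tmp/chk-files")
     .start())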


Re: How to Scale Streaming Application to Multiple Workers

ArtemisDev

Thank you all for the responses.  Basically we were dealing with a file source (not Kafka, therefore no topics involved) and dumping CSV files (about 1000 lines, 300 KB per file) at a fairly high speed (10-15 files/second), one at a time, into the stream source directory.  We have a Spark 3.0.1 cluster configured with 4 workers, each allocated 4 cores.  We tried numerous options, including setting the spark.streaming.dynamicAllocation.enabled parameter to true and setting maxFilesPerTrigger to 1, but were unable to scale to #cores * #workers > 4.
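
(For concreteness, a rough sketch of the setup described above. The executor-sizing property names are standard Spark configs added here only for illustration; the values are the ones mentioned in this post.)

    spark = (SparkSession.builder
             .appName("csv-ingest")
             .config("spark.executor.instances", "4")                      # 4 workers
             .config("spark.executor.cores", "4")                          # 4 cores per worker
             .config("spark.streaming.dynamicAllocation.enabled", "true")  # as tried above
             .getOrCreate())

    stream_df = (spark.readStream
                 .schema(schema)
                 .option("maxFilesPerTrigger", 1)   # as tried above: one file per micro-batch
                 .csv("/data/incoming"))
    # With one ~300 KB file per trigger, each micro-batch typically lands in a
    # single partition, so only one core does the work regardless of cluster size.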

What I am trying to understand is what makes Spark allocate jobs to more workers.  Is it based on the size of the data frame, the batch size, or the trigger interval?  It looks like the Spark master scheduler doesn't consider the number of input files waiting to be processed, only the size of the data (i.e. the size of the data frames) that has already been read or imported, before allocating new workers.  If that is the case, then Spark really missed the point and wasn't designed for real-time streaming applications.  I could write my own stream processor that distributes the load based on the number of input files, given that each batch query is atomic/independent of the others.

Thanks in advance for your comment/input.

ND


Re: How to Scale Streaming Application to Multiple Workers

Lalwani, Jayesh

With a file-based source, Spark is going to make maximum use of memory before it tries to scale to more nodes. Parallelization adds overhead. This overhead is negligible if your data is several gigabytes or more. If your entire data set can fit into the memory of one node, then it's better to process everything on one node. Forcing Spark to parallelize processing that can be done on a single node will reduce throughput.
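
(One way to see this in practice: log how many partitions, and therefore parallel tasks, each micro-batch actually gets. A small diagnostic sketch, not something from the original thread; stream_df is the streaming DataFrame from the earlier sketches.)

    def inspect_batch(batch_df, batch_id):
        # Each partition becomes one task, so this shows how much of the cluster
        # a given micro-batch can actually keep busy.
        parts = batch_df.rdd.getNumPartitions()
        print(f"batch {batch_id}: {batch_df.count()} rows in {parts} partition(s)")

    stream_df.writeStream.foreachBatch(inspect_batch).start()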

 

You are right, though: Spark is overkill for a simple transformation on a 300 KB file. A lot of people implement simple transformations using serverless AWS Lambda. Spark's power comes in when you are joining streaming sources and/or joining streaming sources with batch sources. It's not that Spark can't do simple transformations; it's perfectly capable of doing them. It makes sense to implement simple transformations in Spark if you have a data pipeline that is implemented in Spark and this ingestion is one of many things you do with Spark. But if your entire pipeline consists of ingesting small files, then you might be better off with simpler solutions.

 


Re: How to Scale Streaming Application to Multiple Workers

ArtemisDev

We can't use AWS since the target production environment has to be on-prem.  The reason we chose Spark is its ML libraries.  Lambda would be a great model for stream processing from a functional programming perspective; I'm just not sure how well it could be integrated with Spark ML or other ML libraries.  Any suggestions would be highly appreciated.

ND


Re: How to Scale Streaming Application to Multiple Workers

Lalwani, Jayesh

Once you are talking about ML, you aren't talking about "simple" transformations. Spark is a good platform to do ML on. You can easily configure Spark to read your data on one node and then run the ML transformations in parallel.
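
Something like the sketch below (the pre-trained pipeline path, partition count, and output location are illustrative assumptions, not from this thread):

    from pyspark.ml import PipelineModel

    model = PipelineModel.load("/models/my_pipeline")   # hypothetical fitted pipeline

    def score_batch(batch_df, batch_id):
        # The tiny CSV arrives in a single partition; spread it across the cluster's
        # cores so the comparatively expensive ML transform runs in parallel.
        scored = model.transform(batch_df.repartition(16))
        scored.write.mode("append").parquet("/data/out/scored")

    stream_df.writeStream.foreachBatch(score_batch).start()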

 


Re: How to Scale Streaming Application to Multiple Workers

ArtemisDev

That's exactly what my question was: whether Spark can do parallel reads, not data-frame-driven parallel query or processing, because our ML query is very simple but the data ingestion part seems to be the bottleneck.  Can someone confirm that Spark just can't do parallel reads?  If not, what would be the alternative?  Writing our own customized scheduler or listener?

Thanks!


Re: How to Scale Streaming Application to Multiple Workers

ArtemisDev

Apparently the number set in maxFilesPerTrigger doesn't have any effect on scaling at all.  Again, if all file reading is done by a single node, then Spark streaming isn't really designed for real-time processing, because that single node becomes a bottleneck...

On 10/16/20 3:47 PM, muru wrote:
You should set maxFilesPerTrigger to more than 1 if you want to process many files at once; otherwise Spark will process one file at a time. Since the file size is 300 KB and you have 4 cores per worker, you should set maxFilesPerTrigger = 4 or more (1 core per file).
Try it out and let me know if it helps.
