The trigger interval in spark structured streaming


The trigger interval in spark structured streaming

Mich Talebzadeh
One thing I noticed is that when the trigger interval used with foreachBatch is set to something low (in this case 2 seconds, matching the interval at which the source sends data to the Kafka topic, i.e. every 2 seconds),

trigger(processingTime='2 seconds')
    
Spark logs a warning that the current batch is falling behind:

```
batchId is 22,  rows is 40
21/03/18 16:29:05 WARN org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 13686 milliseconds
batchId is 23,  rows is 40
21/03/18 16:29:21 WARN org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 15316 milliseconds
batchId is 24,  rows is 40
```
So, assuming that the batch interval is effectively fixed, one needs to look at adjusting the resources so that the topic is processed in a timely manner.
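For context, a minimal sketch of the kind of query involved; the broker address, topic name, checkpoint path and per-batch logic below are placeholders rather than the actual job:

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("trigger-interval-demo").getOrCreate()

# Read from a Kafka topic (broker address and topic name are placeholders).
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "md")
      .load())

def send_to_sink(batch_df, batch_id):
    # Whatever per-batch work happens here has to finish within the trigger
    # interval, otherwise ProcessingTimeExecutor logs the "falling behind" warning.
    print(f"batchId is {batch_id},  rows is {batch_df.count()}")

query = (df.writeStream
         .foreachBatch(send_to_sink)
         .trigger(processingTime='2 seconds')
         .option("checkpointLocation", "/tmp/checkpoints/md")   # placeholder path
         .start())
```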

Any comments welcome




 

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


Re: The trigger interval in spark structured streaming

Lalwani, Jayesh

Short Answer: Yes

 

Long answer: You need to understand your load characteristics to size your cluster. Most applications have three components to their load. A) A predictable amount of expected load. This usually changes based on time of day and day of week; the main thing is that it's predictable. B) Unpredictable spikes. The main thing about spikes is that they are periods of high traffic but are short-lived. C) Sustained high load: load that occurs because of some event (for example, retailers have higher load during Christmas). This might be semi-predictable: you know it's coming, but you don't know how big it will be. Also, unlike spikes, it will be sustained.

 

What you want to do is size your cluster and tune your application so you can handle 120% of your predictable load without falling behind. If your load is unusually spiky, you might want to increase that to 150% or 200%. This means that when a spike hits, you have excess capacity to handle the load.

 

Also, you want to make sure that if there is too much load, requests get queued up instead of crashing your streaming app. You don't want an OOM because Spark tried to read 100 million records in one batch. There is a setting called maxRatePerPartition that you can set. This limits the number of records that will be read in one batch. You should set this to be 80% of the maximum batch size your cluster can handle. If you do this, and if you get a huge spike, your application will survive the spike, but you might have large latency until you get back to normal.
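As a rough illustration of this kind of rate limiting (the numbers are made up): spark.streaming.kafka.maxRatePerPartition is the property read by the older DStreams Kafka connector, while the Structured Streaming Kafka source offers maxOffsetsPerTrigger as the analogous per-batch cap.

```
from pyspark.sql import SparkSession

# DStreams API: cap the read rate per Kafka partition (records/second).
spark = (SparkSession.builder
         .config("spark.streaming.kafka.maxRatePerPartition", "1000")
         .getOrCreate())

# Structured Streaming Kafka source: cap total offsets consumed per micro-batch.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   # placeholder broker
      .option("subscribe", "md")                          # placeholder topic
      .option("maxOffsetsPerTrigger", "6000")             # at most ~6000 records per trigger
      .load())
```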

 

If you are expecting to receive high load for a sustained amount of time, you want to implement some sort of autoscaling that adds nodes to your cluster and increases the partitioning of the data. Autoscaling cannot react fast enough to momentary spikes, but it can prevent your system from being overwhelmed by sustained high load.

 

Re: The trigger interval in spark structured streaming

Mich Talebzadeh
Thanks for the insight, appreciated.

Well, deploying IaaS, for example Google Dataproc clusters for handling Spark, will certainly address both the size of the cluster and the MIPS power provided by each node of the cluster (which can be adjusted by adding more resources to the existing nodes, albeit statically).

However, as usual your mileage varies because, as we all concur, Spark itself is only part of the solution: the volume of streaming data, the number of tickers (securities in our case) and of course the processing of your streaming data within the micro-batch itself all matter. You will fall behind if your processing time exceeds your ingestion interval, and finally the rate at which you interact with the sink, in our case a Google BigQuery database (serverless), will also add to the throughput calculation. I find the Spark GUI a great help to analyse the details and plan the workload overall.
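Besides the Spark UI, the same numbers can be pulled programmatically from the streaming query's progress reports; a small sketch, assuming a running query handle and the 2-second trigger from the original post:

```
import time

TRIGGER_INTERVAL_MS = 2000   # matches trigger(processingTime='2 seconds')

while query.isActive:
    progress = query.lastProgress   # dict form of the latest StreamingQueryProgress
    if progress is not None:
        spent_ms = progress["durationMs"]["triggerExecution"]
        if spent_ms > TRIGGER_INTERVAL_MS:
            print(f"batch {progress['batchId']}: {progress['numInputRows']} rows "
                  f"took {spent_ms} ms, exceeding the {TRIGGER_INTERVAL_MS} ms trigger")
    time.sleep(10)
```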

With regard to anticipating the workload, we always start with an MVP and the anticipated average volume/traffic, and rev it up by 1.645 standard deviations of the load, so theoretically we should have a 90% confidence level that we can accommodate the additional workload.

This normally works, except for the tail of the distribution, where we have to improvise.
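As a toy illustration of that sizing rule (the throughput figures below are invented):

```
# Observed load in messages/second over some sampling window (made-up numbers).
mean_rate = 4000.0
std_rate = 900.0

# Provision for the mean plus 1.645 standard deviations, i.e. roughly the
# 95th percentile of a normally distributed load.
provisioned_rate = mean_rate + 1.645 * std_rate   # 5480.5 messages/second
```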

Thanks

 


 



Re: The trigger interval in spark structured streaming

Mich Talebzadeh
On the subject of workload management, the usual thing to watch is

Processing Time + Reserved Capacity < Batch Interval

We know the Batch Interval, i.e. the rate at which the upstream source sends messages through Kafka. We can start by assuming that any growth in the number of messages processed (and hence in processing time) will require additional reserved capacity. Heuristically we can anticipate a 70% (~1 SD) increase in processing time, so in theory we should still be able to handle all this work within the batch interval.
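A back-of-the-envelope check of that rule, with made-up numbers:

```
batch_interval_s = 2.0     # upstream publishes to the Kafka topic every 2 seconds
processing_time_s = 1.1    # typical observed micro-batch processing time (assumed)

# Reserve ~70% of the observed processing time as headroom (the ~1 SD heuristic).
reserved_s = 0.7 * processing_time_s                        # 0.77 s
fits = processing_time_s + reserved_s < batch_interval_s    # 1.87 s < 2.0 s -> True
```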

The parameter which I think many deploy is spark.streaming.backpressure.enabled. The central idea is that if a component is struggling to keep up, it should communicate with upstream components and get them to reduce the load. In the context of Spark Streaming, the receiver is the upstream component which gets notified if the executors cannot keep up. There are a number of occasions when this will happen (not necessarily just a spike in incoming messages). For example:
  • Streaming Source: Unexpected short burst of incoming messages in source system
  • YARN: Lost Spark executors due to node(s) failure
  • External Sink System: High load on external systems such as BigQuery etc
Without backpressure, microbatches queue up over time and the scheduling delay increases.

The next parameter I think of is spark.streaming.backpressure.pid.minRate, the minimum rate (total records per second) that the backpressure mechanism will throttle down to. It relies on spark.streaming.kafka.maxRatePerPartition (not set here), which is the maximum rate (number of records per second) at which messages will be read from each Kafka partition.
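A sketch of how these properties are typically set (values are illustrative only; note that the spark.streaming.* backpressure properties are read by the older DStreams engine rather than by Structured Streaming):

```
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("backpressure-demo")
         .config("spark.streaming.backpressure.enabled", "true")
         .config("spark.streaming.backpressure.pid.minRate", "100")      # records/second floor
         .config("spark.streaming.kafka.maxRatePerPartition", "1000")    # records/second/partition cap
         .getOrCreate())
```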

So spark.streaming.backpressure.pid.minRate starts with

n (total number of Kafka partitions) * spark.streaming.kafka.maxRatePerPartition * Batch Interval
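As a rough worked example of that bound (numbers are illustrative):

```
num_partitions = 3             # Kafka partitions on the topic (assumed)
max_rate_per_partition = 1000  # spark.streaming.kafka.maxRatePerPartition, records/second
batch_interval_s = 2           # seconds

# Upper bound on the number of records read in one micro-batch under this cap.
max_records_per_batch = num_partitions * max_rate_per_partition * batch_interval_s   # 6000
```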

I think one needs to review the settings for both spark.streaming.kafka.maxRatePerPartition and spark.streaming.backpressure.pid.minRate periodically, as the appropriate values can change over time.

Of course, I stand to be corrected.

HTH



 


