spark.streaming.receiver.maxRate


spark.streaming.receiver.maxRate

Margusja

Hi

Using Spark 2.1.1.2.6-1.0-129 (from the Hortonworks distro) with Scala 2.11.8 and Java 1.8.0_60.

I have a NiFi flow that produces more records than the Spark stream can process within one batch interval. To avoid overflowing the Spark queue I first tried Spark Streaming backpressure (which did not work for me), so I fell back to the simpler, static solution: spark.streaming.receiver.maxRate.

I set spark.streaming.receiver.maxRate=1. As I understand it from the Spark manual: "If the batch processing time is more than batchinterval then obviously the receiver's memory will start filling up and will end up in throwing exceptions (most probably BlockNotFoundException). Currently there is no way to pause the receiver. Using SparkConf configuration spark.streaming.receiver.maxRate, rate of receiver can be limited." Does that mean 1 record per second?
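If that reading is right, the limit is records per second *per receiver*, so the expected ceiling per batch is simply maxRate times the batch interval. A quick sanity check of that arithmetic (plain Scala, no Spark required):

```scala
// spark.streaming.receiver.maxRate is records per second per receiver,
// so the most one receiver should contribute to a single batch is:
def maxRecordsPerBatch(maxRatePerSec: Long, batchIntervalSec: Long): Long =
  maxRatePerSec * batchIntervalSec
```

With maxRate=1 and Seconds(1) batches, that would mean at most one record per batch.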

I have very simple code:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.nifi.remote.client.SiteToSiteClient
import org.apache.nifi.spark.NiFiReceiver

// Site-to-site config pointing at the NiFi "testing" output port
val conf = new SiteToSiteClient.Builder().url("http://192.168.80.120:9090/nifi").portName("testing").buildConfig()

val ssc = new StreamingContext(sc, Seconds(1)) // 1-second batches

val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_AND_DISK))
lines.print()

ssc.start()


There are loads of records waiting on the NiFi testing port. After ssc.start() I get 7-9 records per batch (batch time is 1 s). Am I misunderstanding spark.streaming.receiver.maxRate?
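For reference, both settings are ordinary Spark configuration keys and must be set before the StreamingContext is created. A sketch of how they could be set (the key names are the real Spark settings; the app name is a placeholder):

```scala
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("nifi-stream-test")                       // placeholder
  .set("spark.streaming.receiver.maxRate", "1")         // records/sec per receiver
  .set("spark.streaming.backpressure.enabled", "true")  // dynamic rate control
```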

-- 
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
https://www.facebook.com/allan.tuuring
+372 51 48 780

Re: spark.streaming.receiver.maxRate

Margusja

Hi

I tested the spark.streaming.receiver.maxRate and spark.streaming.backpressure.enabled settings using socketStream, and they work.

But if I use nifi-spark-receiver (https://mvnrepository.com/artifact/org.apache.nifi/nifi-spark-receiver), it does not honour spark.streaming.receiver.maxRate.

any workaround?
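One workaround sketch, if the receiver itself ignores the limit: throttle on the consuming side with a simple token bucket before records are handed to Spark. This is my own illustration, not part of the NiFi or Spark APIs; the clock is injected by the caller so the logic stays deterministic and testable:

```scala
// Minimal token-bucket rate limiter (illustration only, not a Spark/NiFi API).
// `nowSec` is the current time in seconds, supplied by the caller
// (e.g. System.nanoTime / 1e9 in real use).
final class TokenBucket(ratePerSec: Double, burst: Double) {
  private var tokens = burst // start with a full bucket
  private var lastSec = 0.0  // time of the previous call

  /** Returns true if one record may pass now, false if it should wait. */
  def tryAcquire(nowSec: Double): Boolean = synchronized {
    tokens = math.min(burst, tokens + (nowSec - lastSec) * ratePerSec)
    lastSec = nowSec
    if (tokens >= 1.0) { tokens -= 1.0; true } else false
  }
}
```

A receiver wrapper could call tryAcquire before each store(record) and sleep briefly when it returns false, which would mimic what spark.streaming.receiver.maxRate does for receivers that store records one at a time.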

On 14/09/2017 09:57, Margus Roo wrote:

Re: spark.streaming.receiver.maxRate

Margusja

Some more info

val lines = ssc.socketTextStream(host, port) // works: maxRate is honoured
val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_AND_DISK_SER_2)) // does not work: maxRate has no effect
On 15/09/2017 21:50, Margus Roo wrote:



Re: spark.streaming.receiver.maxRate

Akhil Das-2

On Sat, Sep 16, 2017 at 1:59 AM, Margus Roo <[hidden email]> wrote:
--
Cheers!