Is it possible to rate limit a UDF?

Is it possible to rate limit a UDF?

Yeikel

I have a data frame to which I apply a UDF that calls a REST web service. The service is deployed on only a few nodes and won't be able to handle a massive load from Spark.

Is it possible to rate limit this UDF? For example, something like 100 ops/s.

If not, what are the options? Is splitting the df an option?

I've read a similar question on Stack Overflow [1] where the suggested solution is Spark Streaming, but my application does not involve streaming. Do I need to turn the operations into a streaming workflow to achieve something like that?

Current workflow: Hive -> Spark -> Service

Thank you

[1] https://stackoverflow.com/questions/43953882/how-to-rate-limit-a-spark-map-operation


Re: Is it possible to rate limit a UDF?

Sonal Goyal
Have you tried controlling the number of partitions of the dataframe? Say you have 5 partitions: that means you are making at most 5 concurrent calls to the web service. The throughput of the web service would be your bottleneck and the Spark workers would be waiting for tasks, but if you can't change the REST service, maybe it's worth a shot.
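For instance, a minimal sketch of the idea (callService is a hypothetical stand-in for the UDF's REST call, not code from this thread):

import org.apache.spark.sql.{DataFrame, Dataset, Row}

// Hypothetical stand-in for the UDF's REST call.
def callService(row: Row): String = s"${row.mkString(",")}:ok"

// At most `n` partitions means at most `n` concurrent tasks,
// and hence at most `n` in-flight calls to the service.
def capConcurrency(df: DataFrame, n: Int): Dataset[String] = {
  import df.sparkSession.implicits._
  df.coalesce(n).map(callService _)
}

Note this caps concurrency rather than rate: each partition still calls the service as fast as it can, one row after another.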

Thanks,
Sonal
Nube Technologies


Re: Is it possible to rate limit a UDF?

Ramandeep Singh
Backpressure is the suggested way out here and is the correct approach: it rate limits at the source itself, for safety. Imagine a service with throttling enabled; it can outright reject your calls.

Even if you split your df, that alone won't achieve your purpose. You can combine it with a backpressure-enabled API or with restricting by time.

Here's an example using RxJava, if you don't want to use any streaming API.
import java.util.concurrent.TimeUnit

import io.reactivex.Flowable
import io.reactivex.functions.BiFunction
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

import scala.collection.JavaConverters._

def main(args: Array[String]): Unit = {
  val ss = SparkSession.builder().master("local[*]").enableHiveSupport().getOrCreate()

  val df = ss.read.json("src/main/resources/person.json")
  implicit val encoder = RowEncoder(df.schema)

  val throttled = df.repartition(2).mapPartitions { it =>
    // The partition's rows as a Flowable.
    val itF = Flowable.fromIterable[Row](it.toIterable.asJava)
    // One tick per second; zipping the rows against the ticks paces
    // emission to at most one row (one API call) per second per partition.
    val delSt = Flowable.interval(1, TimeUnit.SECONDS)
    Flowable.zip[java.lang.Long, Row, Row](delSt, itF, new BiFunction[java.lang.Long, Row, Row]() {
      override def apply(t1: java.lang.Long, t2: Row): Row = {
        // call the API here
        t2
      }
    }).toList.blockingGet().iterator().asScala
  }
  throttled.show()
}

--
Regards,
Ramandeep Singh

RE: Is it possible to rate limit a UDF?

Yeikel

Thank you for your suggestion, Ramandeep, but the code is not clear to me. Could you please explain it? Particularly this part:

Flowable.zip[java.lang.Long, Row, Row](delSt, itF, new BiFunction[java.lang.Long, Row, Row]() {

Also, is it possible to achieve this without third-party libraries?

Thank you


Re: Is it possible to rate limit a UDF?

ramannanda9@gmail.com
Basically, it is zipping two flowables using the defined function (one that takes two parameters and returns one, hence the name BiFunction). Because one of the flowables is Flowable.interval, the zipped stream emits at most one row per tick, which is what rate limits the calls.

Obviously, you could avoid RxJava by using a TimerTask:

import java.util.{Timer, TimerTask}

val timer = new Timer()
// Stagger each element by 200 ms, i.e. at most 5 operations per second.
Seq(1, 2, 3).zipWithIndex.foreach { case (item, idx) =>
  timer.schedule(new TimerTask {
    override def run(): Unit = println(item) // call the API here instead
  }, idx * 200L)
}
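The same idea also works inside Spark with no extra dependency at all. A minimal sketch, not code from this thread: callService is a hypothetical stand-in for the real REST call, and a plain sleep between calls throttles each partition, so e.g. 10 partitions with a 100 ms gap gives roughly the 100 ops/s asked about above.

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// Hypothetical stand-in for the real REST call.
def callService(row: Row): Row = row

// Each partition makes ~(1000 / delayMs) calls per second,
// so the total rate is roughly that times `partitions`.
def throttle(df: DataFrame, partitions: Int, delayMs: Long): DataFrame = {
  implicit val encoder = RowEncoder(df.schema)
  df.repartition(partitions).mapPartitions { it =>
    it.map { row =>
      Thread.sleep(delayMs) // pause before each call
      callService(row)
    }
  }
}

Note this is a floor on the gap between calls rather than a true token bucket: a slow service response plus the sleep lowers the effective rate, which errs on the safe side.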

--
Regards,
Ramandeep Singh
http://orastack.com
+13474792296
[hidden email]