Long and consistent wait between tasks in streaming job


Francisco Pareja
Hi,

I have a Spark Streaming job running on Mesos.
All batches take the exact same time, and this time is much longer than expected.
The job pulls data from Kafka, processes it, inserts it into Cassandra and also writes it back to Kafka on a different topic.
Each batch (below) has 3 jobs: 2 of them pull from Kafka, process and insert into Cassandra, and the other pulls from Kafka, processes and pushes back to Kafka.
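
Roughly, the driver code is shaped like the sketch below. This is a sketch only: the keyspace, table and topic names are placeholders and the real processing is omitted, but it shows the three output actions that become the 3 jobs per batch.

import java.util.Properties

import com.datastax.spark.connector.streaming._
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object PipelineSketch {

  def main(args: Array[String]) {

    // Assumes spark.cassandra.connection.host is set at submit time
    val sparkConf = new SparkConf(true)
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "####,####,####",
      "group.id" -> "test"
    )

    val stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, Set("in_topic")
    )

    // Jobs 1 and 2: pull from Kafka, "process" and insert into Cassandra (one output action each)
    stream.map(t => (t._1, t._2.length)).saveToCassandra("my_keyspace", "table_a")
    stream.map(t => (t._1, t._2.length)).saveToCassandra("my_keyspace", "table_b")

    // Job 3: pull from Kafka, "process" and push back to Kafka on a different topic
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        val props = new Properties()
        props.put("bootstrap.servers", "####,####,####")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
        val producer = new KafkaProducer[String, Array[Byte]](props)
        records.foreach(r => producer.send(new ProducerRecord[String, Array[Byte]]("out_topic", r._1, r._2)))
        producer.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}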

I inspected the batches in the Spark UI and found that the jobs all take the same time (4s), but drilling down further, each of them actually processes for less than a second; they all have a gap of about the same length (around 4 seconds).
Adding more executors or more processing power doesn't look like it will make a difference.



Processing time = 12s
Total delay = 1.2 s !!!!



So I drill down into each job of the batch (they all take the exact same time = 4s even if they are doing different processing):




They all take 4 seconds to run one of their stages (the one that reads from Kafka).
Now I drill down into one of them (they are all very similar):



What is this wait!! The whole thing actually only takes 0.5s to run!! But it is just waiting. Waiting for Kafka??

Has anyone experienced anything similar?
What could I have coded wrong or configured incorrectly?

Best Regards,
Javier Pareja

Re: Long and consistent wait between tasks in streaming job

Francisco Pareja
Hi again,

Here is a minimal piece of code that triggers this behaviour, which makes me think that it must be the setup somehow.

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object Test {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf(true)
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "####,####,####",
      "group.id" -> "test"
    )

    val stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      streamingContext, kafkaParams, Set("test_topic")
    )

    stream.map(t => "LEN=" + t._2.length).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
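
For completeness, I submit it with something along these lines (the Mesos master URL and jar name are placeholders; only the --conf flags matter here):

spark-submit \
  --master mesos://<mesos-master>:5050 \
  --conf spark.executor.cores=2 \
  --conf spark.cores.max=2 \
  --class Test \
  my-streaming-job.jar
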
Even if all the executors are on the same node (--conf spark.executor.cores=2 --conf spark.cores.max=2), the problem is still there and it is exactly 4 seconds as before:


Even if the topic has no messages (a batch of 0 records), Spark Streaming takes 4 seconds for every batch.

The only way that I have been able to fix this is by setting spark.executor.cores=1 and spark.cores.max=1 so that it only creates one task to execute.

This task is displayed as NODE_LOCAL, like one of the tasks above. It seems that when the locality level is NODE_LOCAL the execution is instantaneous, but when it is ANY it takes 4(!) seconds to connect.

I hope this helps because I am running out of ideas.



F Javier Pareja



Re: Long and consistent wait between tasks in streaming job

Francisco Pareja
Hi all,

I have finally found the problem: spark.locality.wait.
Its default value is 3 seconds. I have now set it to 0s and everything is running smoothly.
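
In case it is useful, spark.locality.wait can be set either with --conf spark.locality.wait=0s on spark-submit or directly on the SparkConf, for example:

val sparkConf = new SparkConf(true).set("spark.locality.wait", "0s")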

Hope this helps someone in the future.

Best Regards,
Francisco

F Javier Pareja
