Spark Streaming + Kafka pending batches


jithinpt
Hi all,

I'm running a Spark Streaming app that reads data from Kafka (using the Direct Stream approach) and publishes the results back to Kafka. Both the input rate to the app and the app's throughput remain steady for an hour or two. After that, I start seeing batches that sit in the Active Batches queue for a very long time (30+ minutes). The Spark driver log shows the two types of errors below, and their timestamps coincide closely with the start times of the batches that get stuck.
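
For context, the app is structured roughly like the sketch below (broker addresses, topic names, the batch interval, and the transformation are placeholders rather than the actual code):

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaToKafkaApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-to-kafka")
    val ssc = new StreamingContext(conf, Seconds(10)) // batch interval is a placeholder

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker:9092", // placeholder
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my-streaming-app", // placeholder
      "auto.offset.reset" -> "latest")

    // Direct stream: each Kafka partition becomes one Spark partition per batch
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("input-topic"), kafkaParams))

    stream.map(_.value) // stand-in for the real transformation
      .foreachRDD { rdd =>
        rdd.foreachPartition { partition =>
          // One producer per partition per batch; pooling or broadcasting the
          // producer is the usual optimization, elided here for brevity.
          val props = new Properties()
          props.put("bootstrap.servers", "broker:9092")
          props.put("key.serializer", classOf[StringSerializer].getName)
          props.put("value.serializer", classOf[StringSerializer].getName)
          val producer = new KafkaProducer[String, String](props)
          partition.foreach(v => producer.send(new ProducerRecord[String, String]("output-topic", v)))
          producer.close()
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }
}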

First error type

ERROR LiveListenerBus: Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
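
If I'm reading this right, the queue it refers to is the driver's listener event queue, whose capacity is controlled by the spark.scheduler.listenerbus.eventqueue.size setting (I believe the default is 10000 in Spark 2.x). A sketch of raising it is below, though I don't know whether that would treat the cause or just the symptom:

import org.apache.spark.SparkConf

// Assumption: a larger queue only buys headroom; a listener that is
// consistently slower than the event rate will eventually fill any queue.
val conf = new SparkConf()
  .setAppName("kafka-to-kafka") // placeholder name, matching the sketch above
  .set("spark.scheduler.listenerbus.eventqueue.size", "100000")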

Second error type

ERROR StreamingListenerBus: Listener StreamingJobProgressListener threw an exception
java.util.NoSuchElementException: key not found: 1501806558000 ms
  at scala.collection.MapLike$class.default(MapLike.scala:228)
  at scala.collection.AbstractMap.default(Map.scala:59)
  at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
  at org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134)
  at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
  at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
  at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
  at org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
  at org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
  at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:75)
  at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
  at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
  at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
  at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36)
  at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94)
  at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
  at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
  at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1279)
  at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)

However, I'm not sure how to interpret these errors, and despite an extensive online search I couldn't find any useful information about them. My (unconfirmed) guess is that the two are related: if a batch's start event is dropped from the full listener queue, StreamingJobProgressListener would have no entry for that batch time when the completion event arrives. But I may be misreading it.

Questions

1. What do these errors mean? Are they indicative of resource limitations (e.g., CPU or memory)?
2. What would be the best way to fix these errors?

Thanks in advance.