Spark Kafka Streaming with Offset Gaps

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Spark Kafka Streaming with Offset Gaps

Rishabh Pugalia
I have an app that uses Kafka Streaming to pull data from `input` topic and push to `output` topic with `processing.guarantee=exactly_once`. Due to `exactly_once` gaps (transaction markers) are created in Kafka. Let's call this app `kafka-streamer`.

Now I've another app that listens to this output topic (actually they are multiple topics with a Pattern/Regex) and processes the data using https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html. Let's call this app `spark-streamer`.

Due to the gaps, the first thing that happens is spark streaming fails. To fix this I enabled `spark.streaming.kafka.allowNonConsecutiveOffsets=true` in the spark config before creating the StreamingContext. Now let's look at the issues that were faced when I start `spark-streamer` (I also went through some of the spark-streaming-kafka code in the limited amount of time I had):

1. Once `spark-streamer` starts if there are unconsumed offsets present in the topic partition, it does poll them but won't process (create RDDs) until some new message is pushed to the topic partition after the app is started. Code: https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala#L160 - I can see we poll the data but I'm not sure where the code is to process it. But anyway, when I run the app I'm pretty sure the data doesn't get processed (but it does get polled in `compactedStart()`) until `compactedNext()` is called.
2. In `compactedNext()` if no data is polled within 120s (default timeout), we throw an exception and the my app literally crashes. Code: https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala#L178 - Why do we throw an exception and not keep polling just like a normal KafkaConsumer would do/behave ?

Would be of great help if somebody can help me out with the 2 questions listed above!

--
Thanks and Best Regards,
Rishabh