I'm reading a compacted Kafka topic with Spark 2.4, using a direct stream - KafkaUtils.createDirectStream(...). I have configured the necessary options for compacted topics, so it's processed with CompactedKafkaRDDIterator.
It works well; however, when there are many gaps in the topic, processing is very slow and the executors are idle 90% of the time.
I had a look at the source and here are my findings:
Spark first computes the number of records to stream from Kafka (processing rate * batch window size). That number of records is translated into a Kafka offset range (offset_from, offset_to), and eventually the iterator reads the records within those offset boundaries.
This works fine until there are many gaps in the topic, which reduce the real number of processed records.
Let's say we want to read 100k records in a 60-second window. With gaps it comes down to 10k real records in those 60 seconds (because the other 90k offsets are just compacted gaps).
As a result the executor works for only 6 seconds and does nothing for the remaining 54.
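The arithmetic above can be sketched as follows. This is illustrative only: the numbers and the `GapMath` helper are made up for the example, not taken from Spark's code; the point is that Spark sizes the batch in offsets, while the executor's work is proportional to real records.

```scala
// Illustrative arithmetic for the idle-executor effect: Spark plans the batch
// as an offset range (rate * window), but with a compacted topic only a
// fraction of those offsets are real records.
object GapMath {
  // Offsets Spark will plan to read for one batch (rate per second * window).
  def offsetRange(ratePerSec: Long, windowSec: Long): Long =
    ratePerSec * windowSec

  def main(args: Array[String]): Unit = {
    val offsets     = offsetRange(ratePerSec = 1667, windowSec = 60) // ~100k offsets
    val gapRatio    = 0.9                                            // 90% compacted away
    val realRecords = (offsets * (1 - gapRatio)).toLong              // ~10k actual records
    val busySeconds = (60 * realRecords) / offsets                   // executor busy ~6s of 60
    println(s"offsets=$offsets realRecords=$realRecords busySeconds=$busySeconds")
  }
}
```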
I'd like to utilize the executor as much as possible.
A great feature would be to read 100k real records (skipping the gaps), regardless of what offset range that spans.
I've tried to improve this with backpressure and a custom RateEstimator (decorating PidRateEstimator and boosting the rate per second). I was even able to fully utilize the executors, but my approach has a big problem where the compacted part of the topic meets the non-compacted part: the executor then tries to read too big a chunk from Kafka and the whole processing dies.
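For concreteness, here is a minimal sketch of the decorator idea with a hard cap added, which is one way to stop the boost from blowing up when the stream reaches the non-compacted region. Note the assumptions: Spark's RateEstimator trait is private[streaming], so the trait below is a local stand-in mirroring its compute signature from the Spark 2.4 source; BoostedRateEstimator and its parameters are hypothetical names, not part of Spark.

```scala
// Stand-in for Spark's private[streaming] RateEstimator trait (Spark 2.4);
// in a real build you would extend the Spark one and register it via the
// backpressure configuration instead of defining this locally.
trait RateEstimator extends Serializable {
  def compute(time: Long,
              elements: Long,
              processingDelay: Long,
              schedulingDelay: Long): Option[Double]
}

// Decorates an inner estimator (e.g. PidRateEstimator) and multiplies its
// rate by `boost`, but never returns more than `maxRate`, so a dense
// (non-compacted) stretch of the topic cannot trigger an oversized fetch.
class BoostedRateEstimator(inner: RateEstimator,
                           boost: Double,
                           maxRate: Double) extends RateEstimator {
  require(boost >= 1.0 && maxRate > 0)

  override def compute(time: Long,
                       elements: Long,
                       processingDelay: Long,
                       schedulingDelay: Long): Option[Double] =
    inner.compute(time, elements, processingDelay, schedulingDelay)
      .map(rate => math.min(rate * boost, maxRate))
}
```

A simpler belt-and-braces option alongside this is spark.streaming.kafka.maxRatePerPartition, which caps the per-partition offset rate regardless of what the estimator returns.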