Reset the offsets, Kafka 0.10 and Spark

Guillermo Ortiz Fernández
I'm consuming data from Kafka with createDirectStream and storing the offsets in Kafka itself (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself):

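import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Direct stream over the subscribed topics; offsets are committed back to Kafka.
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)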


I'm running Spark 2.0.2 against Kafka 0.10. This solution works well: when I restart the Spark process, it resumes from the last offset that Spark consumed. Sometimes, though, I need to reprocess the whole topic from the beginning.

I have seen that I could reset the offsets with a Kafka script, but that option is not available in Kafka 0.10:

kafka-consumer-groups --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics --execute

Another possibility is to pass createDirectStream an extra parameter, a map with the starting offsets. But how could I get the first offset of each partition? I have checked the API of the new consumer and I don't see any method that returns these offsets.
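
For what it's worth, the offsets-map idea can be sketched as follows. This is only a sketch under assumptions, not a tested solution: `earliestOffsets` is a hypothetical helper name, the code assumes `kafkaParams` contains the deserializers and has `enable.auto.commit` set to `false` (as in the Spark integration guide example), and it assumes every topic in `topics` already exists. It asks a plain Kafka consumer for its position after `seekToBeginning`, which should give the earliest available offset of each partition.

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: returns the earliest available offset of every
// partition of the given topics. Assumes the topics exist and that
// kafkaParams has enable.auto.commit = false, so that closing this
// temporary consumer does not commit anything for the group.
def earliestOffsets(
    topics: Iterable[String],
    kafkaParams: Map[String, Object]): Map[TopicPartition, Long] = {
  val consumer = new KafkaConsumer[String, String](kafkaParams.asJava)
  try {
    // Collect all partitions of all topics.
    val partitions = topics.flatMap { topic =>
      consumer.partitionsFor(topic).asScala
        .map(p => new TopicPartition(p.topic, p.partition))
    }.toList
    // Manual assignment: the consumer does not join the consumer group.
    consumer.assign(partitions.asJava)
    // seekToBeginning is lazy; position() forces the offset lookup.
    consumer.seekToBeginning(partitions.asJava)
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  } finally consumer.close()
}
```

The resulting map could then be handed to the `Subscribe` overload that takes starting offsets, for example `Subscribe[String, String](topics, kafkaParams, earliestOffsets(topics, kafkaParams))`. Those offsets are documented as applying on initial startup only, so a job restored from a checkpoint would presumably not pick them up.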

Is there any other way? I could also start with a different groupId, but that doesn't seem like a very clean option for production.

Re: Reset the offsets, Kafka 0.10 and Spark

Tathagata Das
Structured Streaming really makes this easy. You can simply specify an option that says whether to start the query from the earliest or the latest offsets.
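
A minimal sketch of what that could look like, assuming a Spark release that ships the `spark-sql-kafka-0-10` source with the `startingOffsets` option (this may be newer than the 2.0.2 mentioned in the question). The topic name `my-topic`, the application name, and the checkpoint path are made up for illustration; `kafka-host:9092` is taken from the question.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("reprocess-from-earliest") // hypothetical application name
  .getOrCreate()

// Read the topic as a streaming DataFrame, starting at the earliest offsets.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-host:9092")
  .option("subscribe", "my-topic")       // hypothetical topic name
  .option("startingOffsets", "earliest") // or "latest"
  .load()

// Print key/value pairs to the console, just to show the query running.
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/reprocess-checkpoint") // hypothetical path
  .start()

query.awaitTermination()
```

Note that `startingOffsets` only applies when a query starts for the first time. A query resumed from an existing checkpoint continues from the checkpointed offsets, so reprocessing from the beginning would also need a fresh checkpoint location.
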
Check out

On Thu, Jun 7, 2018 at 1:27 PM, Guillermo Ortiz Fernández <[hidden email]> wrote: