[Ask for help] How to manually submit offsetRanges

Fangyuan Liu

Hello Sir/Madam,

 

    I am using Spark Streaming with the Kafka Java API, and I would like to know whether there is any way to commit OffsetRanges other than `commitAsync`. The problem is that I modified the OffsetRanges and committed them with `commitAsync`, but the modification does not seem to have been committed.

    

-----------------------------------------------------------------------------------------------------------------------------------------------------------

    For example:

    In this batch (batch 10):

    OffsetRange o = offsetRanges[0];

    o.fromOffset = 112233, o.untilOffset = 112244

    I modified it so that

    o.fromOffset = 112233, o.untilOffset = 112234

    and then committed the offset ranges using `commitAsync`.

    But in the next batch (batch 11),

    I expected o.fromOffset = 112234, but it is actually 112244 (so the modification does not seem to have been committed).

-----------------------------------------------------------------------------------------------------------------------------------------------------------
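For reference, Kafka offset ranges are half-open: `[fromOffset, untilOffset)` covers `untilOffset - fromOffset` records, and the offset a consumer commits is the next one it will read. So batch 10 holds 11 records, and keeping only the first one means committing 112234. The plain-Java sketch below uses a hypothetical `SimpleRange` class in place of Spark's `OffsetRange`, whose fields cannot be reassigned (a modified range has to be built with `OffsetRange.create(topic, partition, fromOffset, untilOffset)`). Topic "a" and partition 0 are assumptions.

```java
// Hypothetical stand-in for Spark's OffsetRange: the half-open range [fromOffset, untilOffset).
final class SimpleRange {
    final String topic;
    final int partition;
    final long fromOffset;  // first offset in the range (inclusive)
    final long untilOffset; // one past the last offset (exclusive)

    SimpleRange(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }

    // Number of records covered by the range.
    long count() {
        return untilOffset - fromOffset;
    }

    // Builds a new range holding at most the first n records; this range is left unchanged.
    SimpleRange firstN(long n) {
        return new SimpleRange(topic, partition, fromOffset, Math.min(untilOffset, fromOffset + n));
    }
}

class SimpleRangeDemo {
    public static void main(String[] args) {
        SimpleRange batch10 = new SimpleRange("a", 0, 112233L, 112244L);
        SimpleRange trimmed = batch10.firstN(1);
        System.out.println(batch10.count());     // 11
        System.out.println(trimmed.count());     // 1
        System.out.println(trimmed.untilOffset); // 112234: the next offset to read
    }
}
```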

 

    Here is the related code (I am using `org.apache.spark` version 2.3.0 and `org.apache.kafka` 0.10):

 

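```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

// `checkpoint` (a String path) and `sparkContext` (a JavaSparkContext) are defined elsewhere.
JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpoint, () -> {
    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkContext, Durations.milliseconds(1000));
    streamingContext.checkpoint(checkpoint);
    List<String> topics = Arrays.asList("a");
    String consumerGroupId = "a_comsumer_group";

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("bootstrap.servers", String.join(",", "broker_name"));
    kafkaParams.put("group.id", consumerGroupId);
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", "false");

    // createDirectStream returns a JavaInputDStream; keeping that type makes inputDStream() available.
    JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

    kafkaStream.foreachRDD(rdd -> {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        // I made some changes to the offsetRanges

        // The underlying Scala stream, not the Java wrapper, implements CanCommitOffsets.
        ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
    });

    // getOrCreate expects the factory to return the newly created context.
    return streamingContext;
});
```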
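Looking at this again, my understanding of `DirectKafkaInputDStream` in spark-streaming-kafka-0-10 (which I have not confirmed) is that the next batch's `fromOffset` comes from the stream's own in-memory position, which advances to the previous batch's `untilOffset`, while `commitAsync` only queues offsets that are written to Kafka when the next batch is computed. If that is right, committed offsets would only matter when a stream starts without checkpoint state, which would explain the 112244. The plain-Java model below is a sketch of that understanding, not Spark code; `DirectStreamModel` and the latest offset 112250 are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified, hypothetical model of a direct Kafka stream reading one partition.
// It is not Spark's implementation; it only mirrors the order of operations as I understand them.
class DirectStreamModel {
    private long currentOffset;           // driver-side start of the next batch
    private Long committedOffset = null;  // what Kafka would hold for the consumer group
    private final Queue<Long> commitQueue = new ArrayDeque<>();

    DirectStreamModel(long startOffset) {
        this.currentOffset = startOffset;
    }

    // Like commitAsync: only queues the offset; nothing is written yet.
    void commitAsync(long offset) {
        commitQueue.add(offset);
    }

    // Plans the next batch as [currentOffset, latestOffset), then flushes queued commits.
    long[] nextBatch(long latestOffset) {
        long[] range = {currentOffset, latestOffset};
        currentOffset = latestOffset;  // advances from the stream's own state, not from Kafka
        while (!commitQueue.isEmpty()) {
            committedOffset = commitQueue.poll();
        }
        return range;
    }

    Long committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        DirectStreamModel stream = new DirectStreamModel(112233L);
        stream.nextBatch(112244L);                    // batch 10: [112233, 112244)
        stream.commitAsync(112234L);                  // commit the trimmed untilOffset
        long[] batch11 = stream.nextBatch(112250L);   // 112250 is a made-up latest offset
        System.out.println(batch11[0]);               // 112244
        System.out.println(stream.committedOffset()); // 112234
    }
}
```

In this model the commit does reach "Kafka", but it never feeds back into where the running stream starts its next batch.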

Thanks a lot. :)

 

 

Best Regards,

Fangyuan