Fwd: [Spark Streaming]: Why my Spark Direct stream is sending multiple offset commits to Kafka?


Raghu B
Hi Spark Community.

I need help with the following issue. I have been researching it for the last two weeks, and as a last resort I want to ask the Spark community.

I am running the following code in Spark:

      import org.apache.kafka.clients.consumer.ConsumerRecord
      import org.apache.kafka.common.serialization.StringDeserializer
      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      import org.apache.spark.streaming.kafka010._
      import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
      import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

      val sparkConf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("KafkaTest")
        .set("spark.streaming.kafka.maxRatePerPartition", "10")
        .set("spark.default.parallelism", "10")
        .set("spark.streaming.backpressure.enabled", "true")
        .set("spark.scheduler.mode", "FAIR")

      lazy val sparkContext = new SparkContext(sparkConf)
      val sparkJob = new SparkLocal // project-specific helper (definition not shown)

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "kafka-270894369.spark.google.com:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "stream_group1",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> "false",
        "heartbeat.interval.ms" -> "130000", // default: 3000
        "request.timeout.ms" -> "150000",    // default: 40000
        "session.timeout.ms" -> "140000",    // default: 30000
        "max.poll.interval.ms" -> "140000",  // logged as "isn't a known config"
        "max.poll.records" -> "100"          // default: 2147483647
      )

      val streamingContext = new StreamingContext(sparkContext, Seconds(120))

      val topics = Array("topicname")

      val kafkaStream = KafkaUtils.createDirectStream[String, String](
        streamingContext,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams)
      )

      def messageTuple(tuple: ConsumerRecord[String, String]): String = {
        null // Removed the code
      }

      var offset: Array[OffsetRange] = null

      kafkaStream.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        offset = offsetRanges

        rdd.map(row => messageTuple(row))
          .foreachPartition { partition =>
            partition.map(row => null)
              .foreach { record =>
                print("")
                Thread.sleep(5)
              }
          }
        kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }

      // Register the shutdown hook before blocking on awaitTerminationOrTimeout;
      // otherwise it is only registered once the timeout has already expired.
      sys.ShutdownHookThread {
        println("Gracefully shutting down App")
        streamingContext.stop(stopSparkContext = true, stopGracefully = true)
        println("Application stopped")
      }

      streamingContext.start()
      streamingContext.awaitTerminationOrTimeout(6000000)
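To narrow down where the extra commits come from, one option is to attach a callback to each commit so the driver logs exactly which offsets are committed and when. This is a hedged sketch, not code from my application: it reuses the `kafkaStream` and `offsetRanges` names from the listing above, and assumes spark-streaming-kafka-0-10 (which provides the `commitAsync(offsetRanges, callback)` overload) is on the classpath.

```scala
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch as before ...

  // Same commit call as above, but with a callback that logs each completed
  // commit (or failure) with a driver-side timestamp, so duplicate commits
  // for the same batch become visible in the driver log.
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(
    offsetRanges,
    new OffsetCommitCallback {
      override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
                              exception: Exception): Unit = {
        if (exception != null)
          println(s"Offset commit failed: $exception")
        else
          println(s"Committed at ${System.currentTimeMillis()}: $offsets")
      }
    }
  )
}
```

Note that `commitAsync` only queues the offsets; the actual commit happens on the driver's Kafka consumer during a later batch, so the logged timestamps may lag the batch that produced them.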



With the above code I am observing that multiple commits are being sent to Kafka, and I am not sure why.

(I got the records below from the Kafka __consumer_offsets topic.)



    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864006531, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730000011, expireTimestamp=Some(1577816400011))
    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864006531, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730000012, expireTimestamp=Some(1577816400012))
    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864005827, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730000079, expireTimestamp=Some(1577816400079))
   
    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120008, expireTimestamp=Some(1577816520008))
    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120010, expireTimestamp=Some(1577816520010))
    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120077, expireTimestamp=Some(1577816520077))
   
    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240010, expireTimestamp=Some(1577816640010))
    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240015, expireTimestamp=Some(1577816640015))
    [stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240137, expireTimestamp=Some(1577816640137))
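For anyone who wants to reproduce this inspection: the records above can be dumped with the Kafka console consumer and the built-in offsets message formatter. This is a sketch only; the script path and bootstrap address are assumptions (taken from the kafkaParams above), and on older brokers you may additionally need a consumer config with exclude.internal.topics=false.

```shell
# Dump committed offsets from the internal __consumer_offsets topic,
# decoded by Kafka's own formatter (Kafka 0.11+ package name shown).
kafka-console-consumer.sh \
  --bootstrap-server kafka-270894369.spark.google.com:9092 \
  --topic __consumer_offsets \
  --from-beginning \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
```

Filtering the output with grep for the group id (stream_group1) and partition makes it easy to count commits per batch interval.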


Ideally we should see only one commit every 2 minutes, matching my batch interval, but in our case we are observing 3 commits per batch.

Also, during application restart we are losing data because of this issue (commit mismatch).

Please help me with your inputs.

Thanks in advance,
Raghu