How to repartition Spark DStream Kafka ConsumerRecord RDD.

How do I repartition the RDDs of a Spark DStream of Kafka ConsumerRecords? The Kafka topics we read have uneven sizes, so we want to repartition the input RDD based on some custom logic.
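If the main goal is only to even out skewed input, a plain repartition(n) may already be enough once the records are serializable. repartition(n) is coalesce(n, shuffle = true), and as far as I can tell from Spark's source, each input partition then deals its records out to the output partitions in turn, starting from a random offset. The plain-Scala sketch below simulates only that dealing step, to show why one large and two small input partitions come out roughly equal. RoundRobinSketch and redistribute are hypothetical names, and the deterministic starting offset is a simplification of Spark's random one.

```scala
object RoundRobinSketch {
  // Simulates dealing records from skewed input partitions round-robin across
  // `numOut` output partitions; returns how many records land in each output.
  // Simplification (assumption): the start offset is index % numOut, whereas
  // Spark picks a random starting output partition per input partition.
  def redistribute(inputSizes: Seq[Int], numOut: Int): Vector[Int] = {
    require(numOut > 0, "numOut must be positive")
    val counts = Array.fill(numOut)(0)
    for ((size, index) <- inputSizes.zipWithIndex) {
      var position = index % numOut
      for (_ <- 0 until size) {
        counts(position) += 1
        position = (position + 1) % numOut
      }
    }
    counts.toVector
  }

  def main(args: Array[String]): Unit = {
    // One large and two small input partitions, e.g. from uneven Kafka topics.
    val skewed = Seq(1000, 10, 10)
    println(s"input:  $skewed")
    println(s"output: ${redistribute(skewed, 3)}")
  }
}
```

With inputs of 1000, 10 and 10 records spread over 3 outputs, this sketch yields 340 records per output partition. If "some logic" instead means routing records by key, the usual Spark hook is a custom org.apache.spark.Partitioner passed to partitionBy on a key-value RDD.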

But when I apply repartition, I get an "object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord" error. I found the following workaround:
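The error itself comes from the shuffle: repartition has to serialize every record it writes to the shuffle, and org.apache.kafka.clients.consumer.ConsumerRecord does not implement java.io.Serializable. Spark's default spark.serializer is Java serialization, so the plain-Scala sketch below reproduces the failure with a hypothetical stand-in class, FakeConsumerRecord, and shows that a (key, value) tuple copied out of it serializes fine. FakeConsumerRecord, SerializationCheck and canSerialize are illustrative names, not part of Kafka or Spark.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.kafka.clients.consumer.ConsumerRecord.
// Like the real class, it does NOT implement java.io.Serializable.
final class FakeConsumerRecord(val key: String, val value: String)

object SerializationCheck {
  // True if Java serialization (Spark's default serializer) can write `obj`,
  // false if it reaches a class that is not Serializable.
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val record = new FakeConsumerRecord("user-1", "payload")
    // What a shuffle would have to write if the RDD held the records themselves.
    println(s"record serializable:       ${canSerialize(record)}")
    // Copying the fields into a Tuple2[String, String] gives a Serializable value.
    println(s"(key, value) serializable: ${canSerialize((record.key, record.value))}")
  }
}
```

In the Spark job, the corresponding change would be to map each ConsumerRecord to serializable values, for example (record.key, record.value), before calling repartition, and not to rebuild ConsumerRecord objects in partitions that are later shuffled.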

Call rdd.foreachPartition and create the NotSerializable object inside it, like this:

rdd.foreachPartition(iter -> {
  // Created on the executor, once per partition, so it is never serialized
  NotSerializable notSerializable = new NotSerializable();

  // ...now process iter
});
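The workaround above targets a related but different failure: it keeps a non-serializable helper out of the function that Spark serializes and ships to the executors. It does not change what repartition shuffles, so records that are still ConsumerRecord objects would still fail. The plain-Scala sketch below models the two ways of giving a helper to per-partition code, using explicit classes instead of closures so the serialized fields are visible. NotSerializableHelper, CapturingTask, PerPartitionTask and ClosureCheck are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper that, like many client objects, is not Serializable.
class NotSerializableHelper {
  def transform(s: String): String = s.toUpperCase
}

// Helper built on the driver and handed in: it becomes a field of the task,
// so serializing the task also tries to serialize the helper.
class CapturingTask(helper: NotSerializableHelper) extends Serializable {
  def run(iter: Iterator[String]): List[String] = iter.map(helper.transform).toList
}

// Helper built inside run(), i.e. once per partition wherever the task runs,
// so the task itself has no non-serializable fields.
class PerPartitionTask extends Serializable {
  def run(iter: Iterator[String]): List[String] = {
    val helper = new NotSerializableHelper
    iter.map(helper.transform).toList
  }
}

object ClosureCheck {
  // True if Java serialization can write `obj`, false on a non-serializable field.
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"capturing task:     ${canSerialize(new CapturingTask(new NotSerializableHelper))}")
    println(s"per-partition task: ${canSerialize(new PerPartitionTask)}")
  }
}
```

Building the helper inside run, as PerPartitionTask does, mirrors creating NotSerializable inside foreachPartition: each executor constructs its own instance per partition, so nothing non-serializable travels with the task.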


val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParam)
).map(_.value())

stream.foreachRDD { rdd =>
  val repartitionRDD = flow.repartitionRDD(rdd, 1)
  println("&&&&&&&&&&&&&& repartitionRDD " + repartitionRDD.count())

  val modifiedRDD = rdd.mapPartitions { iter =>
    var consumerRecords: List[ConsumerRecord[String, String]] = List[ConsumerRecord[String, String]]()
    val consumerRecord: ConsumerRecord[String, String] = ... // record construction not shown
    consumerRecords = consumerRecords :+ consumerRecord
    consumerRecords.iterator // mapPartitions must return an Iterator
  }
  val r = modifiedRDD.repartition(1)
  println("************* after repartition " + r.count())
}

But I am still getting the same "object not serializable" error. Any help is greatly appreciated.