How to repartition Spark DStream Kafka ConsumerRecord RDD.


SparkUser6
We are getting uneven partition sizes from our Kafka topics and want to repartition the input RDD based on some custom logic.

But when I try to apply the repartition, I get an "object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord)" error. I found the following workaround:

 https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html

Call rdd.foreachPartition and create the NotSerializable object in there, like this:
rdd.foreachPartition(iter -> {
  // Instantiated on the executor, so it never has to be serialized
  // and shipped from the driver.
  NotSerializable notSerializable = new NotSerializable();

  // ...now process iter
});
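
In Scala, the same pattern would look roughly like this (a minimal sketch; NotSerializable stands in for whatever class fails to serialize):

rdd.foreachPartition { iter =>
  // Created per partition on the executor, so it is never serialized.
  val notSerializable = new NotSerializable()
  iter.foreach { record =>
    // ... process record with notSerializable
  }
}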

Here is how I applied it:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

import scala.collection.mutable.ListBuffer

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParam))

stream.foreachRDD { rdd =>
  val repartitionRDD = flow.repartitionRDD(rdd, 1)  // our own repartition helper
  println("&&&&&&&&&&&&&& repartitionRDD " + repartitionRDD.count())

  // Materialize the ConsumerRecords inside each partition, per the workaround.
  val modifiedRDD = rdd.mapPartitions { iter =>
    val consumerRecords = ListBuffer[ConsumerRecord[String, String]]()
    while (iter.hasNext) {
      consumerRecords += iter.next()
    }
    consumerRecords.iterator
  }

  val r = modifiedRDD.repartition(1)
  println("************* after repartition " + r.count())
}

But I am still getting the same object not serializable error. Any help is greatly appreciated.
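
My current understanding (which may be wrong) is that repartition triggers a shuffle, and the shuffle has to serialize every record it moves, so creating objects inside mapPartitions does not help here; it is the ConsumerRecord instances themselves that cannot be serialized. Would extracting the key and value (plain Strings in our case) from each record before the shuffle be the right approach? A minimal sketch of what I mean, reusing the stream above:

stream.foreachRDD { rdd =>
  // ConsumerRecord is not serializable, but its key and value are plain
  // Strings here, so pull them out before the shuffle that repartition triggers.
  val pairs = rdd.map(record => (record.key(), record.value()))
  val repartitioned = pairs.repartition(1)
  println("after repartition " + repartitioned.count())
}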