Spark 2.x duplicates output when task fails at "repartition" stage. Checkpointing is enabled before repartition.



Serega Sheypak
Hi, I have a Spark job that produces duplicate output records when one or more tasks from the repartition stage fail.
Here is the simplified code:

sparkContext.setCheckpointDir("hdfs://path-to-checkpoint-dir")

val inputRDDs: List[RDD[String]] = List.empty // an RDD per input dir

val updatedRDDs = inputRDDs.map { inputRDD => // some stuff happens here
  inputRDD
    .filter(???)
    .map(???)
}

val unionOfUpdatedRDDs = sparkContext.union(updatedRDDs)
unionOfUpdatedRDDs.checkpoint() // it didn't help

unionOfUpdatedRDDs
  .repartition(42)                 // a task failed here,
  .saveAsNewAPIHadoopFile("/path") // and a task failed here too.

// what really causes the duplicates in the output?
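For context (my own reading, not confirmed in this thread; it looks related to SPARK-23207): repartition distributes each input partition's records round-robin over the output partitions, so its output depends on the iteration order of the input. If a failed task is retried and the recomputed input iterates in a different order, records land in different output partitions than on the first attempt; mixing already-committed first-attempt output with retried output then yields duplicates (and losses). A minimal pure-Scala sketch of that failure mode, with hypothetical data:

```scala
object RepartitionRetrySketch {
  // Round-robin redistribution of one input partition's records
  // across numOutput output partitions, as repartition does.
  def roundRobin(records: Seq[String], numOutput: Int): Map[Int, Seq[String]] =
    records.zipWithIndex
      .groupBy { case (_, i) => i % numOutput }
      .map { case (p, recs) => p -> recs.map(_._1) }

  def main(args: Array[String]): Unit = {
    val firstAttempt = Seq("a", "b", "c", "d") // original iteration order
    val retryAttempt = Seq("b", "a", "c", "d") // recomputed in a different order

    val out1 = roundRobin(firstAttempt, 2)
    val out2 = roundRobin(retryAttempt, 2)

    // Suppose output partition 0 was already committed from the first
    // attempt, but partition 1 is written from the retried computation:
    val committed = out1(0) ++ out2(1)
    println(committed.sorted.mkString(",")) // prints a,a,c,d — "a" duplicated, "b" lost
  }
}
```

This is only a model of the shuffle, not Spark's actual code path; the point is that a retried, differently-ordered recomputation combined with committed partial output can duplicate some records and drop others.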