Hi, I have a Spark job that produces duplicates in the output when one or more tasks from the repartition stage fail.
Here is the simplified code:
sparkContext.setCheckpointDir("hdfs://path-to-checkpoint-dir")

val inputRDDs: List[RDD[String]] = List.empty // an RDD per input dir

val updatedRDDs = inputRDDs.map { inputRDD => // some stuff happens here
  inputRDD
    .filter(???)
    .map(???)
}

val unionOfUpdatedRDDs = sparkContext.union(updatedRDDs)
unionOfUpdatedRDDs.checkpoint() // it didn't help
unionOfUpdatedRDDs
  .repartition(42)                 // a task failed here,
  .saveAsNewAPIHadoopFile("/path") // and a task failed here too.

// what really causes the duplicates in the output?
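For context, here is a minimal, self-contained sketch of the pipeline shape I described, just so it compiles and runs end to end. The input directories, the trivial filter/map bodies, the NullWritable/Text keying, and the TextOutputFormat choice are placeholders I made up for the sketch; they are not the real job.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

object RepartitionDuplicatesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("repartition-duplicates-sketch"))
    sc.setCheckpointDir("hdfs://path-to-checkpoint-dir")

    // Hypothetical input dirs; the real job builds one RDD per input dir.
    val inputDirs = List("/input/dir1", "/input/dir2")
    val inputRDDs: List[RDD[String]] = inputDirs.map(sc.textFile(_))

    // Placeholder transformations standing in for the real filter/map logic.
    val updatedRDDs = inputRDDs.map { inputRDD =>
      inputRDD
        .filter(_.nonEmpty)
        .map(_.toUpperCase)
    }

    val unionOfUpdatedRDDs = sc.union(updatedRDDs)
    unionOfUpdatedRDDs.checkpoint()

    // Same shape as the failing job: shuffle via repartition, then write with the new Hadoop API.
    unionOfUpdatedRDDs
      .repartition(42)
      .map(line => (NullWritable.get(), new Text(line)))
      .saveAsNewAPIHadoopFile[TextOutputFormat[NullWritable, Text]]("/path")

    sc.stop()
  }
}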