spark-user mailing list archives

From Sandy Ryza <>
Subject Re: Duplicate key when sorting BytesWritable with Kryo?
Date Fri, 30 Jan 2015 19:27:02 GMT
Hi Andrew,

Here's a note from the doc for sequenceFile:

    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
    * If you plan to directly cache Hadoop writable objects, you should first copy them using
    * a `map` function.

This should probably say "directly caching *or directly shuffling*".  To
sort directly from a sequence file, the records need to be cloned first.
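A minimal sketch of what that cloning looks like, using the job from the message below. It assumes the `CustomKey(CustomKey)` copy constructor Andrew describes (which deep-copies via `set`), and that `spark` is the job's SparkContext; `inputPath` and `outputPath` are placeholders from his job:

    import org.apache.hadoop.io.BytesWritable

    // Sketch: clone key and value before the shuffle, because Hadoop's
    // RecordReader hands back the same Writable instance for every record.
    spark.sequenceFile(inputPath, classOf[CustomKey], classOf[BytesWritable])
      .map { case (k, v) =>
        // CustomKey(k) and copyBytes() make defensive copies of the byte[],
        // so sortByKey() no longer sees aliased references to one buffer.
        (new CustomKey(k), new BytesWritable(v.copyBytes()))
      }
      .sortByKey()
      .map(_._1)
      .saveAsTextFile(outputPath)

Copying the value is only needed if the values are also cached or shuffled; for the key-only sort below, cloning the key alone is what matters.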


On Fri, Jan 30, 2015 at 11:20 AM, andrew.rowson <> wrote:

> I've found a strange issue when trying to sort a lot of data in HDFS using
> spark 1.2.0 (CDH5.3.0). My data is in sequencefiles and the key is a class
> that derives from BytesWritable (the value is also a BytesWritable). I'm
> using a custom KryoSerializer to serialize the underlying byte array
> (basically write the length and the byte array).
> My spark job looks like this:
> spark.sequenceFile(inputPath, classOf[CustomKey],
> classOf[BytesWritable]).sortByKey().map(t =>
> t._1).saveAsTextFile(outputPath)
> CustomKey extends BytesWritable, adds a toString method and some other
> helper methods that extract and convert parts of the underlying byte[].
> This should simply output a series of textfiles which contain the sorted
> list of keys. The problem is that under certain circumstances I get many
> duplicate keys. The number of records output is correct, but it appears
> that
> large chunks of the output are simply copies of the last record in that
> chunk. E.g. instead of [1,2,3,4,5,6,7,8,9] I'll see [9,9,9,9,9,9,9,9,9].
> This appears to happen only above certain input data volumes, and it
> appears
> to be when shuffle spills. For a job where shuffle spill for memory and
> disk
> = 0B, the data is correct. If there is any spill, I see the duplicate
> behaviour. Oddly, the shuffle write is much smaller when there's a spill.
> E.g. the non spill job has 18.8 GB of input and 14.9GB of shuffle write,
> whereas the spill job has 24.2 GB of input, and only 4.9GB of shuffle
> write.
> I'm guessing some sort of compression is happening on duplicate identical
> values?
> Oddly, I can fix this issue if I adjust my scala code to insert a map step
> before the call to sortByKey():
> .map(t => (new CustomKey(t._1),t._2))
> This constructor is just:
> public CustomKey(CustomKey left) { this.set(left); }
> Why does this work? I've no idea.
> The spark job is running in yarn-client mode with all the default
> configuration values set. Using the external shuffle service and disabling
> spill compression makes no difference.
> Is this a bug?
