spark-user mailing list archives

From Aaron Davidson <>
Subject Re: Duplicate key when sorting BytesWritable with Kryo?
Date Fri, 30 Jan 2015 19:59:35 GMT
Ah, this is in particular an issue due to sort-based shuffle (it was not
the case for hash-based shuffle, which would immediately serialize each
record rather than holding many in memory at once). The documentation
should be updated.
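
The difference between consuming each record immediately and holding many in memory can be reproduced without Spark or Hadoop. The following is a minimal sketch in plain Scala, not the actual shuffle code: `MutableKey` is a hypothetical stand-in for a reused Writable, and `ReuseDemo` and its methods are illustrative names only.

```scala
// Hypothetical stand-in for a Hadoop Writable: a mutable object that a reader refills.
final class MutableKey(var value: Int) {
  def copy(): MutableKey = new MutableKey(value)
  override def toString: String = value.toString
}

object ReuseDemo {
  // Mimics a RecordReader: hands back the SAME instance for every record,
  // overwriting its contents each time.
  def reader(n: Int): Iterator[MutableKey] = {
    val shared = new MutableKey(0)
    (1 to n).iterator.map { i => shared.value = i; shared }
  }

  // Reads each record as soon as it is produced
  // (analogous to serializing every record immediately).
  def consumeImmediately(n: Int): List[Int] =
    reader(n).map(_.value).toList

  // Buffers references first and reads them afterwards
  // (analogous to holding many records in memory before writing them out).
  def bufferThenRead(n: Int): List[Int] =
    reader(n).toList.map(_.value)

  // Copies each record before buffering, so every buffered entry is distinct.
  def copyThenBuffer(n: Int): List[Int] =
    reader(n).map(_.copy()).toList.map(_.value)

  def main(args: Array[String]): Unit = {
    println(consumeImmediately(9)) // List(1, 2, 3, 4, 5, 6, 7, 8, 9)
    println(bufferThenRead(9))     // List(9, 9, 9, 9, 9, 9, 9, 9, 9)
    println(copyThenBuffer(9))     // List(1, 2, 3, 4, 5, 6, 7, 8, 9)
  }
}
```

The buffered case yields the same [9,9,9,...] pattern described in the original report, and copying before buffering plays the role of the extra `map` step mentioned there.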

On Fri, Jan 30, 2015 at 11:27 AM, Sandy Ryza <>

> Hi Andrew,
> Here's a note from the doc for sequenceFile:
>     * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
>     * record, directly caching the returned RDD will create many references to the same object.
>     * If you plan to directly cache Hadoop writable objects, you should first copy them using
>     * a `map` function.
> This should probably say "directly caching *or directly shuffling*". To
> sort directly from a sequence file, the records need to be cloned first.
> -Sandy
> On Fri, Jan 30, 2015 at 11:20 AM, andrew.rowson wrote:
>> I've found a strange issue when trying to sort a lot of data in HDFS using
>> spark 1.2.0 (CDH5.3.0). My data is in sequencefiles and the key is a class
>> that derives from BytesWritable (the value is also a BytesWritable). I'm
>> using a custom KryoSerializer to serialize the underlying byte array
>> (basically write the length and the byte array).
>> My spark job looks like this:
>> spark.sequenceFile(inputPath, classOf[CustomKey], classOf[BytesWritable])
>>   .sortByKey()
>>   .map(t => t._1)
>>   .saveAsTextFile(outputPath)
>> CustomKey extends BytesWritable, adds a toString method and some other
>> helper methods that extract and convert parts of the underlying byte[].
>> This should simply output a series of textfiles which contain the sorted
>> list of keys. The problem is that under certain circumstances I get many
>> duplicate keys. The number of records output is correct, but it appears that
>> large chunks of the output are simply copies of the last record in that
>> chunk. E.g., instead of [1,2,3,4,5,6,7,8,9] I'll see [9,9,9,9,9,9,9,9,9].
>> This appears to happen only above certain input data volumes, and it appears
>> to be when shuffle spills. For a job where shuffle spill for memory and disk
>> = 0B, the data is correct. If there is any spill, I see the duplicate
>> behaviour. Oddly, the shuffle write is much smaller when there's a spill.
>> E.g., the non-spill job has 18.8 GB of input and 14.9 GB of shuffle write,
>> whereas the spill job has 24.2 GB of input and only 4.9 GB of shuffle write.
>> I'm guessing some sort of compression is happening on duplicate identical
>> values?
>> Oddly, I can fix this issue if I adjust my scala code to insert a map step
>> before the call to sortByKey():
>> .map(t => (new CustomKey(t._1),t._2))
>> This constructor is just:
>> public CustomKey(CustomKey left) { this.set(left); }
>> Why does this work? I've no idea.
>> The spark job is running in yarn-client mode with all the default
>> configuration values set. Using the external shuffle service and disabling
>> spill compression makes no difference.
>> Is this a bug?