spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "andrew.rowson" <>
Subject Duplicate key when sorting BytesWritable with Kryo?
Date Fri, 30 Jan 2015 19:20:46 GMT
I've found a strange issue when trying to sort a lot of data in HDFS using
spark 1.2.0 (CDH5.3.0). My data is in sequencefiles and the key is a class
that derives from BytesWritable (the value is also a BytesWritable). I'm
using a custom KryoSerializer to serialize the underlying byte array
(basically write the length and the byte array).

My spark job looks like this:

spark.sequenceFile(inputPath, classOf[CustomKey],
classOf[BytesWritable]).sortByKey().map(t =>

CustomKey extends BytesWritable, adds a toString method and some other
helper methods that extract and convert parts of the underlying byte[].

This should simply output a series of textfiles which contain the sorted
list of keys. The problem is that under certain circumstances I get many
duplicate keys. The number of records output is correct, but it appears that
large chunks of the output are simply copies of the last record in that
chunk. E.g instead of [1,2,3,4,5,6,7,8,9] I'll see [9,9,9,9,9,9,9,9,9]. 

This appears to happen only above certain input data volumes, and it appears
to be when shuffle spills. For a job where shuffle spill for memory and disk
= 0B, the data is correct. If there is any spill, I see the duplicate
behaviour. Oddly, the shuffle write is much smaller when there's a spill.
E.g. the non spill job has 18.8 GB of input and 14.9GB of shuffle write,
whereas the spill job has 24.2 GB of input, and only 4.9GB of shuffle write.
I'm guessing some sort of compression is happening on duplicate identical

Oddly, I can fix this issue if I adjust my scala code to insert a map step
before the call to sortByKey():

.map(t => (new CustomKey(t._1),t._2))

This constructor is just:

public CustomKey(CustomKey left) { this.set(left); }

Why does this work? I've no idea.

The spark job is running in yarn-client mode with all the default
configuration values set. Using the external shuffle service and disabling
spill compression makes no difference.

Is this a bug?

View this message in context:
Sent from the Apache Spark User List mailing list archive at

To unsubscribe, e-mail:
For additional commands, e-mail:

View raw message