spark-user mailing list archives

From Holden Karau <hol...@pigscanfly.ca>
Subject Re: Recommended way to serialize Hadoop Writables' in Spark
Date Sun, 03 Dec 2017 22:41:49 GMT
So is there a reason you want to shuffle Hadoop types rather than the Java
types?
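If it's just the key/value payloads you need, unwrapping to plain types up
front tends to behave much better under shuffle. A minimal sketch of the idea,
using a local Seq in place of an RDD so it stays self-contained (in Spark this
would be something like rdd.map { case (k, v) => (k.get(), v.copyBytes()) }
followed by reduceByKey; the concatenating combine function here is purely
illustrative):

```scala
// Plain (Long, Array[Byte]) pairs standing in for the unwrapped
// (LongWritable, BytesWritable) records; a local Seq stands in for the RDD.
val records: Seq[(Long, Array[Byte])] = Seq(
  (1L, Array[Byte](1, 2)),
  (1L, Array[Byte](3)),
  (2L, Array[Byte](4))
)

// Local analogue of reduceByKey(_ ++ _): group by key, combine payloads.
val reduced: Map[Long, Array[Byte]] =
  records.groupMapReduce(_._1)(_._2)(_ ++ _)
```

Shuffling primitives and byte arrays avoids any per-record Writable
serialization question entirely.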

As for your specific question: with Kryo you also need to register your
serializers. Did you do that?
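One more thing worth checking, independent of registration: BytesWritable
keeps a padded backing array whose capacity can be well beyond the valid data
(the buffer grows ahead of the logical size), and getBytes returns that whole
buffer, not just the valid bytes. A serializer that writes bytes.size bytes
therefore ships the padding too, which would inflate shuffle writes exactly
the way you describe; it should write obj.getLength() bytes instead. A
stdlib-only sketch of the length-prefix contract (plain arrays stand in for
BytesWritable so this doesn't need Hadoop on the classpath):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object LengthPrefixDemo {
  // Write only the valid portion of a (possibly padded) backing array:
  // a 4-byte length prefix followed by exactly `validLength` payload bytes.
  def serialize(backing: Array[Byte], validLength: Int): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    out.writeInt(validLength)          // analogous to obj.getLength()
    out.write(backing, 0, validLength) // never backing.length: that includes padding
    out.flush()
    bos.toByteArray
  }

  // Read the length prefix, then exactly that many payload bytes.
  def deserialize(data: Array[Byte]): Array[Byte] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val payload = new Array[Byte](in.readInt())
    in.readFully(payload)
    payload
  }
}
```

With 3 valid bytes in a 10-byte backing array, serialize emits 7 bytes (4-byte
prefix plus 3 payload) rather than 14; multiplied across every record of an
800MB input, that difference compounds quickly.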

On Sun, Dec 3, 2017 at 10:02 AM pradeepbaji <pradeepbaji@gmail.com> wrote:

> Hi,
>
> Is there any recommended way of serializing Hadoop Writables in Spark?
> Here is my problem.
>
> Question 1:
> I have a pair RDD created by reading a SequenceFile[LongWritable,
> BytesWritable]:
> RDD[(LongWritable, BytesWritable)]
>
> I have set these two settings in my Spark conf:
> spark.serializer=org.apache.spark.serializer.KryoSerializer
> spark.kryo.registrator=MyCustomRegistrator
>
> Inside the MyCustomRegistrator, I registered both LongWritable and
> BytesWritable classes.
> kryo.register(classOf[LongWritable])
> kryo.register(classOf[BytesWritable])
>
> The total size of the SequenceFile[LongWritable, BytesWritable] that I read
> to create the RDD[(LongWritable, BytesWritable)] is *800MB*. I have 10
> executors in my job with 10GB of memory each. When I perform a reduceByKey
> on this RDD, I see huge shuffle writes of 10GB on each executor, which
> doesn't make any sense. The reduceByKey stage is also very slow, and
> executors sometimes throw OOM exceptions.
>
> Can someone explain this shuffle behavior in Spark? Why does Spark show
> 100GB of shuffle writes for 800MB of input data?
> Also, when I convert RDD[(LongWritable, BytesWritable)] to RDD[(Long,
> CustomObject)], the reduceByKey operation is fast and finishes in only
> 30 seconds.
>
> Question 2:
> For the same job, I then wrote custom Kryo serializers for LongWritable
> and BytesWritable. Here is the code.
>
>
> import com.esotericsoftware.kryo.{Kryo, Serializer}
> import com.esotericsoftware.kryo.io.{Input, Output}
> import org.apache.hadoop.io.{BytesWritable, LongWritable}
> /**
>   * Kryo Custom Serializer for serializing LongWritable
>   */
> class LongWritableSerializer extends Serializer[LongWritable] {
>   override def write(kryo: Kryo, output: Output, obj: LongWritable): Unit = {
>     output.writeLong(obj.get())
>   }
>   override def read(kryo: Kryo,
>                     input: Input,
>                     clazz: Class[LongWritable]): LongWritable = {
>     val longVal = input.readLong()
>     new LongWritable(longVal)
>   }
> }
> /**
>   * Kryo Custom Serializer for serializing BytesWritable
>   */
> class BytesWritableSerializer extends Serializer[BytesWritable] {
>   override def write(kryo: Kryo, output: Output, obj: BytesWritable): Unit = {
>     val bytes = obj.getBytes
>     output.writeInt(bytes.size)
>     output.writeBytes(bytes)
>   }
>   override def read(kryo: Kryo,
>                     input: Input,
>                     clazz: Class[BytesWritable]): BytesWritable = {
>     val length = input.readInt()
>     val bytes = input.readBytes(length)
>     new BytesWritable(bytes)
>   }
> }
>
>
>
> I then registered these with Kryo inside MyCustomRegistrator:
> kryo.register(classOf[LongWritable], new LongWritableSerializer())
> kryo.register(classOf[BytesWritable], new BytesWritableSerializer())
>
> I still see the same behavior. Can someone also check this?
>
>
> Thanks,
> Pradeep
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
> --
Twitter: https://twitter.com/holdenkarau
