spark-user mailing list archives

From Rahul Palamuttam <rahulpala...@gmail.com>
Subject mutable.LinkedHashMap kryo serialization issues
Date Mon, 22 Aug 2016 15:12:03 GMT
Hi,

Just sending this again to see if others have had this issue.

I recently switched to Kryo serialization and have been running into errors
with the mutable.LinkedHashMap class.

If I don't register the mutable.LinkedHashMap class, I get the ArrayStoreException shown
below.
If I do register the class, the LinkedHashMap contains no elements once it is collected
on the driver.

Here is the snippet of code I used:
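import scala.collection.mutable
import org.apache.spark.{SparkConf, SparkContext}

// Local context using Kryo, with mutable.LinkedHashMap registered.
val sc = new SparkContext(new SparkConf()
  .setMaster("local[*]")
  .setAppName("Sample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[mutable.LinkedHashMap[String, String]])))

// Each element becomes a LinkedHashMap holding two entries.
val collect = sc.parallelize(0 to 10)
  .map(p => new mutable.LinkedHashMap[String, String]() ++= Array(("hello", "bonjour"), ("good", "bueno")))

// Size computed inside the map, before the maps themselves are sent to the driver.
val mapSideSizes = collect.map(p => p.size).collect()(0)
// Size of the first map after it has been collected on the driver.
val driverSideSizes = collect.collect()(0).size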

println("The sizes before collect : " + mapSideSizes)
println("The sizes after collect : " + driverSideSizes)

** The following only occurs if I do not register the mutable.LinkedHashMap class **
16/08/20 18:10:38 ERROR TaskResultGetter: Exception while getting task result
java.lang.ArrayStoreException: scala.collection.mutable.HashMap
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
	at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
	at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

I hope this is a known issue and/or I'm missing something important in my setup.
Appreciate any help or advice!

Best,

Rahul Palamuttam