spark-user mailing list archives

From Padmanabhan, Mahesh (contractor) <mahesh.padmanab...@twc-contractor.com>
Subject Custom receiver runtime Kryo exception
Date Mon, 05 Jan 2015 17:57:44 GMT
Hello all,

I am using Spark 1.0.2 and I have a custom receiver that works well.

I tried adding Kryo serialization to SparkConf:

val spark = new SparkConf()
    …..
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

With that setting, I get a strange error that I am not sure how to solve:

Exception in thread "Thread-37" org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on host
localhost: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException:
ConfigObject is immutable, you can't call Map.put
Serialization trace:
object (com.typesafe.config.impl.SimpleConfig)
atomFeedConf (com.twc.needle.cp.AtomFeedReceiver)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
        com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
        com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
        com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
        com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

        ……

Here is part of my custom receiver:

class AtomFeedReceiver
    extends Receiver[String](StorageLevel.MEMORY_ONLY_SER) {

  private val conf = ConfigFactory.load
  private val atomFeedConf = conf.getConfig("cp.spark.atomfeed")

  private val atomFeedUrl = atomFeedConf.getString("url")
  private val urlConnTimeout = atomFeedConf.getInt("url_conn_timeout")
  private val urlReadTimeout = atomFeedConf.getInt("url_read_timeout")
  private val username = atomFeedConf.getString("username")
  private val password = atomFeedConf.getString("password")
  private val keepFeedCriteria = atomFeedConf.getString("keep_feed_criteria")
  private val feedTrackerDir = atomFeedConf.getString("feed_tracker_dir")
  private val feedTrackerFileName = atomFeedConf.getString("feed_tracker_file_name")
  private val enableSampling = atomFeedConf.getBoolean("enable_sampling")

  …..


Here is how I am calling the receiver in the main method:

val logLineStreamRaw = ssc.receiverStream(new AtomFeedReceiver)

Any idea why Spark needs the Config object to be mutable only when Kryo serialization is enabled?

Thanks.
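
A possible reading of the serialization trace, offered as a hypothesis and not a confirmed diagnosis: Kryo's default FieldSerializer descends into the SimpleConfig held in atomFeedConf, and the ConfigObject inside it implements java.util.Map, so Kryo appears to rebuild it by calling put on an immutable map. Java serialization would not take that path, which may be why the receiver works with the default serializer. The sketch below shows one way to keep the Config out of the serialized receiver by marking it @transient (Kryo's FieldSerializer skips transient fields by default) while still reading the plain values on the driver. Only two of the settings are shown, the empty onStart/onStop bodies are placeholders for the elided receiver logic, and the whole thing is an untested assumption against Spark 1.0.2.

```scala
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class AtomFeedReceiver
    extends Receiver[String](StorageLevel.MEMORY_ONLY_SER) {

  // @transient keeps the Config object out of the serialized receiver.
  // lazy means it is reloaded on first access if it is ever needed
  // again after deserialization (assumes the config is on that classpath).
  @transient private lazy val atomFeedConf: Config =
    ConfigFactory.load().getConfig("cp.spark.atomfeed")

  // Plain String/Int fields are evaluated on the driver at construction
  // time and serialize without involving the Config classes.
  private val atomFeedUrl: String = atomFeedConf.getString("url")
  private val urlConnTimeout: Int = atomFeedConf.getInt("url_conn_timeout")

  // Placeholder bodies: the actual feed-reading logic is not part of
  // this sketch; only the field layout matters here.
  def onStart(): Unit = ()
  def onStop(): Unit = ()
}
```

The call site stays the same: ssc.receiverStream(new AtomFeedReceiver). An alternative with the same effect would be to load the Config inside onStart, so that it never becomes a field of the receiver at all.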


