spark-user mailing list archives

From Paolo Patierno
Subject Job aborted due to not serializable exception
Date Wed, 29 Jun 2016 08:19:04 GMT
I'm following the socketStream[T] function implementation from the official Spark GitHub repo:

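def socketStream[T](
    hostname: String,
    port: Int,
    converter: JFunction[InputStream, java.lang.Iterable[T]],
    storageLevel: StorageLevel)
  : JavaReceiverInputDStream[T] = {
  // Wrap the Java converter in a Scala closure that returns a Scala Iterator
  def fn: (InputStream) => Iterator[T] = (x: InputStream) => converter.call(x).iterator().asScala
  // Java callers cannot supply a ClassTag, so one is faked from AnyRef
  implicit val cmt: ClassTag[T] =
    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
  ssc.socketStream(hostname, port, fn, storageLevel)
}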
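The adaptation step that fn performs in socketStream can be tried outside Spark. The sketch below wraps a Java-style converter in a Scala closure and turns the java.lang.Iterable it returns into a Scala Iterator. JavaFunction, SocketStreamAdapter and LineConverter are hypothetical names (JavaFunction only stands in for Spark's Java Function interface), and the code assumes Scala 2.13 or 3; on 2.12 the converters import is scala.collection.JavaConverters._ instead.

```scala
import java.io.InputStream
import scala.io.Source
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for a Java-friendly function interface such as
// Spark's Java Function: it is Serializable and exposes call().
trait JavaFunction[A, B] extends java.io.Serializable {
  def call(a: A): B
}

object SocketStreamAdapter {
  // Same shape as fn inside socketStream: wrap the Java converter in a Scala
  // closure and convert the java.lang.Iterable it returns to a Scala Iterator.
  def toScalaConverter[T](
      converter: JavaFunction[InputStream, java.lang.Iterable[T]]
  ): InputStream => Iterator[T] =
    (x: InputStream) => converter.call(x).iterator().asScala
}

// Hypothetical converter: one record per line of UTF-8 text.
object LineConverter extends JavaFunction[InputStream, java.lang.Iterable[String]] {
  def call(in: InputStream): java.lang.Iterable[String] =
    Source.fromInputStream(in, "UTF-8").getLines().toList.asJava
}
```

For instance, applying SocketStreamAdapter.toScalaConverter(LineConverter) to a stream containing "a\nb\nc" yields an iterator over "a", "b" and "c".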

I'm implementing a custom receiver that works great when used from Scala.
Now I'm trying to use it from Java, and the createStream method in MyReceiverUtils.scala is the following:

def createStream[T](
    jssc: JavaStreamingContext,
    host: String,
    port: Int,
    address: String,
    messageConverter: Function[Message, Option[T]],
    storageLevel: StorageLevel
  ): JavaReceiverInputDStream[T] = {

  // Wrap the Java converter in a Scala closure, as socketStream does
  def fn: (Message) => Option[T] = (x: Message) => messageConverter.call(x)
  implicit val cmt: ClassTag[T] =
    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
  new MyInputDStream(jssc.ssc, host, port, address, fn, storageLevel)
}
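The implicit cmt line follows the pattern used by Spark's Java wrappers, as far as I know: a Java caller cannot provide a ClassTag[T], so one is manufactured by casting the ClassTag of AnyRef. The sketch below (JavaApiClassTags and its methods are hypothetical names) shows the consequence: code that receives the faked tag sees java.lang.Object as the runtime class rather than the real element type.

```scala
import scala.reflect.ClassTag

object JavaApiClassTags {
  // Java callers cannot supply an implicit ClassTag[T], so the tag for AnyRef
  // is cast to ClassTag[T]; the cast is unchecked and always succeeds.
  def fakeClassTag[T]: ClassTag[T] =
    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]

  // Stand-in for an API that needs a ClassTag (as DStream constructors do):
  // it reports the runtime class carried by the tag it receives.
  def runtimeClassName[T](implicit ct: ClassTag[T]): String =
    ct.runtimeClass.getName
}
```

With the compiler-provided tag, runtimeClassName[String] reports java.lang.String; with fakeClassTag[String] passed explicitly it reports java.lang.Object.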

When I try to use it, I receive:

org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task
465, not attempting to retry it. Exception during serialization:

If I replace the fn definition with something simpler, for example (x: Message) => None,
the error goes away.

Why is the call to messageConverter causing this problem?
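The exception text above is truncated, so the offending object is not visible. One common cause that fits the symptom, offered as an assumption rather than a diagnosis, is that fn captures messageConverter, and the converter object (for example an anonymous inner class written in Java) holds a hidden reference to a non-serializable enclosing instance, while (x: Message) => None captures nothing. The sketch below reproduces that situation with plain Java serialization. Message, JavaConverter, NonSerializableOwner, PrefixConverter and ClosureSerialization are all hypothetical, and the code assumes Scala 2.12 or later, where function literals are serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical message type and Serializable Java-style function interface.
case class Message(body: String)

trait JavaConverter[A, B] extends java.io.Serializable {
  def call(a: A): B
}

// Non-serializable owner whose anonymous converter reads one of its members,
// so the converter keeps a hidden reference to the enclosing instance.
class NonSerializableOwner {
  val prefix = "msg: "
  val converter: JavaConverter[Message, Option[String]] =
    new JavaConverter[Message, Option[String]] {
      def call(m: Message): Option[String] = Some(prefix + m.body)
    }
}

// Standalone converter: no enclosing instance is captured.
class PrefixConverter(prefix: String) extends JavaConverter[Message, Option[String]] {
  def call(m: Message): Option[String] = Some(prefix + m.body)
}

object ClosureSerialization {
  // Same shape as fn in createStream: the closure captures the converter.
  def adapt(conv: JavaConverter[Message, Option[String]]): Message => Option[String] =
    (x: Message) => conv.call(x)

  // The simplified fn from the message: it captures nothing.
  val alwaysNone: Message => Option[String] = (_: Message) => None

  // Try plain Java serialization of the object graph reachable from obj.
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

If this is the cause, defining the converter as a standalone class (on the Java side, a top-level or static nested class instead of an anonymous inner class) should avoid the hidden outer reference.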

