spark-user mailing list archives

From Paolo Patierno <ppatie...@live.com>
Subject Job aborted due to not serializable exception
Date Wed, 29 Jun 2016 08:19:04 GMT
Hi,

I am following the socketStream[T] implementation from the official Spark GitHub repo:

def socketStream[T](
    hostname: String,
    port: Int,
    converter: JFunction[InputStream, java.lang.Iterable[T]],
    storageLevel: StorageLevel)
  : JavaReceiverInputDStream[T] = {
  def fn: (InputStream) => Iterator[T] = (x: InputStream) => converter.call(x).iterator().asScala
  implicit val cmt: ClassTag[T] =
    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
  ssc.socketStream(hostname, port, fn, storageLevel)
}

I'm implementing a custom receiver that works great when used from Scala.
I'm now trying to use it from Java, and the createStream in MyReceiverUtils.scala is the following:

def createStream[T](
      jssc: JavaStreamingContext,
      host: String,
      port: Int,
      address: String,
      messageConverter: Function[Message, Option[T]],
      storageLevel: StorageLevel
    ): JavaReceiverInputDStream[T] = {


    def fn: (Message) => Option[T] = (x: Message) => messageConverter.call(x)
    implicit val cmt: ClassTag[T] =
      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
    new MyInputDStream(jssc.ssc, host, port, address, fn, storageLevel)
  }

When I try to use it, I receive:

org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task
465, not attempting to retry it. Exception during serialization: java.io.NotSerializableException:
org.apache.spark.streaming.amqp.JavaMyReceiverStreamSuite

If I replace the fn definition with something simpler, for example (x: Message) => None,
the error goes away.
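
My guess, which I have not confirmed, is that the converter I pass from the Java test is an anonymous inner class that uses the enclosing suite instance, so serializing the converter tries to serialize the suite too. The sketch below tries to reproduce that outside Spark with plain Java serialization. All names in it (ClosureCaptureSketch, Converter, Suite, StandaloneConverter, isSerializable) are hypothetical stand-ins and not Spark or receiver APIs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class ClosureCaptureSketch {

    // Hypothetical stand-in for a serializable Java function interface.
    interface Converter<T, R> extends Serializable {
        R call(T input) throws Exception;
    }

    // Hypothetical stand-in for the test suite; deliberately NOT Serializable.
    static class Suite {
        private final String suffix = "!";

        Converter<String, String> anonymousConverter() {
            // The anonymous class reads a field of the enclosing Suite,
            // so it keeps a hidden reference to that Suite instance.
            return new Converter<String, String>() {
                @Override
                public String call(String input) {
                    return input + suffix;
                }
            };
        }
    }

    // A static nested class holds no reference to any outer instance.
    static class StandaloneConverter implements Converter<String, String> {
        @Override
        public String call(String input) {
            return input + "!";
        }
    }

    // Returns false when Java serialization rejects the object graph.
    static boolean isSerializable(Object o) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Converter tied to the non-serializable Suite instance.
        System.out.println(isSerializable(new Suite().anonymousConverter()));
        // Converter with no outer reference.
        System.out.println(isSerializable(new StandaloneConverter()));
    }
}
```

If this is what happens in my case, it would match the fact that (x: Message) => None works, since that function never touches messageConverter.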

Why is the call on messageConverter producing this problem?

Thanks,
Paolo
