spark-issues mailing list archives

From "Hari Shreedharan (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-1785) Streaming requires receivers to be serializable
Date Sat, 10 May 2014 22:00:59 GMT
Hari Shreedharan created SPARK-1785:
---------------------------------------

             Summary: Streaming requires receivers to be serializable
                 Key: SPARK-1785
                 URL: https://issues.apache.org/jira/browse/SPARK-1785
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 0.9.0
            Reporter: Hari Shreedharan


When the ReceiverTracker starts the receivers, it creates a temporary RDD to send the receivers
over to the workers. They are then started on the workers using the startReceivers method.
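
For context, here is a minimal sketch of that distribution step (simplified, with approximate names; not the actual ReceiverTracker code). The key point is that the receiver instances themselves are the RDD elements, so Spark must serialize each one to ship it to a worker:
{code}
import org.apache.spark.SparkContext
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical helper mirroring the step described above: the receivers are
// wrapped in a temporary RDD, so scheduling the job forces Spark to
// serialize every Receiver instance before shipping it to a worker.
def startReceiversSketch(sc: SparkContext, receivers: Seq[Receiver[_]]): Unit = {
  val tempRDD = sc.makeRDD(receivers, receivers.size)
  val startReceiver = (iter: Iterator[Receiver[_]]) =>
    // Runs on the worker; the real code does more bookkeeping around this.
    iter.foreach(_.onStart())
  // runJob serializes the partitions of tempRDD, i.e. the receivers themselves.
  sc.runJob(tempRDD, startReceiver)
}
{code}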

It looks like this means the receivers really have to be serializable. In the case of the Flume
receiver, the Avro IPC components are not serializable, causing an error that looks like this:
{code}
Exception in thread "Thread-46" org.apache.spark.SparkException: Job aborted due to stage
failure: Task not serializable: java.io.NotSerializableException: org.apache.avro.ipc.specific.SpecificResponder
	- field (class "org.apache.spark.streaming.flume.FlumeReceiver", name: "responder", type:
"class org.apache.avro.ipc.specific.SpecificResponder")
	- object (class "org.apache.spark.streaming.flume.FlumeReceiver", org.apache.spark.streaming.flume.FlumeReceiver@5e6bbb36)
	- element of array (index: 0)
	- array (class "[Lorg.apache.spark.streaming.receiver.Receiver;", size: 1)
	- field (class "scala.collection.mutable.WrappedArray$ofRef", name: "array", type: "class
[Ljava.lang.Object;")
	- object (class "scala.collection.mutable.WrappedArray$ofRef", WrappedArray(org.apache.spark.streaming.flume.FlumeReceiver@5e6bbb36))
	- field (class "org.apache.spark.rdd.ParallelCollectionPartition", name: "values", type:
"interface scala.collection.Seq")
	- custom writeObject data (class "org.apache.spark.rdd.ParallelCollectionPartition")
	- object (class "org.apache.spark.rdd.ParallelCollectionPartition", org.apache.spark.rdd.ParallelCollectionPartition@691)
	- writeExternal data
	- root object (class "org.apache.spark.scheduler.ResultTask", ResultTask(0, 0))
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
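
To make the failure concrete, here is a hypothetical receiver with the same shape as FlumeReceiver (the field name comes from the trace above; the constructor and body are illustrative). The non-transient SpecificResponder field is what the serializer chokes on:
{code}
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative stand-in for FlumeReceiver: SpecificResponder does not
// implement java.io.Serializable, so serializing this receiver throws
// the NotSerializableException shown in the trace above.
class FlumeLikeReceiver(host: String, port: Int)
  extends Receiver[AnyRef](StorageLevel.MEMORY_AND_DISK) {

  var responder: SpecificResponder = null  // non-serializable field

  def onStart(): Unit = {
    // Would set up the responder and start the Avro server here.
  }

  def onStop(): Unit = {
    // Would shut the server down here.
  }
}
{code}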

A way out of this is to send just the class name (or the .class itself) to the workers in the
tempRDD and have the workers instantiate and start the receiver, as sketched below.
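
A rough sketch of that idea (a hypothetical helper, not the actual fix; a complete version would also need to ship constructor arguments or a serializable factory, since receivers like FlumeReceiver are built from host/port parameters):
{code}
import org.apache.spark.SparkContext
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical variant: ship only the class names (plain Strings, trivially
// serializable) and let each worker instantiate its receiver by reflection,
// so the Receiver instance itself is never serialized.
def startReceiversByName(sc: SparkContext, receiverClasses: Seq[String]): Unit = {
  val tempRDD = sc.makeRDD(receiverClasses, receiverClasses.size)
  sc.runJob(tempRDD, (iter: Iterator[String]) =>
    iter.foreach { className =>
      val receiver = Class.forName(className)
        .newInstance()
        .asInstanceOf[Receiver[_]]
      receiver.onStart()  // created and started on the worker
    }
  )
}
{code}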

My analysis may be wrong, but if it makes sense, I will submit a PR to fix this.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
