spark-issues mailing list archives

From "Ravindra Pesala (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-2408) RDD.map(func) dependencies issue after checkpoint & count
Date Tue, 05 Aug 2014 11:59:11 GMT

    [ https://issues.apache.org/jira/browse/SPARK-2408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14086150#comment-14086150 ]

Ravindra Pesala commented on SPARK-2408:
----------------------------------------

Yes, Daniel is right: this is a problem in the Spark shell. I went through the code of the Spark
shell and ClosureCleaner. I don't think it is an issue in ClosureCleaner, since it keeps only the
outer object. However, the Spark shell generates the class for the last line, str_rdd.map(test).count,
in the manner shown below. For each line entered into the Spark shell, it creates one class and
imports the names defined by previous lines into it.

class $read extends Serializable {
  class $iwC extends Serializable {
    val $VAL3 = $line3.$read.INSTANCE;
    import $VAL3.$iw.$iw.`sc`;
    class $iwC extends Serializable {
      import org.apache.spark.SparkContext._
      class $iwC extends Serializable {
        val $VAL5 = $line5.$read.INSTANCE;
        import $VAL5.$iw.$iw.$iw.$iw.`r`;
        val $VAL9 = $line9.$read.INSTANCE;
        import $VAL9.$iw.$iw.$iw.$iw.`str_arr`;
        val $VAL10 = $line10.$read.INSTANCE;
        import $VAL10.$iw.$iw.$iw.$iw.`str_rdd`;
        val $VAL11 = $line11.$read.INSTANCE;
        import $VAL11.$iw.$iw.$iw.$iw.`test`;
        class $iwC extends Serializable {
          val res0 =
            str_rdd.map(test).count
        }
        val $iw = new $iwC;
      }
      val $iw = new $iwC;
    }
    val $iw = new $iwC;
  }
  val $iw = new $iwC;
}
object $read {
  val INSTANCE = new $read();
}
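The capture effect can be reproduced without Spark or the shell. The following is a minimal sketch in plain Scala, assuming Java serialization (which Spark uses for closures by default) and hypothetical names (NonSerializableRng, ReplWrapper, ClosureCaptureDemo) that are not part of Spark. ReplWrapper plays the role of one $iwC that holds r. Eta-expanding its method test, as str_rdd.map(test) does, yields a function that captures the whole wrapper, so r is serialized along with it even though test never uses r.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for scala.util.Random, which was not Serializable in the
// Scala version used in the report. A dedicated class keeps this demo from
// depending on whether a given Scala version's Random is serializable.
class NonSerializableRng {
  def nextInt(): Int = 42
}

// Models one REPL wrapper ($iwC): it holds `r` even though `test` never uses it.
class ReplWrapper extends Serializable {
  val r = new NonSerializableRng
  def test(a: String): String = a
}

object ClosureCaptureDemo {
  // Runs plain Java serialization and reports whether it succeeded.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Like str_rdd.map(test): eta-expanding the method yields a function that
  // must call w.test, so it captures the whole wrapper, `r` included.
  def closureOverMethod(w: ReplWrapper): String => String = w.test

  // A function literal that uses nothing from the wrapper captures nothing.
  val closureLiteral: String => String = (a: String) => a
}
```

Serializing ClosureCaptureDemo.closureOverMethod(new ReplWrapper) fails with NotSerializableException, while ClosureCaptureDemo.closureLiteral serializes fine, which mirrors the "Task not serializable" failure in the report.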

Here the outer object contains all previous imports, wrapped in classes. Because the import of r
(defined by val r = new scala.util.Random()), together with the $VAL5 field that backs it, sits in
one of the outer classes, serializing the closure also tries to serialize that Random instance, and
it fails. I think we should either manage the imports of previous requests in the generated code
somehow, or handle this scenario in ClosureCleaner. Please comment.
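One way to read "manage the imports of previous requests" is to keep values that a closure drags in, but never uses, out of the serialized object graph. The sketch below models that idea with plain Java serialization and hypothetical names (Rng, TransientWrapper, TransientDemo); it is an illustration under those assumptions, not how the Spark shell or ClosureCleaner actually resolves this issue. Marking the unused field @transient lets the same eta-expanded closure serialize.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical non-serializable value, standing in for `r` in the report.
class Rng {
  def nextInt(): Int = 7
}

// REPL-style wrapper whose unused value is marked @transient: Java serialization
// skips the field instead of failing on it. On a deserialized copy the field
// would simply be null.
class TransientWrapper extends Serializable {
  @transient val r = new Rng
  def test(a: String): String = a
}

object TransientDemo {
  // Runs plain Java serialization and reports whether it succeeded.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Eta-expansion still captures the whole wrapper, but `r` is no longer written.
  def closureOverMethod(w: TransientWrapper): String => String = w.test
}
```

The trade-off is that any code that does use the transient value on the receiving side would see null, so this only suits values the closure never touches.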

> RDD.map(func) dependencies issue after checkpoint & count
> ---------------------------------------------------------
>
>                 Key: SPARK-2408
>                 URL: https://issues.apache.org/jira/browse/SPARK-2408
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 0.9.1, 1.0.0
>            Reporter: Daniel Fry
>
> I am noticing strange behavior with a simple example use of rdd.checkpoint().
> You can paste the following code into any spark-shell (e.g. with MASTER=local[*]):
> // build an array of 100 random lowercase strings of length 10
> val r = new scala.util.Random()
> val str_arr = (1 to 100).map(a => (1 to 10).map(b => new Character(((Math.abs(r.nextInt) % 26) + 97).toChar)).mkString(""))
> // make this into an rdd
> val str_rdd = sc.parallelize(str_arr)
> // checkpoint & count
> sc.setCheckpointDir("hdfs://[namenode]:54310/path/to/some/spark_checkpoint_dir")
> str_rdd.checkpoint()
> str_rdd.count
> // rdd.map some dummy function
> def test(a : String) : String = { return a }
> str_rdd.map(test).count
> This results in a surprising exception:
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.util.Random
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
>         at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
This message was sent by Atlassian JIRA
(v6.2#6252)


