spark-user mailing list archives

From Akhil Das <ak...@sigmoidanalytics.com>
Subject Re: Accumulators : Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
Date Sun, 26 Oct 2014 15:01:40 GMT
I just tried the code below and it works for me. I'm not sure why the
SparkContext is being sent inside the mapPartitions function in your case.
Can you try a simple map() instead of mapPartitions()?

  val ac = sc.accumulator(0)
  val or = sc.parallelize(1 to 10000)
  val ps = or.map(x => (x, x + 2)).map(x => ac += 1)
  val test = ps.collect()
  println(ac.value)
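
One common cause of this exception (an assumption here, since the code
surrounding your snippet isn't shown) is that the closure refers to a field
of an enclosing object, so the whole object, including any SparkContext it
holds, gets pulled into the serialized task. The sketch below reproduces
that mechanism with plain Java serialization and no Spark at all.
FakeContext, Driver and ClosureCheck are hypothetical names made up for the
illustration; FakeContext only stands in for a non-serializable resource
such as SparkContext.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable resource such as SparkContext.
class FakeContext

// Hypothetical enclosing class: serializable itself, but it holds a
// non-serializable field next to the value the closures want to use.
class Driver extends Serializable {
  val ctx = new FakeContext
  val counter: Int = 1

  // `counter` here means `this.counter`, so the closure captures `this`
  // and, through it, the non-serializable `ctx` field.
  def capturesThis: Int => Int = x => x + counter

  // Copying the field into a local val first means the closure captures
  // only that Int, not the enclosing object.
  def capturesLocal: Int => Int = {
    val local = counter
    x => x + local
  }
}

object ClosureCheck {
  // Tries to Java-serialize the given object and reports whether it worked.
  def isSerializable(f: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val d = new Driver
    println(s"closure using the field directly: ${isSerializable(d.capturesThis)}")
    println(s"closure using a local copy:       ${isSerializable(d.capturesLocal)}")
  }
}
```

If your accumulator is a member of a class or object that also holds the
SparkContext, copying it into a local val right before the transformation
may be worth trying for the same reason.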




Thanks
Best Regards

On Sun, Oct 26, 2014 at 1:44 AM, octavian.ganea <octavian.ganea@inf.ethz.ch>
wrote:

> Hi all,
>
> I tried to use accumulators without any success so far.
>
> My code is simple:
>
>   val sc = new SparkContext(conf)
>   val accum = sc.accumulator(0)
>   val partialStats = sc.textFile(f.getAbsolutePath())
>                 .map(line => { val key = line.split("\t").head; (key, line) })
>                 .groupByKey(128)
>                 .mapPartitions{iter => { accum += 1; foo(iter)}}
>                 .reduce(_ + _)
>   println(accum.value)
>
> Now, if I remove the 'accum += 1', everything works fine. If I keep it, I
> get this weird error:
>
> Exception in thread "main" 14/10/25 21:58:56 INFO TaskSchedulerImpl: Cancelling stage 0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:890)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply(DAGScheduler.scala:887)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply(DAGScheduler.scala:887)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:887)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:886)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:886)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1204)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Can someone please help!
>
> Thank you!
