spark-issues mailing list archives

From "Saisai Shao (JIRA)" <>
Subject [jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
Date Tue, 27 Jan 2015 02:54:41 GMT


Saisai Shao commented on SPARK-5206:

IMHO this is a general problem in Spark Streaming: any variable that has to be registered
on both the driver and executor side will lead to an error when recovering from a failure
if its readObject implementation does not re-register it on the driver.
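
To illustrate the pattern being described, here is a minimal, self-contained Scala sketch; the Registry object, the TrackedVariable class, and all names below are hypothetical stand-ins for illustration, not Spark's actual internals:

import java.io.ObjectInputStream
import scala.collection.mutable

// Hypothetical driver-side registry, standing in for a singleton like Accumulators.
object Registry {
  val originals = mutable.HashMap[Long, TrackedVariable]()
  def register(v: TrackedVariable): Unit = originals(v.id) = v
}

class TrackedVariable(val id: Long) extends Serializable {
  Registry.register(this) // registered once, at construction time on the driver

  // If readObject does not re-register the variable, a driver recovered from a
  // checkpoint deserializes it but never puts it back into the registry, so a
  // later lookup by id fails with NoSuchElementException. (A real fix would
  // also need to re-register only on the driver, not on executors.)
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    Registry.register(this)
  }
}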

Objects like broadcast variables will also hit exceptions when recovering from a checkpoint,
since the actual data is lost on the executor side, and recovery from the driver side is not
possible, if I understand correctly.

> Accumulators are not re-registered during recovering from checkpoint
> --------------------------------------------------------------------
>                 Key: SPARK-5206
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: vincent ye
> I got the following exception while my streaming application restarted from a crash:
> 15/01/12 10:31:06 ERROR scheduler.DAGScheduler: Failed to update accumulators for ShuffleMapTask(41, 4)
> java.util.NoSuchElementException: key not found: 1
> 	at scala.collection.MapLike$class.default(MapLike.scala:228)
> 	at scala.collection.AbstractMap.default(Map.scala:58)
> 	at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:939)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:938)
> 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> 	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> 	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 	at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1388)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> I guess that an Accumulator is registered to the singleton Accumulators at Line 58 of org.apache.spark.Accumulable:
> Accumulators.register(this, true)
> This code needs to be executed on the driver once. But when the application is recovered
> from a checkpoint, it won't be executed on the driver. So when the driver processes a task
> completion at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938),
> it can't find the Accumulator because it was not re-registered during the recovery.
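
The failure mode in the stack trace can be reproduced in miniature: scala.collection.mutable.HashMap.apply throws NoSuchElementException for missing keys, which is exactly the "key not found: 1" above. This sketch (with a hypothetical originals map standing in for the driver-side accumulator registry) mirrors that lookup:

import scala.collection.mutable

object Repro {
  def main(args: Array[String]): Unit = {
    // Stands in for the driver-side accumulator registry, which is empty
    // after checkpoint recovery because nothing re-registered the accumulators.
    val originals = mutable.HashMap[Long, AnyRef]()

    try {
      originals(1L) // same lookup shape as in DAGScheduler.handleTaskCompletion
    } catch {
      case e: NoSuchElementException =>
        println(e.getMessage) // prints: key not found: 1
    }
  }
}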

This message was sent by Atlassian JIRA
