spark-issues mailing list archives

From "Josh Rosen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-14658) when executor lost DagScheduer may submit one stage twice even if the first running taskset for this stage is not finished
Date Thu, 16 Feb 2017 22:49:41 GMT

    [ https://issues.apache.org/jira/browse/SPARK-14658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15870816#comment-15870816
] 

Josh Rosen commented on SPARK-14658:
------------------------------------

Here are the logs from my reproduction, excerpted down to only the relevant parts (as near as
I can tell):

The first attempt of the task set is submitted:

{code}
17/02/13 20:11:59 INFO DAGScheduler: waiting: Set(ShuffleMapStage 3086, ResultStage 3087)
17/02/13 20:11:59 INFO DAGScheduler: failed: Set()
17/02/13 20:11:59 INFO DAGScheduler: Submitting ShuffleMapStage 3086 (MapPartitionsRDD[34696]
at cache at <console>:61), which has no missing parents
17/02/13 20:11:59 INFO MemoryStore: Block broadcast_2871 stored as values in memory (estimated
size 67.1 KB, free 9.1 GB)
17/02/13 20:11:59 INFO MemoryStore: Block broadcast_2871_piece0 stored as bytes in memory
(estimated size 28.1 KB, free 9.1 GB)
17/02/13 20:11:59 INFO BlockManagerInfo: Added broadcast_2871_piece0 in memory on <IP>:45333
(size: 28.1 KB, free: 10.6 GB)
17/02/13 20:11:59 INFO SparkContext: Created broadcast 2871 from broadcast at DAGScheduler.scala:996
17/02/13 20:11:59 INFO DAGScheduler: Submitting 2213 missing tasks from ShuffleMapStage 3086
(MapPartitionsRDD[34696] at cache at <console>:61)
17/02/13 20:11:59 INFO TaskSchedulerImpl: Adding task set 3086.0 with 2213 tasks
17/02/13 20:11:59 INFO FairSchedulableBuilder: Added task set TaskSet_3086.0 tasks to pool
1969095006217179029
{code}

While the task set was running, some tasks failed with fetch failures from the parent stage,
causing both the stage with the fetch failures and the parent stage to be resubmitted:

{code}
17/02/13 20:44:34 WARN TaskSetManager: Lost task 1213.0 in stage 3086.0 (TID 370751, <IP>,
executor 622): FetchFailed(null, shuffleId=638, mapId=-1, reduceId=0, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle
638
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:693)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:147)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
    at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

)
17/02/13 20:44:34 INFO TaskSetManager: Task 1213.0 in stage 3086.0 (TID 370751) failed, but
another instance of the task has already succeeded, so not re-queuing the task to be re-executed.
17/02/13 20:44:34 INFO DAGScheduler: Marking ShuffleMapStage 3086 (cache at <console>:61)
as failed due to a fetch failure from ShuffleMapStage 3085 (cache at <console>:72)
17/02/13 20:44:34 INFO DAGScheduler: ShuffleMapStage 3086 (cache at <console>:61) failed
in 1954.859 s due to org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 638
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:693)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:147)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
    at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/02/13 20:44:34 INFO DAGScheduler: Resubmitting ShuffleMapStage 3085 (cache at <console>:72)
and ShuffleMapStage 3086 (cache at <console>:61) due to fetch failure

{code}

After the missing parent stages were recomputed, a new attempt of the fetch-failed task set
was submitted:

{code}
17/02/13 20:46:40 INFO TaskSchedulerImpl: Removed TaskSet 3085.1, whose tasks have all completed,
from pool 1969095006217179029
17/02/13 20:46:40 INFO DAGScheduler: ShuffleMapStage 3085 (cache at <console>:72) finished
in 0.958 s
17/02/13 20:46:40 INFO DAGScheduler: looking for newly runnable stages
17/02/13 20:46:40 INFO DAGScheduler: running: Set()
17/02/13 20:46:40 INFO DAGScheduler: waiting: Set(ShuffleMapStage 3086, ResultStage 3087)
17/02/13 20:46:40 INFO DAGScheduler: failed: Set()
17/02/13 20:46:40 INFO DAGScheduler: Submitting ShuffleMapStage 3086 (MapPartitionsRDD[34696]
at cache at <console>:61), which has no missing parents
17/02/13 20:46:40 INFO MemoryStore: Block broadcast_2876 stored as values in memory (estimated
size 67.1 KB, free 9.1 GB)
17/02/13 20:46:40 INFO MemoryStore: Block broadcast_2876_piece0 stored as bytes in memory
(estimated size 28.2 KB, free 9.1 GB)
17/02/13 20:46:40 INFO BlockManagerInfo: Added broadcast_2876_piece0 in memory on <IP>:45333
(size: 28.2 KB, free: 10.6 GB)
17/02/13 20:46:40 INFO SparkContext: Created broadcast 2876 from broadcast at DAGScheduler.scala:996
17/02/13 20:46:40 INFO DAGScheduler: Submitting 1081 missing tasks from ShuffleMapStage 3086
(MapPartitionsRDD[34696] at cache at <console>:61)
17/02/13 20:46:40 INFO TaskSchedulerImpl: Adding task set 3086.1 with 1081 tasks
17/02/13 20:46:40 INFO FairSchedulableBuilder: Added task set TaskSet_3086.1 tasks to pool
1969095006217179029
17/02/13 20:46:40 INFO TaskSetManager: Starting task 0.0 in stage 3086.1 (TID 370883, <IP>,
executor 618, partition 7, PROCESS_LOCAL, 9477 bytes)
[...]
{code}


The new attempt made steady progress, but after some time (and an executor loss) the original
task set 3086.0 was removed once all of its running tasks had finished:

{code}
17/02/13 21:21:35 INFO BlockManagerMaster: Removal of executor 620 requested
17/02/13 21:21:35 INFO BlockManagerMasterEndpoint: Trying to remove executor 620 from BlockManagerMaster.
17/02/13 21:26:26 INFO BlockManagerInfo: Added rdd_34686_721 in memory on <IP>:43591
(size: 172.9 MB, free: 9.9 GB)
17/02/13 21:26:31 INFO TaskSetManager: Finished task 68.0 in stage 3086.1 (TID 370951) in
2375575 ms on <IP> (executor 488) (1074/1081)
17/02/13 21:29:10 INFO BlockManagerInfo: Added rdd_34686_232 in memory on <IP>:34541
(size: 267.2 MB, free: 9.8 GB)
17/02/13 21:29:16 INFO TaskSetManager: Finished task 20.0 in stage 3086.1 (TID 370903) in
2556399 ms on <IP> (executor 607) (1075/1081)
17/02/13 21:30:07 INFO BlockManagerInfo: Added rdd_34686_745 in memory on <IP>:39432
(size: 178.3 MB, free: 8.7 GB)
17/02/13 21:30:12 INFO TaskSetManager: Finished task 745.1 in stage 3086.0 (TID 370582) in
2900631 ms on <IP> (executor 509) (1211/2213)
17/02/13 21:30:39 INFO BlockManagerInfo: Added rdd_34686_745 in memory on <IP>:46234
(size: 178.3 MB, free: 9.9 GB)
17/02/13 21:30:43 INFO TaskSetManager: Finished task 73.0 in stage 3086.1 (TID 370956) in
2625162 ms on <IP> (executor 615) (1076/1081)
17/02/13 21:31:20 INFO BlockManagerInfo: Added rdd_34686_634 in memory on <IP>:43591
(size: 329.9 MB, free: 9.6 GB)
17/02/13 21:33:55 INFO BlockManagerInfo: Added rdd_34686_708 in memory on <IP>:38543
(size: 355.5 MB, free: 9.5 GB)
17/02/13 21:34:37 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20170209171214-0007/615
is now LOST (worker lost)
17/02/13 21:34:37 INFO StandaloneSchedulerBackend: Executor app-20170209171214-0007/615 removed:
worker lost
17/02/13 21:34:37 ERROR TaskSchedulerImpl: Lost executor 615 on <IP>: worker lost
17/02/13 21:34:37 INFO DAGScheduler: Executor lost: 615 (epoch 1335)
17/02/13 21:34:37 INFO BlockManagerMasterEndpoint: Trying to remove executor 615 from BlockManagerMaster.
17/02/13 21:34:37 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(615,
<IP>, 46234, None)
17/02/13 21:34:37 INFO BlockManagerMaster: Removed 615 successfully in removeExecutor
17/02/13 21:34:37 INFO DAGScheduler: Shuffle files lost for executor: 615 (epoch 1335)
17/02/13 21:34:37 INFO ShuffleMapStage: ShuffleMapStage 3082 is now unavailable on executor
615 (113/128, false)
17/02/13 21:34:37 INFO ShuffleMapStage: ShuffleMapStage 3086 is now unavailable on executor
615 (1963/2213, false)
17/02/13 21:34:37 INFO ShuffleMapStage: ShuffleMapStage 3084 is now unavailable on executor
615 (434/470, false)
17/02/13 21:34:37 INFO ShuffleMapStage: ShuffleMapStage 3085 is now unavailable on executor
615 (180/200, false)
17/02/13 21:34:37 INFO ShuffleMapStage: ShuffleMapStage 3083 is now unavailable on executor
615 (194/200, false)
17/02/13 21:34:37 WARN TransportChannelHandler: Exception in connection from /<IP>:41438
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)
17/02/13 21:34:37 INFO BlockManagerMaster: Removal of executor 615 requested
17/02/13 21:34:37 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove non-existent
executor 615
17/02/13 21:34:37 INFO BlockManagerMasterEndpoint: Trying to remove executor 615 from BlockManagerMaster.
17/02/13 21:41:19 INFO TaskSetManager: Finished task 639.1 in stage 3086.0 (TID 370583) in
3567620 ms on <IP> (executor 606) (1214/2213)
17/02/13 21:41:19 INFO TaskSchedulerImpl: Removed TaskSet 3086.0, whose tasks have all completed,
from pool 1969095006217179029
17/02/13 21:42:42 INFO BlockManagerInfo: Added rdd_34686_639 in memory on <IP>:40605
(size: 328.3 MB, free: 9.8 GB)
17/02/13 21:42:42 INFO BlockManagerInfo: Removed broadcast_2871_piece0 on <IP>:34149
in memory (size: 28.1 KB, free: 10.3 GB)
17/02/13 21:42:42 INFO BlockManagerInfo: Removed broadcast_2871_piece0 on <IP>:40605
in memory (size: 28.1 KB, free: 9.8 GB)
{code}
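Note in the logs above that task set 3086.0 stays in the pool long after 3086.1 was submitted: it no longer launches new tasks, but it is only removed at 21:41:19, once its last in-flight task (TID 370583) finishes. That lifetime can be sketched with a toy model (this is illustrative only, not Spark's actual classes; the names are made up):

```python
class ZombieTaskSet:
    """Toy model of a superseded ("zombie") task set: it launches no new
    tasks, but it stays registered until every task already in flight
    has finished."""

    def __init__(self, in_flight):
        self.is_zombie = True          # superseded by a newer attempt
        self.in_flight = set(in_flight)

    def task_finished(self, task_id):
        """Record a completion; return True once the set can be removed."""
        self.in_flight.discard(task_id)
        return not self.in_flight


# Task set 3086.0 still had tasks running when 3086.1 was submitted:
old_attempt = ZombieTaskSet(in_flight=[370582, 370583])
print(old_attempt.task_finished(370582))  # False: one task still in flight
print(old_attempt.task_finished(370583))  # True: now removable from the pool
```

The point is that "zombie" and "removed" are distinct states, which is what keeps attempt 3086.0 visible in the scheduler for nearly an hour after it was superseded.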

Considerably more time passed, and then the error described in this ticket occurred:

{code}
17/02/13 21:46:58 INFO TaskSetManager: Finished task 0.0 in stage 3085.2 (TID 372035) in 911
ms on <IP> (executor 619) (27/28)
17/02/13 21:46:58 INFO TaskSetManager: Finished task 24.0 in stage 3085.2 (TID 372059) in
1016 ms on <IP> (executor 619) (28/28)
17/02/13 21:46:58 INFO TaskSchedulerImpl: Removed TaskSet 3085.2, whose tasks have all completed,
from pool 1969095006217179029
17/02/13 21:46:58 INFO DAGScheduler: ShuffleMapStage 3085 (cache at <console>:72) finished
in 1.021 s
17/02/13 21:46:58 INFO DAGScheduler: looking for newly runnable stages
17/02/13 21:46:58 INFO DAGScheduler: running: Set()
17/02/13 21:46:58 INFO DAGScheduler: waiting: Set(ShuffleMapStage 3086, ResultStage 3087)
17/02/13 21:46:58 INFO DAGScheduler: failed: Set()
17/02/13 21:46:58 INFO DAGScheduler: Submitting ShuffleMapStage 3086 (MapPartitionsRDD[34696]
at cache at <console>:61), which has no missing parents
17/02/13 21:46:58 INFO MemoryStore: Block broadcast_2883 stored as values in memory (estimated
size 67.1 KB, free 9.1 GB)
17/02/13 21:46:58 INFO MemoryStore: Block broadcast_2883_piece0 stored as bytes in memory
(estimated size 28.2 KB, free 9.1 GB)
17/02/13 21:46:58 INFO BlockManagerInfo: Added broadcast_2883_piece0 in memory on <IP>:45333
(size: 28.2 KB, free: 10.6 GB)
17/02/13 21:46:58 INFO SparkContext: Created broadcast 2883 from broadcast at DAGScheduler.scala:996
17/02/13 21:46:58 INFO DAGScheduler: Submitting 353 missing tasks from ShuffleMapStage 3086
(MapPartitionsRDD[34696] at cache at <console>:61)
17/02/13 21:46:58 INFO TaskSchedulerImpl: Adding task set 3086.2 with 353 tasks
17/02/13 21:46:58 ERROR DAGSchedulerEventProcessLoop: DAGSchedulerEventProcessLoop failed;
shutting down SparkContext
java.lang.IllegalStateException: more than one active taskSet for stage 3086: 3086.2,3086.1
    at org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:185)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1043)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:918)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:765)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:764)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.scheduler.DAGScheduler.submitWaitingChildStages(DAGScheduler.scala:764)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1228)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
17/02/13 21:46:58 INFO TaskSchedulerImpl: Cancelling stage 3086
17/02/13 21:46:58 ERROR DAGSchedulerEventProcessLoop: DAGScheduler failed to cancel all jobs.
java.lang.NullPointerException
    at org.apache.spark.scheduler.TaskSchedulerImpl.taskSetFinished(TaskSchedulerImpl.scala:247)
    at org.apache.spark.scheduler.TaskSetManager.maybeFinishTaskSet(TaskSetManager.scala:489)
    at org.apache.spark.scheduler.TaskSetManager.abort(TaskSetManager.scala:813)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2$$anonfun$apply$3.apply(TaskSchedulerImpl.scala:229)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2$$anonfun$apply$3.apply(TaskSchedulerImpl.scala:218)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2.apply(TaskSchedulerImpl.scala:218)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2.apply(TaskSchedulerImpl.scala:217)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.TaskSchedulerImpl.cancelTasks(TaskSchedulerImpl.scala:217)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply$mcVI$sp(DAGScheduler.scala:1461)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply(DAGScheduler.scala:1447)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply(DAGScheduler.scala:1447)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1447)
    at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1375)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply$mcVI$sp(DAGScheduler.scala:721)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:721)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:721)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.doCancelAllJobs(DAGScheduler.scala:721)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onError(DAGScheduler.scala:1659)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:52)
{code}
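The whole failure sequence can be condensed into a small model of the invariant that TaskSchedulerImpl.submitTasks enforces: at most one non-zombie task set per stage. This is a toy reconstruction under my reading of the logs, not Spark's actual code; the class and method names are invented for illustration:

```python
class MoreThanOneActiveTaskSet(Exception):
    pass


class ToyScheduler:
    """Minimal model of the 'one active task set per stage' invariant."""

    def __init__(self):
        self.task_sets = {}  # stage id -> {attempt number: is_zombie}

    def submit(self, stage, attempt):
        attempts = self.task_sets.setdefault(stage, {})
        active = [a for a, zombie in attempts.items() if not zombie]
        if active:
            ids = ",".join(f"{stage}.{a}" for a in [attempt] + active)
            raise MoreThanOneActiveTaskSet(
                f"more than one active taskSet for stage {stage}: {ids}")
        attempts[attempt] = False  # newly submitted, not a zombie

    def mark_zombie(self, stage, attempt):
        self.task_sets[stage][attempt] = True


sched = ToyScheduler()
sched.submit(3086, 0)        # first attempt starts
sched.mark_zombie(3086, 0)   # fetch failure: attempt 0 becomes a zombie
sched.submit(3086, 1)        # retry accepted: no active conflict remains

# Executor loss makes stage 3086's map output incomplete again. When the
# parent stage re-finishes, the scheduler submits attempt 2 even though
# attempt 1 is still running (not a zombie):
try:
    sched.submit(3086, 2)
except MoreThanOneActiveTaskSet as exc:
    print(exc)  # more than one active taskSet for stage 3086: 3086.2,3086.1
```

The second resubmission is the step that trips the invariant: nothing ever zombified attempt 3086.1 before attempt 3086.2 was submitted.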


> when executor lost DagScheduer may submit one stage twice even if the first running taskset
for this stage is not finished
> --------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-14658
>                 URL: https://issues.apache.org/jira/browse/SPARK-14658
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 1.6.1, 2.0.0, 2.1.0, 2.2.0
>         Environment: spark1.6.1  hadoop-2.6.0-cdh5.4.2
>            Reporter: yixiaohua
>
> {code}
> 16/04/14 15:35:22 ERROR DAGSchedulerEventProcessLoop: DAGSchedulerEventProcessLoop failed;
shutting down SparkContext
> java.lang.IllegalStateException: more than one active taskSet for stage 57: 57.2,57.1
>         at org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:173)
>         at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1052)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1214)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> First Time:
> {code}
> 16/04/14 15:35:20 INFO DAGScheduler: Resubmitting ShuffleMapStage 57 (run at AccessController.java:-2)
because some of its tasks had failed: 5, 8, 9, 12, 13, 16, 17, 18, 19, 23, 26, 27, 28, 29,
30, 31, 40, 42, 43, 48, 49, 50, 51, 52, 53, 55, 56, 57, 59, 60, 61, 67, 70, 71, 84, 85, 86,
87, 98, 99, 100, 101, 108, 109, 110, 111, 112, 113, 114, 115, 126, 127, 134, 136, 137, 146,
147, 150, 151, 154, 155, 158, 159, 162, 163, 164, 165, 166, 167, 170, 171, 172, 173, 174,
175, 176, 177, 178, 179, 180, 181, 188, 189, 190, 191, 198, 199, 204, 206, 207, 208, 218,
219, 222, 223, 230, 231, 236, 238, 239
> 16/04/14 15:35:20 DEBUG DAGScheduler: submitStage(ShuffleMapStage 57)
> 16/04/14 15:35:20 DEBUG DAGScheduler: missing: List()
> 16/04/14 15:35:20 INFO DAGScheduler: Submitting ShuffleMapStage 57 (MapPartitionsRDD[7887]
at run at AccessController.java:-2), which has no missing parents
> 16/04/14 15:35:20 DEBUG DAGScheduler: submitMissingTasks(ShuffleMapStage 57)
> 16/04/14 15:35:20 INFO DAGScheduler: Submitting 100 missing tasks from ShuffleMapStage
57 (MapPartitionsRDD[7887] at run at AccessController.java:-2)
> 16/04/14 15:35:20 DEBUG DAGScheduler: New pending partitions: Set(206, 177, 127, 98,
48, 27, 23, 163, 238, 188, 159, 28, 109, 59, 9, 176, 126, 207, 174, 43, 170, 208, 158, 108,
29, 8, 204, 154, 223, 173, 219, 190, 111, 61, 40, 136, 115, 86, 57, 155, 55, 230, 222, 180,
172, 151, 101, 18, 166, 56, 137, 87, 52, 171, 71, 42, 167, 198, 67, 17, 236, 165, 13, 5, 53,
178, 99, 70, 49, 218, 147, 164, 114, 85, 60, 31, 179, 150, 19, 100, 50, 175, 146, 134, 113,
84, 51, 30, 199, 26, 16, 191, 162, 112, 12, 239, 231, 189, 181, 110)
> {code}
> Second Time:
> {code}
> 16/04/14 15:35:22 INFO DAGScheduler: Resubmitting ShuffleMapStage 57 (run at AccessController.java:-2)
because some of its tasks had failed: 26
> 16/04/14 15:35:22 DEBUG DAGScheduler: submitStage(ShuffleMapStage 57)
> 16/04/14 15:35:22 DEBUG DAGScheduler: missing: List()
> 16/04/14 15:35:22 INFO DAGScheduler: Submitting ShuffleMapStage 57 (MapPartitionsRDD[7887]
at run at AccessController.java:-2), which has no missing parents
> 16/04/14 15:35:22 DEBUG DAGScheduler: submitMissingTasks(ShuffleMapStage 57)
> 16/04/14 15:35:22 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage
57 (MapPartitionsRDD[7887] at run at AccessController.java:-2)
> 16/04/14 15:35:22 DEBUG DAGScheduler: New pending partitions: Set(26)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

