spark-issues mailing list archives

From "Imran Rashid (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-19263) DAGScheduler should avoid sending conflicting task set.
Date Sun, 19 Feb 2017 22:44:44 GMT

     [ https://issues.apache.org/jira/browse/SPARK-19263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Imran Rashid updated SPARK-19263:
---------------------------------
    Fix Version/s:     (was: 1.2.0)
                   2.2.0

> DAGScheduler should avoid sending conflicting task set.
> -------------------------------------------------------
>
>                 Key: SPARK-19263
>                 URL: https://issues.apache.org/jira/browse/SPARK-19263
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: jin xing
>            Assignee: jin xing
>             Fix For: 2.2.0
>
>
> In the current *DAGScheduler handleTaskCompletion* code, when *event.reason* is *Success*, it first does *stage.pendingPartitions -= task.partitionId*, which may be a bug when a *FetchFailed* happens. Consider the following scenario:
> # Stage 0 runs and generates shuffle output data.
> # Stage 1 reads the output from stage 0 and generates more shuffle data. It has two tasks: ShuffleMapTask1 and ShuffleMapTask2, and these tasks are launched on executorA.
> # ShuffleMapTask1 fails to fetch blocks locally and sends a FetchFailed to the driver. The driver marks executorA as lost and updates failedEpoch.
> # The driver resubmits stage 0 so the missing output can be regenerated, and then, once it completes, resubmits stage 1 with ShuffleMapTask1x and ShuffleMapTask2x.
> # ShuffleMapTask2 (from the original attempt of stage 1) successfully finishes on executorA and sends Success back to the driver. This causes DAGScheduler::handleTaskCompletion to remove partition 2 from stage.pendingPartitions (line 1149), but it does not add the partition to the set of output locations (line 1192), because the task’s epoch is less than the failure epoch for the executor (because of the earlier failure on executorA).
> # ShuffleMapTask1x successfully finishes on executorB, causing the driver to remove partition 1 from stage.pendingPartitions. Combined with the previous step, this means that there are no more pending partitions for the stage, so the DAGScheduler marks the stage as finished (line 1196). However, the shuffle stage is not available (line 1215) because the completion for ShuffleMapTask2 was ignored due to its epoch, so the DAGScheduler resubmits the stage.
> # ShuffleMapTask2x is still running, so when TaskSchedulerImpl::submitTasks is called for the resubmitted stage, it throws an error because there’s an existing active task set for the stage.
> To reproduce the bug:
> 1. Modify *ShuffleBlockFetcherIterator*: check whether the task's index in *TaskSetManager* and the stage attempt number are both 0, and if so, throw a FetchFailedException.
> 2. Rebuild Spark, then submit the following job:
> {code}
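>     // Two partitions, so each shuffle stage runs two tasks.
>     val rdd = sc.parallelize(List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2)
>     rdd.reduceByKey {
>       (v1, v2) => {
>         // Slow each reduce call so tasks stay running long enough to overlap.
>         Thread.sleep(10000)
>         v1 + v2
>       }
>     }.map {
>       // Re-key by key % 2, which forces a second shuffle.
>       keyAndValue => {
>         (keyAndValue._1 % 2, keyAndValue._2)
>       }
>     }.reduceByKey {
>       (v1, v2) => {
>         Thread.sleep(10000)
>         v1 + v2
>       }
>     }.collect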
> {code}
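The ordering problem in steps 5 and 6 above can be illustrated with a minimal Scala sketch. It is a hypothetical, heavily simplified model rather than Spark's DAGScheduler code: `ShuffleStageModel`, `MapTaskSuccess`, `onSuccess`, `Spark19263Scenario.replay`, and the epoch values are invented for illustration. They only mirror the roles of `stage.pendingPartitions`, the stage's output locations, and `failedEpoch`. The `guardPending` flag shows one possible guard (drop a partition from the pending set only when its output is accepted). That guard is an assumption and not necessarily the change merged for 2.2.0.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the SPARK-19263 bookkeeping (not Spark's API).
final case class MapTaskSuccess(partition: Int, execId: String, epoch: Long)

final class ShuffleStageModel(partitions: Set[Int], guardPending: Boolean) {
  // Partitions the scheduler still waits on for the current stage attempt.
  val pendingPartitions: mutable.Set[Int] = mutable.Set.empty[Int] ++= partitions
  // Registered map output locations (partition -> executor).
  val outputLocs: mutable.Map[Int, String] = mutable.Map.empty
  // Epoch at which each executor was last marked as lost.
  val failedEpoch: mutable.Map[String, Long] = mutable.Map.empty

  // Shuffle output is usable only if every partition has a registered location.
  def isAvailable: Boolean = outputLocs.keySet == partitions

  /** Handles a Success completion; returns true if nothing is left pending. */
  def onSuccess(r: MapTaskSuccess): Boolean = {
    // Stale: the task was launched no later than the executor's last failure.
    val stale = failedEpoch.get(r.execId).exists(r.epoch <= _)
    // Unguarded (as described in the report): always clear the pending partition.
    // Guarded (assumed alternative): clear it only when the output is accepted.
    if (!guardPending || !stale) pendingPartitions -= r.partition
    if (!stale) outputLocs(r.partition) = r.execId
    pendingPartitions.isEmpty
  }
}

object Spark19263Scenario {
  /** Replays steps 5 and 6; returns (looksFinished, isAvailable). */
  def replay(guardPending: Boolean): (Boolean, Boolean) = {
    // Resubmitted stage 1: partitions 1 and 2 pending; executorA lost at epoch 1.
    val stage = new ShuffleStageModel(Set(1, 2), guardPending)
    stage.failedEpoch("executorA") = 1L
    // Step 5: original ShuffleMapTask2 (epoch 0) succeeds on executorA.
    stage.onSuccess(MapTaskSuccess(partition = 2, execId = "executorA", epoch = 0L))
    // Step 6: ShuffleMapTask1x (epoch 2) succeeds on executorB.
    val looksFinished =
      stage.onSuccess(MapTaskSuccess(partition = 1, execId = "executorB", epoch = 2L))
    (looksFinished, stage.isAvailable)
  }
}
```

With `guardPending = false`, `replay` returns `(true, false)`. The stage looks finished while its shuffle output is unavailable, which is the state that leads to the resubmission in step 7 while ShuffleMapTask2x is still running. With `guardPending = true` it returns `(false, false)`. Partition 2 stays pending, so in this model the scheduler keeps waiting for ShuffleMapTask2x instead of resubmitting.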



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


