spark-issues mailing list archives

From "Nicholas Brett Marcott (Jira)" <>
Subject [jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout
Date Wed, 04 Dec 2019 08:06:00 GMT


Nicholas Brett Marcott commented on SPARK-18886:

Thanks for mentioning the PRs here.

My proposed solution in the second [PR mentioned above|] is
what I believe Kay described as ideal in the comments of this [PR|], but
seemed to think was impractical.

*The proposed solution:*

Currently, the time window that the locality wait times measure is the time since the last
task launched for a TSM (TaskSetManager). The proposed change is to instead measure the time
since this TSM's available slots were fully utilized.

The number of available slots for a TSM can be determined by dividing all slots among the
TSMs according to the scheduling policy (FIFO vs. FAIR).
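
A minimal sketch of the proposed timer in Scala (hypothetical names; the real logic would live in TaskSetManager, and the PR differs in detail). The point is that the clock only runs while the TSM has unused slots, instead of being reset on every task launch:

{code:scala}
// Hypothetical sketch of the proposed delay-scheduling timer.
class LocalityWaitTimer(localityWaitMs: Long) {
  // Timestamp since which this TSM has had unused slots;
  // None while all of its available slots are fully utilized.
  private var unusedSlotsSince: Option[Long] = None

  // Called on each resource-offer round with this TSM's share of slots.
  def update(runningTasks: Int, availableSlots: Int): Unit = {
    if (runningTasks >= availableSlots) {
      unusedSlotsSince = None // fully utilized: nothing is waiting
    } else if (unusedSlotsSince.isEmpty) {
      unusedSlotsSince = Some(System.currentTimeMillis())
    }
  }

  // Relax the locality level once slots have sat unused past the wait.
  def shouldRelaxLocality(): Boolean =
    unusedSlotsSince.exists(System.currentTimeMillis() - _ > localityWaitMs)
}
{code}

Under the current behavior, launching a single task resets the clock, which is exactly what lets a trickle of local tasks defer the rest of the cluster indefinitely.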

*Other possible solutions and their issues:*
 # Never reset the timer: delay scheduling would likely only work on the first wave*.
 # Per-slot timer: delay scheduling should apply per task/taskset; otherwise, timers started
by one taskset could cause delay scheduling to be ignored for the next taskset, which might
lead you to try approach #3.
 # Per-slot, per-stage timer: tasks can be starved by being offered unique slots over a period
of time, and a taskset or other job that doesn't care about locality might take those
resources. This also means too many timers and too much bookkeeping.
 # Per-task timer: you still need a way to distinguish between a task waiting for a slot to
become available and a task that has slots available but is not utilizing them (which is what
this PR does). Doing this right seems to require this PR plus more timers.


*wave = one round of running as many tasks as there are available slots for a taskset. Imagine
you have 2 slots and 10 tasks: it would take 10 / 2 = 5 waves to complete the taskset.


> Delay scheduling should not delay some executors indefinitely if one task is scheduled
before delay timeout
> -----------------------------------------------------------------------------------------------------------
>                 Key: SPARK-18886
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: Imran Rashid
>            Priority: Major
> Delay scheduling can introduce an unbounded delay and underutilization of cluster resources
under the following circumstances:
> 1. Tasks have locality preferences for a subset of available resources
> 2. Tasks finish in less time than the delay scheduling wait.
> Instead of having *one* delay to wait for resources with better locality, spark waits
indefinitely: the wait timer is reset every time a task launches before the timeout expires,
so it can keep deferring the other executors forever.
> As an example, consider a cluster with 100 executors, and a taskset with 500 tasks. 
Say all tasks have a preference for one executor, which is by itself on one host.  Given the
default locality wait of 3s per level, we end up with a 6s delay until we schedule on other
hosts (process wait + host wait).
> If each task takes 5 seconds (under the 6 second delay), then _all 500_ tasks get scheduled
on _only one_ executor.  This means you're only using 1% of your cluster, and you get a
~100x slowdown.  You'd actually be better off if tasks took 7 seconds, since the 6s delay
would then expire before each task finished and the other hosts would be used.
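> To sanity-check the ~100x figure (a sketch using the numbers above, assuming one task slot
per executor):

{code:scala}
val tasks     = 500
val executors = 100
val taskSec   = 5
val oneExecutorSec = tasks * taskSec                // 2500s: all tasks serialized on the preferred executor
val fullClusterSec = (tasks / executors) * taskSec  // 25s: 5 waves across 100 executors
val slowdown = oneExecutorSec.toDouble / fullClusterSec // = 100.0
{code}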
> (1) You can change the locality wait times so that they are shorter than the task execution
time.  You need to take into account the sum of all wait times to use all the resources on
your cluster.  For example, if you have resources on different racks, this will include the
sum of "spark.locality.wait.process" + "spark.locality.wait.node" + "spark.locality.wait.rack".
 Those each default to "3s".  The simplest way would be to set "spark.locality.wait.process"
to your desired wait interval, and set both "spark.locality.wait.node" and "spark.locality.wait.rack"
to "0".  For example, if your tasks take ~3 seconds on average, you might set "spark.locality.wait.process"
to "1s".  *NOTE*: due to SPARK-18967, avoid setting {{spark.locality.wait=0}}; instead,
use {{spark.locality.wait=1ms}}.
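> As an illustrative sketch of those settings (example values to tune, not recommendations):

{code:scala}
import org.apache.spark.SparkConf

// Workaround (1): make the process-local wait shorter than the average
// task duration, and skip the node and rack waits entirely.
val conf = new SparkConf()
  .set("spark.locality.wait.process", "1s")
  .set("spark.locality.wait.node", "0")
  .set("spark.locality.wait.rack", "0")
{code}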
> Note that this workaround isn't perfect: with less delay scheduling, you may not get
as good resource locality.  After this issue is fixed, you'd most likely want to undo these
configuration changes.
> (2) The worst case here will only happen if your tasks have extreme skew in their locality
preferences.  Users may be able to modify their job to control the distribution of the
original input data.
> (2a) A shuffle may end up with very skewed locality preferences, especially if you do
a repartition starting from a small number of partitions.  (Shuffle locality preference is
assigned if any node has more than 20% of the shuffle input data -- by chance, you may have
one node just above that threshold, and all other nodes just below it.)  In this case, you
can turn off locality preference for shuffle data by setting {{spark.shuffle.reduceLocality.enabled=false}}.
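> For example (a sketch; the same property can also be passed via spark-submit's --conf):

{code:scala}
import org.apache.spark.SparkConf

// Workaround (2a): stop assigning reduce-side locality preferences
// based on where shuffle output lives.
val conf = new SparkConf()
  .set("spark.shuffle.reduceLocality.enabled", "false")
{code}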
