It would be nice to have a code-configurable fall-back plan for such cases. Any generalized solution can cause problems elsewhere.


Simply replicating hot cached blocks would be complicated to maintain and could cause an OOME. In the case I described on the JIRA, the hot partition changes, e.g. every hour. Even though the persistence is sized for 1xMEM_SERIALIZED, replicating the hot partition will eventually break this contract and cause an OOME. Of course, in some cases the hot partition stays the same, so it makes sense to replicate it (possibly even to every node).


What would be very helpful is a way to configure caching/scheduling at the RDD level. Something like the following would suit most cases (simplified, as it would require much more thought):

RDD.maxPartitionCache=5: Maximum number of times a partition can be cached

RDD.maxTTLMillis=60000: Simple time-based eviction policy to drop extra copies after X millis of inactivity. Alternatively, these copies could have a lower priority when the BlockManager evicts cached RDDs.

RDD.nonNodePolicy=Recompute: A hint to re-compute the RDD if a task is not accepted at the PROCESS_LOCAL or NODE_LOCAL level. (Note that in the profiling evidence on the mentioned JIRA, the load was evenly distributed when the RDD was not cached.)
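
A rough sketch of how the three settings above might behave, written as a standalone Python model rather than Spark code. None of these settings exist in Spark, and the names RddCachePolicy, PartitionCopies, touch, evict_idle and action_for_non_local_task are hypothetical. Counting the original copy toward maxPartitionCache is also an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class RddCachePolicy:
    """Hypothetical per-RDD settings; these are not real Spark options."""
    max_partition_cache: int = 5        # RDD.maxPartitionCache
    max_ttl_millis: int = 60_000        # RDD.maxTTLMillis
    non_node_policy: str = "Recompute"  # RDD.nonNodePolicy


@dataclass
class PartitionCopies:
    """Tracks which executors hold a copy of one cached partition."""
    policy: RddCachePolicy
    primary: str                                   # executor with the original copy
    last_used: dict = field(default_factory=dict)  # extra copy -> last access (ms)

    def may_add_copy(self) -> bool:
        # Assumption: the primary copy counts toward the limit.
        return 1 + len(self.last_used) < self.policy.max_partition_cache

    def touch(self, executor: str, now_ms: int) -> bool:
        """Record a read on `executor`; keep an extra copy there if allowed."""
        if executor == self.primary:
            return True
        if executor in self.last_used or self.may_add_copy():
            self.last_used[executor] = now_ms
            return True
        return False  # limit reached, no new copy is kept

    def evict_idle(self, now_ms: int) -> list:
        """Drop extra copies that have been idle longer than max_ttl_millis."""
        idle = [e for e, t in self.last_used.items()
                if now_ms - t > self.policy.max_ttl_millis]
        for e in idle:
            del self.last_used[e]
        return idle


def action_for_non_local_task(policy: RddCachePolicy) -> str:
    """What a task does when it cannot run where the partition is cached."""
    return "recompute" if policy.non_node_policy == "Recompute" else "fetch-remote"
```

The primary copy is never evicted by the TTL in this model, so the 1x memory budget for the RDD only grows by the bounded number of extra copies.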


PS. I don’t have adequate Scala/Spark source knowledge to suggest an actual solution or to make sure that what I am suggesting is even possible.




From: Reynold Xin
Sent: 09 March 2016 04:20
To: Prabhu Joseph
Cc: user; Spark dev list
Subject: Re: Spark Scheduler creating Straggler Node


You just want to be able to replicate hot cached blocks right?

On Tuesday, March 8, 2016, Prabhu Joseph wrote:

Hi All,

    When a Spark job is running, one of the Spark executors on Node A has some partitions cached. Later, for some other stage, the scheduler tries to assign a task to Node A to process a cached partition (PROCESS_LOCAL). But meanwhile Node A is occupied with other tasks and is busy. The scheduler waits for the spark.locality.wait interval, times out, and tries to find some other Node B which is NODE_LOCAL. The executor on Node B will try to get the cached partition from Node A, which adds network I/O to that node and also some extra CPU for I/O. Eventually, every node will have a task that is waiting to fetch some cached partition from Node A, so the Spark job / cluster is basically blocked on a single node.
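
The scenario above can be illustrated with a toy model. This is a standalone Python sketch, not Spark's scheduler: place_task and schedule_round are hypothetical helpers, each executor is assumed to run one task per round, and only the 3s value is taken from Spark (the documented default for spark.locality.wait).

```python
LOCALITY_WAIT_MS = 3000  # documented default of spark.locality.wait (3s)


def place_task(owner, free_executors, waited_ms, locality_wait_ms=LOCALITY_WAIT_MS):
    """Pick an executor for a task whose input partition is cached on `owner`.

    Returns (executor, remote_fetch), or None if the task keeps waiting.
    """
    if owner in free_executors:
        return owner, False   # runs next to the cached block, no network I/O
    if waited_ms < locality_wait_ms:
        return None           # still inside the locality wait
    if not free_executors:
        return None           # nowhere to run yet
    # Fallback: run elsewhere and pull the cached block from the owner.
    return sorted(free_executors)[0], True


def schedule_round(partition_owner, free_executors, waited_ms):
    """Place one task per partition; count remote fetches served by each owner."""
    free = set(free_executors)
    fetch_load = {}
    for _partition, owner in sorted(partition_owner.items()):
        placed = place_task(owner, free, waited_ms)
        if placed is None:
            continue
        executor, remote = placed
        free.discard(executor)  # assumption: one task per executor per round
        if remote:
            fetch_load[owner] = fetch_load.get(owner, 0) + 1
    return fetch_load


# Node A holds three hot partitions but is busy; B, C and D are free.
hot = {0: "A", 1: "A", 2: "A"}
print(schedule_round(hot, {"B", "C", "D"}, waited_ms=3000))
```

Once the wait has expired, every task in this model lands on another node and fetches from Node A, so all of the remote-read load concentrates on the one node that was already busy.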

Spark JIRA is created

Beginning with Spark 1.2, Spark introduced the External Shuffle Service to enable executors to fetch shuffle files from an external service instead of from each other, which offloads that work from the Spark executors.

We want to check whether a similar external service is implemented for transferring cached partitions to other executors.


Thanks, Prabhu Joseph

