It would be nice to have a code-configurable fall-back plan for such cases. Any generalized solution can cause problems elsewhere.
Simply replicating hot cached blocks would be complicated to maintain and could cause OOME. In the case I described on the JIRA, the hot partition changes, e.g., every hour. Even though the persistence is sized for 1xMEM_SERIALIZED, replicating each new hot partition will eventually break this contract and cause OOME. Of course, in some cases the hot partition stays the same, so it makes sense to replicate it (possibly even to every node).
What would be very helpful is a way to configure caching/scheduling at the RDD level. Something like this would suit most cases (simplified, as it would require much more thought):
RDD.maxPartitionCache=5: Maximum number of times a partition can be cached
RDD.maxTTLMillis=60000: Simple time-based eviction policy to drop extra copies after X millis of inactivity. Alternatively, these copies could have a lower priority when BlockManager evicts cached RDDs.
RDD.nonNodePolicy=Recompute: A hint to recompute the RDD if the task cannot be scheduled at the PROCESS_LOCAL or NODE_LOCAL level. (Note that in the profiling evidence on the mentioned JIRA, the load was evenly distributed when the RDD was not cached.)
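To make the three settings above a bit more concrete, here is a rough sketch in plain Python of how they might interact. None of this is Spark code: `ReplicaPolicy`, `ReplicaTracker` and their methods are hypothetical names invented for illustration, and the option names are only the ones proposed above, not existing Spark settings. It assumes the cap counts every cached copy (including the original) and that the last remaining copy is never dropped by the TTL rule.

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple


@dataclass
class ReplicaPolicy:
    """Hypothetical per-RDD settings mirroring the three proposed options."""
    max_partition_cache: int = 5         # proposed RDD.maxPartitionCache
    max_ttl_millis: int = 60_000         # proposed RDD.maxTTLMillis
    non_node_policy: str = "Recompute"   # proposed RDD.nonNodePolicy


class ReplicaTracker:
    """Tracks cached copies of each partition and when each was last used."""

    def __init__(self, policy: ReplicaPolicy,
                 clock: Optional[Callable[[], int]] = None) -> None:
        self.policy = policy
        # The clock returns milliseconds; it is injectable so the sketch is testable.
        self.clock = clock or (lambda: int(time.monotonic() * 1000))
        # partition id -> {executor id -> last access time in ms}
        self.copies: Dict[int, Dict[str, int]] = {}

    def try_add_copy(self, partition: int, executor: str) -> bool:
        """Record a copy on `executor`; refuse if the per-partition cap is reached."""
        holders = self.copies.setdefault(partition, {})
        if executor not in holders and len(holders) >= self.policy.max_partition_cache:
            return False
        holders[executor] = self.clock()
        return True

    def touch(self, partition: int, executor: str) -> None:
        """Mark an existing copy as recently used."""
        if executor in self.copies.get(partition, {}):
            self.copies[partition][executor] = self.clock()

    def evict_idle(self) -> List[Tuple[int, str]]:
        """Drop copies idle longer than the TTL, always keeping one copy."""
        now = self.clock()
        dropped: List[Tuple[int, str]] = []
        for partition, holders in self.copies.items():
            # Visit the least recently used copies first.
            for executor, last in sorted(holders.items(), key=lambda kv: kv[1]):
                if len(holders) > 1 and now - last > self.policy.max_ttl_millis:
                    del holders[executor]
                    dropped.append((partition, executor))
        return dropped

    def recompute_on_locality_miss(self) -> bool:
        """True if a task that misses locality should recompute instead of fetch."""
        return self.policy.non_node_policy == "Recompute"
```

With a cap of 2 and a TTL of 100 ms, a third copy of the same partition is refused, and an idle extra copy is dropped while the most recently used one stays. Whether anything like this could be wired into BlockManager is exactly the part I cannot judge.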
PS. I don’t have adequate Scala/Spark source knowledge to suggest an actual solution or to make sure that what I am suggesting is even possible.
From: Reynold Xin [mailto:email@example.com]
Sent: 09 March 2016 04:20
To: Prabhu Joseph
Cc: user; Spark dev list
Subject: Re: Spark Scheduler creating Straggler Node
You just want to be able to replicate hot cached blocks right?
On Tuesday, March 8, 2016, Prabhu Joseph <firstname.lastname@example.org> wrote:
When a Spark job is running, one of the Spark executors on Node A has some partitions cached. Later, for some other stage, the scheduler tries to assign a task to Node A to process a cached partition (PROCESS_LOCAL). Meanwhile, Node A is busy with other tasks. The scheduler waits for the spark.locality.wait interval, times out, and tries to find some other node B which is NODE_LOCAL. The executor on Node B then tries to fetch the cached partition from Node A, which adds network I/O on that node and extra CPU for the I/O. Eventually, every node has a task waiting to fetch some cached partition from Node A, so the Spark job / cluster is basically blocked on a single node.
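The fallback described above can be illustrated with a toy model in plain Python. This is not Spark's scheduler: `place_task` and `remote_fetches` are hypothetical helpers, and the model assumes a task simply waits up to the locality wait for its preferred node and otherwise moves to the first alternative node, where it has to fetch the cached partition remotely. For reference, spark.locality.wait defaults to 3s.

```python
from typing import List, Tuple


def place_task(preferred: str, preferred_free_at_ms: int,
               locality_wait_ms: int, fallbacks: List[str]) -> Tuple[str, bool]:
    """Return (chosen node, whether the cached partition is fetched remotely)."""
    # The task stays local if the preferred node frees up within the wait,
    # or if there is nowhere else to run it.
    if preferred_free_at_ms <= locality_wait_ms or not fallbacks:
        return preferred, False
    # Otherwise it runs elsewhere and must pull the block from the preferred node.
    return fallbacks[0], True


def remote_fetches(free_at_ms: List[int], locality_wait_ms: int,
                   fallbacks: List[str]) -> int:
    """Count tasks preferring node 'A' that end up fetching from it remotely."""
    return sum(place_task("A", t, locality_wait_ms, fallbacks)[1] for t in free_at_ms)


# Three tasks prefer node A, which frees a slot at 1 s, 5 s and 9 s.
print(remote_fetches([1000, 5000, 9000], 3000, ["B", "C"]))
print(remote_fetches([1000, 5000, 9000], 10000, ["B", "C"]))
```

In this model a longer wait keeps tasks local at the cost of idle time, while a shorter wait pushes more remote fetches onto the node that holds the cache, which is the pile-up described above.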
A Spark JIRA has been created: https://issues.apache.org/jira/browse/SPARK-13718
Beginning with Spark 1.2, Spark introduced the External Shuffle Service, which lets executors fetch shuffle files from an external service instead of from each other, offloading that work from the Spark executors.
We want to check whether a similar external service is implemented for transferring cached partitions to other executors.
Thanks, Prabhu Joseph