spark-dev mailing list archives

From <>
Subject RE: Spark Scheduler creating Straggler Node
Date Wed, 09 Mar 2016 10:43:14 GMT
It would be nice to have a code-configurable fallback plan for such cases, since any generalized
solution can cause problems elsewhere.

Simply replicating hot cached blocks would be complicated to maintain and could cause an
OutOfMemoryError (OOME). In the case I described on the JIRA, the hot partition changes, e.g.
every hour. Even though the persistence is sized for a single serialized in-memory copy
(1xMEM_SERIALIZED), replicating each new hot partition will eventually break this contract and
cause an OOME. Of course, in some cases the hot partition stays the same, so it does make sense
to replicate it (possibly even to every node).

What would be very helpful is a way to configure caching/scheduling at the RDD level.
Something like the following would suit most cases (simplified; it would need much more thought):

- RDD.maxPartitionCache=5: the maximum number of times a partition can be cached.
- RDD.maxTTLMillis=60000: a simple time-based eviction policy that drops extra copies after X
  milliseconds of inactivity. Alternatively, these copies could get a lower priority when the
  BlockManager evicts cached RDDs.
- RDD.nonNodePolicy=Recompute: a hint that, if a task cannot be scheduled PROCESS_LOCAL or
  NODE_LOCAL, the partition should be recomputed instead of fetched from another node. (Note
  that the profiling evidence in the JIRA mentioned above showed the load was evenly distributed
  when the RDD was not cached.)
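One way to read the proposal above, sketched in Python. This is purely hypothetical: Spark has no RDD.maxPartitionCache, RDD.maxTTLMillis or RDD.nonNodePolicy settings, and `PartitionCachePolicy`, `record_access`, `evict_idle` and `action_for` are invented names. The sketch assumes the first node to cache a partition holds the primary copy, which the TTL never evicts, and that the current time is passed in explicitly in milliseconds.

```python
from dataclasses import dataclass, field

@dataclass
class PartitionCachePolicy:
    # Hypothetical stand-ins for the proposed per-RDD settings (not real Spark APIs).
    max_partition_cache: int = 5        # ~ RDD.maxPartitionCache
    max_ttl_millis: int = 60_000        # ~ RDD.maxTTLMillis
    non_node_policy: str = "Recompute"  # ~ RDD.nonNodePolicy
    # partition id -> {node name: last access time in ms}. Dicts keep insertion
    # order, so the first node listed holds the primary (original) copy.
    copies: dict = field(default_factory=dict)

    def record_access(self, partition, node, now_ms):
        """Record a read of `partition` on `node`, creating a copy there if allowed.

        Returns False if a new copy would exceed max_partition_cache."""
        holders = self.copies.setdefault(partition, {})
        if node not in holders and len(holders) >= self.max_partition_cache:
            return False
        holders[node] = now_ms
        return True

    def evict_idle(self, now_ms):
        """Drop extra (non-primary) copies idle for longer than max_ttl_millis."""
        evicted = []
        for partition, holders in self.copies.items():
            primary = next(iter(holders), None)
            for node, last_access in list(holders.items()):
                if node != primary and now_ms - last_access > self.max_ttl_millis:
                    del holders[node]
                    evicted.append((partition, node))
        return evicted

    def action_for(self, locality):
        """Decide what a task launched at the given locality level should do."""
        if locality in ("PROCESS_LOCAL", "NODE_LOCAL"):
            return "read-cache"
        return "recompute" if self.non_node_policy == "Recompute" else "fetch-remote"


policy = PartitionCachePolicy(max_partition_cache=2)
policy.record_access(0, "A", 0)       # True: primary copy
policy.record_access(0, "B", 1_000)   # True: one extra copy allowed
policy.record_access(0, "C", 2_000)   # False: cap of 2 copies reached
policy.evict_idle(70_000)             # [(0, "B")]: B idle > 60 s, A kept
```

Exempting the primary copy from the TTL keeps the 1x persistence the job was sized for, while the cap bounds the worst case at max_partition_cache copies per partition.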

PS: I don’t have enough knowledge of the Scala/Spark source to suggest an actual solution, or to
make sure that what I am suggesting is even possible.

From: Reynold Xin []
Sent: 09 March 2016 04:20
To: Prabhu Joseph
Cc: user; Spark dev list
Subject: Re: Spark Scheduler creating Straggler Node

You just want to be able to replicate hot cached blocks right?

On Tuesday, March 8, 2016, Prabhu Joseph <<>> wrote:
Hi All,

    When a Spark job is running, one of the Spark executors, on Node A, has some partitions
cached. Later, for some other stage, the scheduler tries to assign a task to Node A to process
a cached partition (PROCESS_LOCAL). Meanwhile, however, Node A is busy with other tasks. The
scheduler waits for the spark.locality.wait interval, times out, and tries to find another node,
B, which is NODE_LOCAL. The executor on Node B then fetches the cached partition from Node A,
which adds network I/O, plus some extra CPU for that I/O, on Node A. Eventually every node has a
task waiting to fetch some cached partition from Node A, so the Spark job (and effectively the
cluster) is blocked on a single node.
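To make the failure mode concrete, here is a toy Python simulation of the scenario above. It is a sketch under simplifying assumptions, not Spark's scheduler: `Node`, `schedule` and `simulate` are hypothetical names, all tasks are ready at time 0, each task is assumed to take one time unit, and there is a single "wait, then run elsewhere" step instead of Spark's per-level waits (spark.locality.wait.process, spark.locality.wait.node, etc.).

```python
from dataclasses import dataclass

# Toy model of delay scheduling for partitions cached only on one node ("A").
# Hypothetical simplification: one fallback step, unit-length tasks, all tasks
# ready at time 0. This is not Spark's actual TaskSetManager logic.

@dataclass
class Node:
    name: str
    busy_until: float = 0.0         # time at which this node can start a new task
    remote_fetches_served: int = 0  # cached-block reads served to other nodes

def schedule(owner, others, locality_wait):
    """Pick a node for one task whose input partition is cached on `owner`.

    Returns (node, fetched_remotely)."""
    if owner.busy_until <= locality_wait:
        # Owner frees up within the wait: run PROCESS_LOCAL against the cache.
        return owner, False
    # Wait expired: run on the least-busy other node, which must pull the
    # cached partition from the owner over the network.
    fallback = min(others, key=lambda n: n.busy_until)
    owner.remote_fetches_served += 1
    return fallback, True

def simulate(num_tasks, owner_busy_for, locality_wait, workers):
    owner = Node("A", busy_until=owner_busy_for)
    others = [Node(name) for name in workers]
    placements = []
    for _ in range(num_tasks):
        node, remote = schedule(owner, others, locality_wait)
        node.busy_until += 1.0  # assume each task occupies its node for 1 unit
        placements.append((node.name, remote))
    return owner, placements

owner, placements = simulate(6, owner_busy_for=30.0, locality_wait=3.0,
                             workers=["B", "C", "D"])
print(placements)
print("fetches served by A:", owner.remote_fetches_served)
```

With Node A busy for 30 time units and a wait of 3 (mirroring the default spark.locality.wait of 3s), all six tasks run on B, C or D and every one of them pulls its block from A, which is the single-node hotspot described above.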

A Spark JIRA has been created for this issue<>

Starting with Spark 1.2, Spark introduced the External Shuffle Service, which lets executors fetch
shuffle files from an external service instead of from each other, offloading that work from the
Spark executors.

We want to check whether a similar external service has been implemented for transferring
cached partitions to other executors.

Thanks, Prabhu Joseph

