spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Matt Cheah <>
Subject Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API
Date Thu, 06 Jun 2019 00:32:37 GMT
Hi everyone,


I wanted to pick this back up again. The discussion has quieted down both on this thread and
on the document.


We made a few revisions to the document to hopefully make it easier to read and to clarify
our criteria for success in the project. Some of the APIs have also been adjusted based on
further discussion and things we’ve learned.


I was hoping to discuss what our next steps could be here. Specifically,
Would any PMC be willing to become the shepherd for this SPIP?
Is there any more feedback regarding this proposal?
What would we need to do to take this to a voting phase and to begin proposing our work against
upstream Spark?



-Matt Cheah


From: "Yifei Huang (PD)" <>
Date: Monday, May 13, 2019 at 1:04 PM
To: Mridul Muralidharan <>
Cc: Bo Yang <>, Ilan Filonenko <>, Imran Rashid <>,
Justin Uang <>, Liang Tang <>, Marcelo Vanzin
<>, Matei Zaharia <>, Matt Cheah <>,
Min Shen <>, Reynold Xin <>, Ryan Blue <>,
Vinoo Ganesh <>, Will Manning <>, ""
<>, "" <>, ""
<>, "" <>, ""
<>, "" <>, ""
Subject: Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API


Hi Mridul - thanks for taking the time to give us feedback! Thoughts on the points that you


The API is meant to work with the existing SortShuffleManager algorithm. There aren't strict
requirements on how other ShuffleManager implementations must behave, so it seems impractical
to design an API that could also satisfy those unknown requirements. However, we do believe
that the API is rather generic, using OutputStreams for writes and InputStreams for reads,
and indexing the data by a shuffleId-mapId-reduceId combo, so if other shuffle algorithms
treat the data in the same chunks and want an interface for storage, then they can also use
this API from within their implementation.


About speculative execution, we originally made the assumption that each shuffle task is deterministic,
which meant that even if a later mapper overrode a previous committed mapper's value, it's
still the same contents. Having searched some tickets and reading
more carefully, I think there are problems with our original thought if the writer writes
all attempts of a task to the same location. One example is if the writer implementation writes
each partition to the remote host in a sequence of chunks. In such a situation, a reducer
might read data half written by the original task and half written by the running speculative
task, which will not be the correct contents if the mapper output is unordered. Therefore,
writes by a single mapper might have to be transactioned, which is not clear from the API,
and seems rather complex to reason about, so we shouldn't expect this from the implementer.


However, this doesn't affect the fundamentals of the API since we only need to add an additional
attemptId to the storage data index (which can be stored within the MapStatus) to solve the
problem of concurrent writes. This would also make it more clear that the writer should use
attempt ID as an index to ensure that writes from speculative tasks don't interfere with one
another (we can add that to the API docs as well).


From: Mridul Muralidharan <>
Date: Wednesday, May 8, 2019 at 8:18 PM
To: "Yifei Huang (PD)" <>
Cc: Bo Yang <>, Ilan Filonenko <>, Imran Rashid <>,
Justin Uang <>, Liang Tang <>, Marcelo Vanzin
<>, Matei Zaharia <>, Matt Cheah <>,
Min Shen <>, Reynold Xin <>, Ryan Blue <>,
Vinoo Ganesh <>, Will Manning <>, ""
<>, "" <>, ""
<>, "" <>, ""
<>, "" <>, ""
Subject: Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API



Unfortunately I do not have bandwidth to do a detailed review, but a few things come to mind
after a quick read:


- While it might be tactically beneficial to align with existing implementation, a clean design
which does not tie into existing shuffle implementation would be preferable (if it can be
done without over engineering). Shuffle implementation can change and there are custom implementations
and experiments which differ quite a bit from what comes with Apache Spark.



- Please keep speculative execution in mind while designing the interfaces: in spark, implicitly
due to task scheduler logic, you won’t have conflicts at an executor for (shuffleId, mapId)
and (shuffleId, mapId, reducerId) tuple.

When you externalize it, there can be conflict : passing a way to distinguish different tasks
for same partition would be necessary for nontrivial implementations.



This would be a welcome and much needed enhancement to spark- looking forward to its progress








On Wed, May 8, 2019 at 11:24 AM Yifei Huang (PD) <> wrote:

Hi everyone,

For the past several months, we have been working on an API for pluggable storage of shuffle
data. In this SPIP, we describe the proposed API, its implications, and how it fits into other
work being done in the Spark shuffle space. If you're interested in Spark shuffle, and especially
if you have done some work in this area already, please take a look at the SPIP and give us
your thoughts and feedback.

Jira Ticket: []

Thank you!

Yifei Huang and Matt Cheah


View raw message