spark-dev mailing list archives

From "Yifei Huang (PD)" <yif...@palantir.com>
Subject Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API
Date Mon, 13 May 2019 20:04:41 GMT
Hi Mridul - thanks for taking the time to give us feedback! Here are our thoughts on the points
you mentioned:

 

The API is meant to work with the existing SortShuffleManager algorithm. There are no strict
requirements on how other ShuffleManager implementations must behave, so it seems impractical
to design an API that could also satisfy those unknown requirements. However, we do believe
the API is fairly generic, using OutputStreams for writes and InputStreams for reads, and
indexing the data by a shuffleId-mapId-reduceId combination, so if other shuffle algorithms
chunk their data the same way and want an interface for storage, they can also use this API
from within their implementations.
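
To make the shape of the interface concrete, here is a minimal sketch of what such a storage
interface could look like (the trait and method names are illustrative only, not the actual
SPIP API):

    import java.io.{InputStream, OutputStream}

    // Hypothetical sketch of the storage surface described above. The contract is
    // simply: writers get an OutputStream per (shuffleId, mapId, reduceId) block,
    // and readers get an InputStream back for the same key.
    trait ShuffleBlockStorage {
      // Open a stream for writing one map task's output for one reduce partition.
      def openForWrite(shuffleId: Int, mapId: Int, reduceId: Int): OutputStream

      // Open a stream for reading the block previously written under this key.
      def openForRead(shuffleId: Int, mapId: Int, reduceId: Int): InputStream
    }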

 

About speculative execution: we originally assumed that each shuffle task is deterministic,
meaning that even if a later mapper overrode a previously committed mapper's output, the
contents would still be the same. Having searched some tickets and read
https://github.com/apache/spark/pull/22112/files more carefully, I think there are problems
with our original assumption if the writer writes all attempts of a task to the same location.
One example: if the writer implementation writes each partition to the remote host as a
sequence of chunks, a reducer might read data half written by the original task and half
written by the still-running speculative task, which will not be the correct contents if the
mapper output is unordered. Therefore, writes by a single mapper might have to be made
transactional, which is not clear from the API and seems rather complex to reason about, so
we shouldn't expect this from the implementer.
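
As a deliberately simplified illustration of that failure mode, consider a toy in-memory
stand-in for a remote store that lacks attempt isolation (single-threaded here purely to show
the interleaving; the real race is between concurrent attempts):

    import scala.collection.mutable

    object ChunkRaceDemo {
      // One shared chunk buffer per (shuffleId, mapId, reduceId) key; note that
      // there is no attempt id in the key.
      private val store = mutable.Map.empty[(Int, Int, Int), mutable.Buffer[String]]

      private def writeChunk(key: (Int, Int, Int), chunk: String): Unit =
        store.getOrElseUpdate(key, mutable.Buffer.empty[String]) += chunk

      def main(args: Array[String]): Unit = {
        val key = (0, 0, 0)
        writeChunk(key, "attempt0-chunk0") // original attempt writes its first chunk
        writeChunk(key, "attempt1-chunk0") // speculative attempt writes to the same key
        writeChunk(key, "attempt0-chunk1") // original attempt continues
        // A reducer reading this key sees chunks from both attempts interleaved,
        // which is only safe if the map output is deterministic and ordered:
        println(store(key).mkString(", "))
      }
    }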

 

However, this doesn't affect the fundamentals of the API, since we only need to add an
attemptId to the storage data index (which can be stored within the MapStatus) to solve the
problem of concurrent writes. This would also make it clearer that the writer should use the
attempt ID as part of the index to ensure that writes from speculative tasks don't interfere
with one another (we can add that to the API docs as well).
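
Concretely, the indexing change could look something like the following (again, these names
are illustrative, not the actual SPIP API):

    import java.io.{InputStream, OutputStream}

    // Sketch of the fix described above: the block key gains an attemptId, so
    // concurrent attempts of the same map task write to disjoint locations. The
    // winning attempt's id travels in the MapStatus, and reducers pass it back
    // when reading, so they never observe a mix of two attempts' chunks.
    trait AttemptAwareShuffleStorage {
      def openForWrite(shuffleId: Int, mapId: Int, attemptId: Long, reduceId: Int): OutputStream
      def openForRead(shuffleId: Int, mapId: Int, attemptId: Long, reduceId: Int): InputStream
    }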

 

From: Mridul Muralidharan <mridul@gmail.com>
Date: Wednesday, May 8, 2019 at 8:18 PM
To: "Yifei Huang (PD)" <yifeih@palantir.com>
Cc: Bo Yang <boy@uber.com>, Ilan Filonenko <if56@cornell.edu>, Imran Rashid <irashid@cloudera.com>,
Justin Uang <juang@palantir.com>, Liang Tang <latang@linkedin.com>, Marcelo Vanzin
<vanzin@cloudera.com>, Matei Zaharia <matei.zaharia@gmail.com>, Matt Cheah <mcheah@palantir.com>,
Min Shen <mshen@linkedin.com>, Reynold Xin <rxin@databricks.com>, Ryan Blue <rblue@netflix.com>,
Vinoo Ganesh <vganesh@palantir.com>, Will Manning <wmanning@palantir.com>, "bcho@fb.com"
<bcho@fb.com>, "dev@spark.apache.org" <dev@spark.apache.org>, "felixc@uber.com"
<felixc@uber.com>, "fli@linkedin.com" <fli@linkedin.com>, "tgravescs1@gmail.com"
<tgravescs1@gmail.com>, "yezhou@linkedin.com" <yezhou@linkedin.com>, "yue.li@memverge.com"
<yue.li@memverge.com>
Subject: Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

 

 

Unfortunately I do not have bandwidth to do a detailed review, but a few things come to mind
after a quick read:

 

- While it might be tactically beneficial to align with the existing implementation, a clean
design that does not tie into the existing shuffle implementation would be preferable (if it
can be done without over-engineering). Shuffle implementations can change, and there are
custom implementations and experiments that differ quite a bit from what ships with Apache
Spark.

- Please keep speculative execution in mind while designing the interfaces: in Spark,
implicitly due to the task scheduler logic, you won't have conflicts at an executor for the
(shuffleId, mapId) and (shuffleId, mapId, reducerId) tuples.

When you externalize it, there can be conflicts: passing a way to distinguish different
attempts of a task for the same partition would be necessary for nontrivial implementations.

This would be a welcome and much-needed enhancement to Spark; looking forward to its progress!

Regards,

Mridul

On Wed, May 8, 2019 at 11:24 AM Yifei Huang (PD) <yifeih@palantir.com> wrote:

Hi everyone,

For the past several months, we have been working on an API for pluggable storage of shuffle
data. In this SPIP, we describe the proposed API, its implications, and how it fits into other
work being done in the Spark shuffle space. If you're interested in Spark shuffle, and especially
if you have done some work in this area already, please take a look at the SPIP and give us
your thoughts and feedback.

Jira Ticket: https://issues.apache.org/jira/browse/SPARK-25299
SPIP: https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit

Thank you!

Yifei Huang and Matt Cheah

 

