spark-dev mailing list archives

From Aniket Mokashi <aniket...@gmail.com>
Subject Re: Enabling fully disaggregated shuffle on Spark
Date Thu, 21 Nov 2019 06:11:46 GMT
Felix - please add me to this event.

Ben - should we move this proposal to a doc and open it up for
edits/comments?

On Wed, Nov 20, 2019 at 5:37 PM Felix Cheung <felixcheung_m@hotmail.com>
wrote:

> Great!
>
> Due to a number of constraints, I won’t be sending the link directly here,
> but please reach out to me and I will add you.
>
>
> ------------------------------
> *From:* Ben Sidhom <sidhom@google.com.INVALID>
> *Sent:* Wednesday, November 20, 2019 9:10:01 AM
> *To:* John Zhuge <jzhuge@apache.org>
> *Cc:* bo yang <bobyangbo@gmail.com>; Amogh Margoor <amoghm@qubole.com>;
> Ryan Blue <rblue@netflix.com>; Ben Sidhom <sidhom@google.com.invalid>;
> Spark Dev List <dev@spark.apache.org>; Christopher Crosbie <
> crosbiec@google.com>; Griselda Cuevas <gris@google.com>; Holden Karau <
> holden@pigscanfly.ca>; Mayank Ahuja <mahuja@qubole.com>; Kalyan Sivakumar
> <kalyans@qubole.com>; alfozan@fb.com <alfozan@fb.com>; Felix Cheung <
> felixc@uber.com>; Matt Cheah <mcheah@palantir.com>; Yifei Huang (PD) <
> yifeih@palantir.com>
> *Subject:* Re: Enabling fully disaggregated shuffle on Spark
>
> That sounds great!
>
> On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jzhuge@apache.org> wrote:
>
> That will be great. Please send us the invite.
>
> On Wed, Nov 20, 2019 at 8:56 AM bo yang <bobyangbo@gmail.com> wrote:
>
> Cool, thanks Ryan, John, Amogh for the reply! Great to see your interest!
> Felix will host a Spark Scalability & Reliability Sync meeting on Dec 4 at
> 1pm PST. We could discuss more details there. Do you want to join?
>
> On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <amoghm@qubole.com> wrote:
>
> We at Qubole are also looking at disaggregating shuffle on Spark. Would
> love to collaborate and share learnings.
>
> Regards,
> Amogh
>
> On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jzhuge@apache.org> wrote:
>
> Great work, Bo! Would love to hear the details.
>
>
> On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rblue@netflix.com.invalid>
> wrote:
>
> I'm interested in remote shuffle services as well. I'd love to hear about
> what you're using in production!
>
> rb
>
> On Tue, Nov 19, 2019 at 2:43 PM bo yang <bobyangbo@gmail.com> wrote:
>
> Hi Ben,
>
> Thanks for writing this up! This is Bo from Uber. I am on Felix's team in
> Seattle, working on disaggregated shuffle (we call it remote shuffle
> service, or RSS, internally). We have had RSS in production for a while and
> learned a lot along the way (we tried quite a few techniques to improve
> remote shuffle performance). We could share our learnings with the
> community, and would also like to hear feedback/suggestions on how to
> further improve remote shuffle performance. We could discuss the details
> further if you or other people are interested.
>
> Best,
> Bo
>
> On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <sidhom@google.com.invalid>
> wrote:
>
> I would like to start a conversation about extending the Spark shuffle
> manager surface to support fully disaggregated shuffle implementations.
> This is closely related to the work in SPARK-25299
> <https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on
> refactoring the shuffle manager API (and in particular, SortShuffleManager)
> to use a pluggable storage backend. The motivation for that SPIP is further
> enabling Spark on Kubernetes.
>
>
> The motivation for this proposal is enabling fully externalized
> (disaggregated) shuffle service implementations. (Facebook’s Cosco shuffle
> <https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service>
> is one example of such a disaggregated shuffle service.) These changes
> allow the bulk of the shuffle to run in a remote service such that minimal
> state resides in executors and local disk spill is minimized. The net
> effect is increased job stability and performance improvements in certain
> scenarios. These changes should work well with, or be complementary to,
> SPARK-25299. Some or all points may be merged into that issue as
> appropriate.
>
>
> Below is a description of each component of this proposal. These changes
> can ideally be introduced incrementally. I would like to gather feedback
> and gauge interest from others in the community to collaborate on this.
> There are likely more points that would be useful to disaggregated shuffle
> services. We can outline a more concrete plan after gathering enough input.
> A working session could help us kick off this joint effort, perhaps
> in the mid-January to mid-February timeframe (depending on interest and
> availability). I’m happy to host at our Sunnyvale, CA offices.
>
>
> Proposal
>
> Scheduling and re-executing tasks
>
> Allow coordination between the service and the Spark DAG scheduler as to
> whether a given block/partition needs to be recomputed when a task fails or
> when shuffle block data cannot be read. Having such coordination is
> important, e.g., for suppressing recomputation after aborted executors or
> for forcing late recomputation if the service internally acts as a cache.
> One catchall solution is to have the shuffle manager provide an indication
> of whether shuffle data is external to executors (or nodes). Another
> option: allow the shuffle manager (likely on the driver) to be queried for
> the existence of shuffle data for a given executor ID (or perhaps map task,
> reduce task, etc). Note that this is at the level of data the scheduler is
> aware of (i.e., map/reduce partitions) rather than block IDs, which are
> internal details for some shuffle managers.
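A minimal sketch of the second option above (querying the shuffle manager on the driver for the existence of shuffle data), assuming map-task granularity. ShuffleDataLocator, LocalDiskLocator, RemoteServiceLocator and maps_to_recompute are hypothetical names, not Spark APIs; Python is used only as compact, runnable pseudocode for what would be Scala/Java inside Spark.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Protocol, Set, Tuple


class ShuffleDataLocator(Protocol):
    """Hypothetical driver-side query surface; not an actual Spark API."""

    def is_external(self) -> bool:
        """True if shuffle data outlives the executors that wrote it."""
        ...

    def has_shuffle_data(self, shuffle_id: int, map_id: int) -> bool:
        """True if this map task's output is still readable somewhere."""
        ...


class LocalDiskLocator:
    """Executor-local shuffle files: they disappear with their executor."""

    def is_external(self) -> bool:
        return False

    def has_shuffle_data(self, shuffle_id: int, map_id: int) -> bool:
        # Never consulted for local data (see maps_to_recompute).
        return False


@dataclass
class RemoteServiceLocator:
    """Answers from what a (hypothetical) remote service reports it stores."""

    stored: Set[Tuple[int, int]] = field(default_factory=set)

    def is_external(self) -> bool:
        return True

    def has_shuffle_data(self, shuffle_id: int, map_id: int) -> bool:
        return (shuffle_id, map_id) in self.stored


def maps_to_recompute(
    lost_executor: str,
    shuffle_id: int,
    map_locations: Dict[int, str],
    locator: ShuffleDataLocator,
) -> List[int]:
    """Return the map partitions the scheduler should re-run."""
    # Only map tasks that ran on the lost executor are candidates.
    candidates = sorted(m for m, ex in map_locations.items() if ex == lost_executor)
    if not locator.is_external():
        # Executor-local data: everything that executor wrote is gone.
        return candidates
    # External data: suppress recomputation for outputs the service still holds.
    return [m for m in candidates if not locator.has_shuffle_data(shuffle_id, m)]
```

With map outputs {0: "exec-1", 1: "exec-2", 2: "exec-1"} and a service that still stores maps 0 and 1 of shuffle 7, losing exec-1 would only require re-running map 2, whereas the local-disk case re-runs maps 0 and 2.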
>
> ShuffleManager API
>
> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
> service knows that data is still active. This is one way to enable
> time-/job-scoped data because a disaggregated shuffle service cannot rely
> on robust communication with Spark and in general has a distinct lifecycle
> from the Spark deployment(s) it talks to. This would likely take the form
> of a callback on ShuffleManager itself, but there are other approaches.
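One way the service side of such a keep-alive could look is a lease table: each shuffle survives only while Spark keeps renewing it, so the service never depends on an explicit "job finished" message. This is a sketch under assumptions; ShuffleLeaseTracker, its TTL policy and the injectable clock are hypothetical, and Python stands in for the service's real implementation language.

```python
import time
from typing import Callable, Dict, List, Optional


class ShuffleLeaseTracker:
    """Hypothetical service-side keep-alive table; not a Spark API."""

    def __init__(self, ttl_seconds: float, clock: Optional[Callable[[], float]] = None) -> None:
        self.ttl = ttl_seconds
        # Injectable clock so expiry can be tested without sleeping.
        self.clock = clock or time.monotonic
        self._last_seen: Dict[int, float] = {}

    def heartbeat(self, shuffle_id: int) -> None:
        # Would be triggered periodically by a ShuffleManager-side callback.
        self._last_seen[shuffle_id] = self.clock()

    def is_live(self, shuffle_id: int) -> bool:
        seen = self._last_seen.get(shuffle_id)
        return seen is not None and self.clock() - seen <= self.ttl

    def expire(self) -> List[int]:
        """Forget and return shuffles whose lease has lapsed."""
        now = self.clock()
        dead = sorted(s for s, t in self._last_seen.items() if now - t > self.ttl)
        for s in dead:
            # A real service would also delete the stored shuffle data here.
            del self._last_seen[s]
        return dead
```

The lease approach tolerates lost messages: a missed heartbeat only matters if no later one arrives within the TTL.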
>
>
> Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle
> connections/streams/file handles as well as provide commit semantics).
> SPARK-25299 adds commit semantics to the internal data storage layer, but
> this is applicable to all shuffle managers at a higher level and should
> apply equally to the ShuffleWriter.
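A minimal sketch of writer lifecycle hooks with commit semantics: records are staged, commit publishes them atomically, abort discards them, and close releases resources exactly once. CommittingShuffleWriter and InMemoryShuffleStore are hypothetical stand-ins (not Spark's ShuffleWriter), and the context-manager wiring (commit on success, abort on failure) is one possible design choice.

```python
from typing import Dict, List, Tuple


class InMemoryShuffleStore:
    """Stand-in for a remote service: only committed map outputs are visible."""

    def __init__(self) -> None:
        self.committed: Dict[Tuple[int, int], List[bytes]] = {}


class CommittingShuffleWriter:
    """Hypothetical writer exposing explicit commit/abort/close hooks."""

    def __init__(self, store: InMemoryShuffleStore, shuffle_id: int, map_id: int) -> None:
        self.store = store
        self.key = (shuffle_id, map_id)
        self._staged: List[bytes] = []
        self._closed = False

    def write(self, record: bytes) -> None:
        if self._closed:
            raise RuntimeError("writer is closed")
        self._staged.append(record)

    def commit(self) -> None:
        if self._closed:
            raise RuntimeError("writer is closed")
        # Publish atomically: readers see all staged records or none of them.
        self.store.committed[self.key] = list(self._staged)
        self.close()

    def abort(self) -> None:
        # Discard staged data; nothing becomes visible to readers.
        self._staged.clear()
        self.close()

    def close(self) -> None:
        # Release connections/streams/file handles; safe to call repeatedly.
        if not self._closed:
            self._closed = True
            self._staged = []

    def __enter__(self) -> "CommittingShuffleWriter":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Commit on success, abort on failure; never swallow the exception.
        if not self._closed:
            if exc_type is None:
                self.commit()
            else:
                self.abort()
        return False
```

A failed map task thus leaves no partial output behind, which is the property the commit hook is meant to guarantee for any shuffle manager.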
>
>
> Do not require ShuffleManagers to expose ShuffleBlockResolvers where they
> are not needed. Ideally, this would be an implementation detail of the
> shuffle manager itself. If there is substantial overlap between the
> SortShuffleManager and other implementations, then the storage details can
> be abstracted at the appropriate level. (SPARK-25299 does not currently
> change this.)
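To illustrate what "not required" could mean, the sketch below makes the resolver an optional accessor that defaults to nothing, so only managers that keep block files near executors supply one. ShuffleManagerSketch, SortLikeShuffleManager, RemoteServiceShuffleManager, InMemoryBlockResolver and serve_local_block are hypothetical names; the block-ID string is just an example key.

```python
from abc import ABC, abstractmethod
from typing import Dict, Optional


class ShuffleBlockResolver(ABC):
    """Resolves a block ID to locally stored bytes (illustrative shape only)."""

    @abstractmethod
    def get_block_data(self, block_id: str) -> bytes:
        ...


class ShuffleManagerSketch(ABC):
    """Hypothetical manager base class in which the resolver is optional."""

    def shuffle_block_resolver(self) -> Optional[ShuffleBlockResolver]:
        # Default: the manager exposes no block-level resolution at all.
        return None


class InMemoryBlockResolver(ShuffleBlockResolver):
    def __init__(self, blocks: Dict[str, bytes]) -> None:
        self._blocks = blocks

    def get_block_data(self, block_id: str) -> bytes:
        return self._blocks[block_id]


class SortLikeShuffleManager(ShuffleManagerSketch):
    """Keeps block data next to executors, so it does provide a resolver."""

    def __init__(self, blocks: Dict[str, bytes]) -> None:
        self._resolver = InMemoryBlockResolver(blocks)

    def shuffle_block_resolver(self) -> Optional[ShuffleBlockResolver]:
        return self._resolver


class RemoteServiceShuffleManager(ShuffleManagerSketch):
    """Data lives in the remote service; there is nothing to resolve locally."""


def serve_local_block(manager: ShuffleManagerSketch, block_id: str) -> Optional[bytes]:
    # Callers must handle managers that do not expose a resolver.
    resolver = manager.shuffle_block_resolver()
    if resolver is None:
        return None
    return resolver.get_block_data(block_id)
```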
>
>
> Do not require MapStatus to include blockmanager IDs where they are not
> relevant. This is captured by ShuffleBlockInfo
> <https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj>
> including an optional BlockManagerId in SPARK-25299. However, this change
> should be lifted to the MapStatus level so that it applies to all
> ShuffleManagers. Alternatively, use a more general data-location
> abstraction than BlockManagerId. This gives the shuffle manager more
> flexibility and the scheduler more information with respect to data
> residence.
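The "more general data-location abstraction" could be sketched as a small sum type, with executor-hosted data as just one variant. MapStatusSketch, BlockManagerLocation, ServiceLocation and the two helper functions are hypothetical illustrations, not Spark's MapStatus. They show how the scheduler gains information: locality hints and executor-loss handling apply only where data actually resides on an executor.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence, Tuple, Union


@dataclass(frozen=True)
class BlockManagerLocation:
    """Output held by a specific executor's block manager."""
    executor_id: str
    host: str


@dataclass(frozen=True)
class ServiceLocation:
    """Output held by an external shuffle service, addressed however it likes."""
    service: str


DataLocation = Union[BlockManagerLocation, ServiceLocation]


@dataclass(frozen=True)
class MapStatusSketch:
    map_id: int
    partition_sizes: Tuple[int, ...]
    location: Optional[DataLocation] = None  # None: the manager tracks it itself


def preferred_hosts(statuses: Sequence[MapStatusSketch]) -> List[str]:
    # Locality hints only make sense when data sits on a particular host.
    return sorted({s.location.host for s in statuses
                   if isinstance(s.location, BlockManagerLocation)})


def lost_on_executor(statuses: Sequence[MapStatusSketch], executor_id: str) -> List[int]:
    # Losing an executor invalidates only the outputs it actually holds.
    return [s.map_id for s in statuses
            if isinstance(s.location, BlockManagerLocation)
            and s.location.executor_id == executor_id]
```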
>
> Serialization
>
> Allow serializers to be used more flexibly and efficiently. For example,
> have serializers support writing an arbitrary number of objects into an
> existing OutputStream or ByteBuffer. This enables objects to be serialized
> to direct buffers where doing so makes sense. More importantly, it allows
> arbitrary metadata/framing data to be wrapped around individual objects
> cheaply. Right now, that’s only possible at the stream level. (There are
> hacks around this, but this would enable more idiomatic use in efficient
> shuffle implementations.)
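A rough illustration of per-object framing into an existing buffer: any number of records are appended to a caller-owned buffer, each wrapped in its own small header carrying metadata (here, a partition ID) and a length. This is a sketch under assumptions: bytearray stands in for a ByteBuffer, pickle for a Spark serializer, and the 8-byte header layout is invented for the example.

```python
import pickle
import struct
from typing import Any, Callable, Iterable, Iterator, Tuple

# Hypothetical frame header: 4-byte partition ID + 4-byte payload length, big-endian.
HEADER = struct.Struct(">II")


def write_framed(
    out: bytearray,
    records: Iterable[Tuple[int, Any]],
    serialize: Callable[[Any], bytes] = pickle.dumps,
) -> int:
    """Append (partition, obj) records to an existing buffer; return bytes written."""
    start = len(out)
    for partition, obj in records:
        payload = serialize(obj)
        # Per-object metadata is just a few header bytes, not a new stream.
        out.extend(HEADER.pack(partition, len(payload)))
        out.extend(payload)
    return len(out) - start


def read_framed(
    buf: bytes, deserialize: Callable[[bytes], Any] = pickle.loads
) -> Iterator[Tuple[int, Any]]:
    """Walk the frames in order, yielding (partition, obj) pairs."""
    view = memoryview(buf)
    pos = 0
    while pos < len(view):
        partition, length = HEADER.unpack_from(view, pos)
        pos += HEADER.size
        yield partition, deserialize(bytes(view[pos:pos + length]))
        pos += length
```

Because each record carries its own frame, a service could route or regroup individual records without re-encoding a whole stream.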
>
>
> Have serializers indicate whether they are deterministic. This provides
> much of the value of a shuffle service because it means that reducers do
> not need to spill to disk when reading/merging/combining inputs--the data
> can be grouped by the service, even without the service understanding data
> types or byte representations. Alternative (less preferable since it would
> break Java serialization, for example): require all serializers to be
> deterministic.
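The reason determinism matters can be shown in a few lines: a service that groups records purely by serialized key bytes is only correct if equal keys always produce identical bytes. In this sketch JSON stands in for a Spark serializer; `deterministic` sorts dict keys, while `insertion_ordered` follows dict insertion order, so two equal keys can encode differently. The function names are hypothetical.

```python
import json
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple


def service_group(records: Iterable[Tuple[bytes, bytes]]) -> Dict[bytes, List[bytes]]:
    """Group opaque (key_bytes, value_bytes) pairs by byte equality alone.

    The service never deserializes anything, so this grouping is only
    correct when the serializer is deterministic.
    """
    groups: Dict[bytes, List[bytes]] = defaultdict(list)
    for key, value in records:
        groups[key].append(value)
    return dict(groups)


def deterministic(obj: Any) -> bytes:
    # Sorted keys and fixed separators: equal dicts always encode identically.
    return json.dumps(obj, sort_keys=True, separators=(",", ":")).encode()


def insertion_ordered(obj: Any) -> bytes:
    # Encoding follows dict insertion order, so equal dicts may differ in bytes.
    return json.dumps(obj).encode()
```

With k1 = {"user": 1, "day": "mon"} and k2 = {"day": "mon", "user": 1} (equal as Python values), the deterministic encoding yields one group holding both values, while the insertion-ordered encoding splits them into two groups that a reducer would then have to merge itself.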
>
>
>
> --
>
> - Ben
>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
>
>
> --
> John Zhuge
>
>
>
> --
> John Zhuge
>
>
>
> --
> -Ben
>


-- 
"...:::Aniket:::... Quetzalco@tl"
