spark-dev mailing list archives

From bo yang <>
Subject Re: Enabling fully disaggregated shuffle on Spark
Date Wed, 20 Nov 2019 16:56:42 GMT
Cool, thanks Ryan, John, and Amogh for the replies! Great to see your interest!
Felix will host a Spark Scalability & Reliability Sync meeting on Dec 4 at 1pm
PST. We could discuss more details there. Do you want to join?

On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <> wrote:

> We at Qubole are also looking at disaggregating shuffle on Spark. Would
> love to collaborate and share learnings.
> Regards,
> Amogh
> On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <> wrote:
>> Great work, Bo! Would love to hear the details.
>> On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <>
>> wrote:
>>> I'm interested in remote shuffle services as well. I'd love to hear
>>> about what you're using in production!
>>> rb
>>> On Tue, Nov 19, 2019 at 2:43 PM bo yang <> wrote:
>>>> Hi Ben,
>>>> Thanks for the write-up! This is Bo from Uber. I am on Felix's team
>>>> in Seattle, working on disaggregated shuffle (internally we call it remote
>>>> shuffle service, or RSS). We have had RSS in production for a
>>>> while and learned a lot along the way (we tried quite a few techniques to
>>>> improve remote shuffle performance). We could share our learnings with
>>>> the community, and we would also like to hear feedback/suggestions on how to
>>>> further improve remote shuffle performance. We could discuss details if
>>>> you or others are interested.
>>>> Best,
>>>> Bo
>>>> On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <>
>>>> wrote:
>>>>> I would like to start a conversation about extending the Spark shuffle
>>>>> manager surface to support fully disaggregated shuffle implementations.
>>>>> This is closely related to the work in SPARK-25299
>>>>> <>, which is focused
>>>>> on refactoring the shuffle manager API (and in particular,
>>>>> SortShuffleManager) to use a pluggable storage backend. The motivation
>>>>> for that SPIP is further enabling Spark on Kubernetes.
>>>>> The motivation for this proposal is enabling fully externalized
>>>>> (disaggregated) shuffle service implementations. (Facebook’s Cosco
>>>>> shuffle
>>>>> <>
>>>>> is one example of such a disaggregated shuffle service.) These changes
>>>>> allow the bulk of the shuffle to run in a remote service such that minimal
>>>>> state resides in executors and local disk spill is minimized. The net
>>>>> effect is increased job stability and performance improvements in certain
>>>>> scenarios. These changes should work well with, or be complementary to,
>>>>> SPARK-25299. Some or all points may be merged into that issue as
>>>>> appropriate.
>>>>> Below is a description of each component of this proposal. These
>>>>> changes can ideally be introduced incrementally. I would like to gather
>>>>> feedback and gauge interest from others in the community to collaborate
>>>>> on this. There are likely more points that would be useful to disaggregated
>>>>> shuffle services. We can outline a more concrete plan after gathering
>>>>> enough input. A working session could help us kick off this joint effort;
>>>>> maybe something in the mid-January to mid-February timeframe (depending on
>>>>> interest and availability). I’m happy to host at our Sunnyvale, CA offices.
>>>>> Proposal
>>>>> Scheduling and re-executing tasks
>>>>> Allow coordination between the service and the Spark DAG scheduler as
>>>>> to whether a given block/partition needs to be recomputed when a task fails
>>>>> or when shuffle block data cannot be read. Having such coordination is
>>>>> important, e.g., for suppressing recomputation after aborted executors or
>>>>> for forcing late recomputation if the service internally acts as a cache.
>>>>> One catchall solution is to have the shuffle manager provide an indication
>>>>> of whether shuffle data is external to executors (or nodes). Another
>>>>> option: allow the shuffle manager (likely on the driver) to be queried for
>>>>> the existence of shuffle data for a given executor ID (or perhaps map
>>>>> or reduce task, etc.). Note that this is at the level of data the scheduler
>>>>> is aware of (i.e., map/reduce partitions) rather than block IDs, which are
>>>>> internal details for some shuffle managers.
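A minimal Scala sketch of the driver-side query described above, assuming a hypothetical `ShuffleDataOracle` trait (not part of Spark's API) that reports whether shuffle output is external to executors and whether a given map partition's output still exists. The helper `partitionsToRecompute` is likewise illustrative: it shows how a scheduler could skip recomputation after losing an executor when a remote service still holds the data, while still recomputing partitions the service has evicted (the cache case).

```scala
// Hypothetical driver-side query interface; not part of Spark's ShuffleManager API.
trait ShuffleDataOracle {
  /** True if shuffle output outlives the executors that wrote it. */
  def isExternalToExecutors: Boolean
  /** True if the output of the given map partition is still readable. */
  def hasMapOutput(shuffleId: Int, mapPartition: Int): Boolean
}

// Hypothetical helper: which map partitions must be recomputed after an executor is lost.
def partitionsToRecompute(
    oracle: ShuffleDataOracle,
    shuffleId: Int,
    partitionsOnLostExecutor: Seq[Int]): Seq[Int] =
  if (!oracle.isExternalToExecutors) partitionsOnLostExecutor // data died with the executor
  else partitionsOnLostExecutor.filterNot(p => oracle.hasMapOutput(shuffleId, p)) // only evicted data

// Toy oracle for a remote service that still holds map partitions 0 and 2 of shuffle 7.
final class RemoteServiceOracle(available: Set[(Int, Int)]) extends ShuffleDataOracle {
  def isExternalToExecutors: Boolean = true
  def hasMapOutput(shuffleId: Int, mapPartition: Int): Boolean =
    available.contains((shuffleId, mapPartition))
}

val remote = new RemoteServiceOracle(Set((7, 0), (7, 2)))
println(partitionsToRecompute(remote, 7, Seq(0, 1, 2, 3)))
```

With an executor-local shuffle the same helper falls back to recomputing everything the lost executor wrote, which matches today's behavior.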
>>>>> ShuffleManager API
>>>>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that
>>>>> the service knows that data is still active. This is one way to enable
>>>>> time-/job-scoped data, because a disaggregated shuffle service cannot rely
>>>>> on robust communication with Spark and in general has a distinct lifecycle
>>>>> from the Spark deployment(s) it talks to. This would likely take the form
>>>>> of a callback on ShuffleManager itself, but there are other approaches.
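One way the heartbeat could work on the service side is a lease table: each keep-alive renews a shuffle's lease, and shuffles whose lease lapses become eligible for cleanup. This Scala sketch assumes a hypothetical `ShuffleLeaseTable` with a fixed TTL; timestamps are passed in explicitly so the behavior is easy to test, and none of these names come from Spark.

```scala
import scala.collection.mutable

// Hypothetical service-side lease table; all names are illustrative assumptions.
final class ShuffleLeaseTable(ttlMillis: Long) {
  private val lastHeartbeat = mutable.Map.empty[Int, Long] // shuffleId -> last keep-alive time

  /** Called when Spark (e.g. via a driver-side callback) signals the shuffle is still in use. */
  def heartbeat(shuffleId: Int, nowMillis: Long): Unit =
    lastHeartbeat(shuffleId) = nowMillis

  /** Removes and returns the shuffles whose lease has lapsed; their data can be deleted. */
  def expire(nowMillis: Long): Set[Int] = {
    val expired = lastHeartbeat.iterator.collect {
      case (id, t) if nowMillis - t > ttlMillis => id
    }.toSet
    lastHeartbeat --= expired
    expired
  }

  def isLive(shuffleId: Int): Boolean = lastHeartbeat.contains(shuffleId)
}

val leases = new ShuffleLeaseTable(ttlMillis = 30000L)
leases.heartbeat(shuffleId = 1, nowMillis = 0L)
leases.heartbeat(shuffleId = 2, nowMillis = 0L)
leases.heartbeat(shuffleId = 2, nowMillis = 25000L) // the job using shuffle 2 is still running
println(leases.expire(nowMillis = 40000L))
```

Because the lease is renewed only by keep-alives, the service does not depend on Spark reliably sending an explicit "shuffle finished" message.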
>>>>> Add lifecycle hooks to shuffle readers and writers (e.g., to
>>>>> close/recycle connections/streams/file handles as well as provide commit
>>>>> semantics). SPARK-25299 adds commit semantics to the internal data storage
>>>>> layer, but this is applicable to all shuffle managers at a higher level and
>>>>> should apply equally to the ShuffleWriter.
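The commit semantics mentioned above can be illustrated with a hypothetical writer contract, `CommittableShuffleWriter`, whose records become visible to readers only after `commit()`, and which discards staged records on `abort()`. This is a toy in-memory sketch, not Spark's `ShuffleWriter` interface; `close()` is where a real implementation would release connections, streams, or file handles.

```scala
import java.io.Closeable
import scala.collection.mutable

// Hypothetical writer contract with explicit lifecycle hooks (not Spark's ShuffleWriter).
trait CommittableShuffleWriter[K, V] extends Closeable {
  def write(record: (K, V)): Unit
  def commit(): Unit // make all written records visible to readers at once
  def abort(): Unit  // discard everything written since the writer was opened
}

// Toy stand-in for a remote shuffle service: records become visible only on commit.
final class InMemoryService[K, V] {
  val committed = mutable.Map.empty[Int, Vector[(K, V)]] // mapId -> committed records

  def openWriter(mapId: Int): CommittableShuffleWriter[K, V] =
    new CommittableShuffleWriter[K, V] {
      private val staged = Vector.newBuilder[(K, V)]
      private var isOpen = true
      def write(record: (K, V)): Unit = { require(isOpen, "writer closed"); staged += record }
      def commit(): Unit = {
        require(isOpen, "writer closed")
        committed(mapId) = staged.result()
        close()
      }
      def abort(): Unit = { staged.clear(); close() }
      def close(): Unit = isOpen = false // a real writer would release connections/handles here
    }
}

val service = new InMemoryService[String, Int]
val ok = service.openWriter(mapId = 0)
ok.write("a" -> 1); ok.write("b" -> 2); ok.commit()
val failed = service.openWriter(mapId = 1)
failed.write("c" -> 3); failed.abort() // the task failed: nothing from map 1 becomes visible
println(service.committed.keySet)
```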
>>>>> Do not require ShuffleManagers to expose ShuffleBlockResolvers where
>>>>> they are not needed. Ideally, this would be an implementation detail of the
>>>>> shuffle manager itself. If there is substantial overlap between the
>>>>> SortShuffleManager and other implementations, then the storage details
>>>>> should be abstracted at the appropriate level. (SPARK-25299 does not currently
>>>>> change this.)
>>>>> Do not require MapStatus to include blockmanager IDs where they are
>>>>> not relevant. This is captured by ShuffleBlockInfo
>>>>> <>
>>>>> including an optional BlockManagerId in SPARK-25299. However, this
>>>>> change should be lifted to the MapStatus level so that it applies to all
>>>>> ShuffleManagers. Alternatively, use a more general data-location
>>>>> abstraction than BlockManagerId. This gives the shuffle manager more
>>>>> flexibility and the scheduler more information with respect to data
>>>>> residence.
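A sketch of what a more general data-location abstraction might look like, assuming a hypothetical sealed `ShuffleDataLocation` hierarchy and a `GenericMapStatus` record in place of Spark's `MapStatus`. It shows the extra information the scheduler would gain: on executor loss, only outputs that actually lived on that executor need recomputation, while outputs held by a remote service are unaffected. The endpoint string is a made-up placeholder.

```scala
// Hypothetical location type; Spark's MapStatus currently carries a BlockManagerId instead.
sealed trait ShuffleDataLocation
final case class ExecutorLocal(executorId: String, host: String) extends ShuffleDataLocation
final case class RemoteShuffleService(endpoint: String) extends ShuffleDataLocation

// Hypothetical map-output status that does not require a block manager ID.
final case class GenericMapStatus(mapId: Long, location: ShuffleDataLocation, sizes: Array[Long])

// Scheduler-side use: losing an executor only invalidates output that lived on it.
def lostOnExecutorFailure(statuses: Seq[GenericMapStatus], deadExecutor: String): Seq[Long] =
  statuses.collect {
    case GenericMapStatus(mapId, ExecutorLocal(`deadExecutor`, _), _) => mapId
  }

val statuses = Seq(
  GenericMapStatus(0L, ExecutorLocal("exec-1", "host-a"), Array(10L, 20L)),
  GenericMapStatus(1L, RemoteShuffleService("rss.example:9000"), Array(5L, 5L)),
  GenericMapStatus(2L, ExecutorLocal("exec-2", "host-b"), Array(7L, 3L)))
println(lostOnExecutorFailure(statuses, "exec-1"))
```

A sealed hierarchy keeps pattern matches exhaustive, so adding a new location kind forces every scheduler code path that inspects locations to handle it.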
>>>>> Serialization
>>>>> Allow serializers to be used more flexibly and efficiently. For
>>>>> example, have serializers support writing an arbitrary number of objects
>>>>> into an existing OutputStream or ByteBuffer. This enables objects to be
>>>>> serialized to direct buffers where doing so makes sense. More importantly,
>>>>> it allows arbitrary metadata/framing data to be wrapped around individual
>>>>> objects cheaply. Right now, that’s only possible at the stream level.
>>>>> (There are hacks around this, but this would enable more idiomatic use in
>>>>> efficient shuffle implementations.)
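To illustrate per-object framing, here is a Scala sketch that assumes a hypothetical `ObjectWriter` interface able to serialize a single object into a caller-owned `OutputStream`. Each record is wrapped in a small frame (partition ID, then payload length) so a service could route frames without decoding payloads. The frame layout is an assumption for illustration, not a format used by Spark or any particular shuffle service.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, OutputStream}
import java.nio.charset.StandardCharsets

// Hypothetical per-object serializer: writes one object into a stream the caller owns.
trait ObjectWriter[T] {
  def writeTo(obj: T, out: OutputStream): Unit
}

object Utf8Writer extends ObjectWriter[String] {
  def writeTo(obj: String, out: OutputStream): Unit =
    out.write(obj.getBytes(StandardCharsets.UTF_8))
}

// Wrap each record in a cheap frame: [partitionId: Int][length: Int][payload bytes].
def writeFramed[T](records: Seq[(Int, T)], ser: ObjectWriter[T], out: DataOutputStream): Unit = {
  val scratch = new ByteArrayOutputStream() // reused buffer holding one payload at a time
  for ((partition, record) <- records) {
    scratch.reset()
    ser.writeTo(record, scratch)
    out.writeInt(partition)
    out.writeInt(scratch.size())
    scratch.writeTo(out)
  }
  out.flush()
}

// Reader side: frames can be routed by partition without deserializing payloads.
def readFrames(bytes: Array[Byte]): Seq[(Int, Array[Byte])] = {
  val in = new DataInputStream(new ByteArrayInputStream(bytes))
  val frames = Seq.newBuilder[(Int, Array[Byte])]
  while (in.available() > 0) {
    val partition = in.readInt()
    val payload = new Array[Byte](in.readInt())
    in.readFully(payload)
    frames += partition -> payload
  }
  frames.result()
}

val buffer = new ByteArrayOutputStream()
writeFramed(Seq(3 -> "apple", 1 -> "kiwi"), Utf8Writer, new DataOutputStream(buffer))
val frames = readFrames(buffer.toByteArray)
println(frames.map { case (p, b) => p -> new String(b, StandardCharsets.UTF_8) })
```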
>>>>> Have serializers indicate whether they are deterministic. This
>>>>> provides much of the value of a shuffle service because it means that
>>>>> reducers do not need to spill to disk when reading/merging/combining
>>>>> inputs--the data can be grouped by the service, even without the service
>>>>> understanding data types or byte representations. Alternative (less
>>>>> preferable since it would break Java serialization, for example): require
>>>>> all serializers to be deterministic.
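Why determinism matters can be shown with a service that groups records purely by their serialized key bytes. This sketch assumes keys are serialized with a deterministic encoding (UTF-8 strings here); `java.nio.ByteBuffer` is used as the map key because its `equals` and `hashCode` depend on buffer contents. The function names are hypothetical. If equal keys could serialize to different bytes, records for the same key would land in different groups and the reducer would have to regroup them itself.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// A service that groups records by serialized key bytes, never decoding them.
// Correct only if equal keys always serialize to identical bytes (determinism).
def groupBySerializedKey(records: Seq[(Array[Byte], Array[Byte])]): Map[ByteBuffer, Seq[Array[Byte]]] =
  records
    .groupBy { case (k, _) => ByteBuffer.wrap(k) } // ByteBuffer equality compares contents
    .map { case (k, kvs) => k -> kvs.map(_._2) }

// Deterministic toy serializer for String keys and values.
def serKey(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)

val grouped = groupBySerializedKey(Seq(
  serKey("user-1") -> serKey("a"),
  serKey("user-2") -> serKey("b"),
  serKey("user-1") -> serKey("c")))
println(grouped.size) // both "user-1" records fall into the same group
```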
>>>>> --
>>>>> - Ben
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>> --
>> John Zhuge
