spark-dev mailing list archives

From John Zhuge <>
Subject Re: Enabling fully disaggregated shuffle on Spark
Date Wed, 20 Nov 2019 17:01:48 GMT
That will be great. Please send us the invite.

On Wed, Nov 20, 2019 at 8:56 AM bo yang <> wrote:

> Cool, thanks Ryan, John, Amogh for the reply! Great to see you interested!
> Felix will have a Spark Scalability & Reliability Sync meeting on Dec 4 1pm
> PST. We could discuss more details there. Do you want to join?
> On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <> wrote:
>> We at Qubole are also looking at disaggregating shuffle on Spark. Would
>> love to collaborate and share learnings.
>> Regards,
>> Amogh
>> On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <> wrote:
>>> Great work, Bo! Would love to hear the details.
>>> On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <> wrote:
>>>> I'm interested in remote shuffle services as well. I'd love to hear
>>>> about what you're using in production!
>>>> rb
>>>> On Tue, Nov 19, 2019 at 2:43 PM bo yang <> wrote:
>>>>> Hi Ben,
>>>>> Thanks for the write-up! This is Bo from Uber. I am on Felix's team
>>>>> in Seattle, working on disaggregated shuffle (we call it remote
>>>>> shuffle service, RSS, internally). We have had RSS in production for
>>>>> a while and learned a lot during the work (we tried quite a few
>>>>> techniques to improve remote shuffle performance). We could share our
>>>>> learnings with the community, and would also like to hear
>>>>> feedback/suggestions on how to further improve remote shuffle
>>>>> performance. We could chat in more detail if you or other people are
>>>>> interested.
>>>>> Best,
>>>>> Bo
>>>>> On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <> wrote:
>>>>>> I would like to start a conversation about extending the Spark
>>>>>> shuffle manager surface to support fully disaggregated shuffle
>>>>>> implementations. This is closely related to the work in SPARK-25299,
>>>>>> which focused on refactoring the shuffle manager API (and in
>>>>>> particular, SortShuffleManager) to use a pluggable storage backend.
>>>>>> The motivation for that SPIP is further enabling Spark on Kubernetes.
>>>>>> The motivation for this proposal is enabling fully externalized
>>>>>> (disaggregated) shuffle service implementations. (Facebook’s Cosco
>>>>>> shuffle is one example of such a disaggregated shuffle service.)
>>>>>> These changes allow the bulk of the shuffle to run in a remote
>>>>>> service such that minimal state resides in executors and local disk
>>>>>> spill is minimized. The net effect is increased job stability and
>>>>>> performance improvements in certain scenarios. These changes should
>>>>>> work well with, or are complementary to, SPARK-25299. Some or all
>>>>>> points may be merged into that issue as appropriate.
>>>>>> Below is a description of each component of this proposal. These
>>>>>> changes can ideally be introduced incrementally. I would like to
>>>>>> gather feedback and gauge interest from others in the community to
>>>>>> collaborate on this. There are likely more points that would be
>>>>>> useful to disaggregated shuffle services. We can outline a more
>>>>>> concrete plan after gathering enough input. A working session could
>>>>>> help us kick off this joint effort, maybe something in the
>>>>>> mid-January to mid-February timeframe (depending on interest and
>>>>>> availability). I’m happy to host at our Sunnyvale, CA offices.
>>>>>> Proposal
>>>>>> Scheduling and re-executing tasks
>>>>>> Allow coordination between the service and the Spark DAG scheduler
>>>>>> as to whether a given block/partition needs to be recomputed when a
>>>>>> task fails or when shuffle block data cannot be read. Having such
>>>>>> coordination is important, e.g., for suppressing recomputation after
>>>>>> aborted executors or for forcing late recomputation if the service
>>>>>> internally acts as a cache. One catchall solution is to have the
>>>>>> shuffle manager provide an indication of whether shuffle data is
>>>>>> external to executors (or nodes). Another option: allow the shuffle
>>>>>> manager (likely on the driver) to be queried for the existence of
>>>>>> shuffle data for a given executor ID (or perhaps map task, reduce
>>>>>> task, etc.). Note that this is at the level of data the scheduler is
>>>>>> aware of (i.e., map/reduce partitions) rather than block IDs, which
>>>>>> are internal details for some shuffle managers.
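The second option above (querying for the existence of shuffle data) could look roughly like the sketch below. It is written in Python only for brevity and is not part of Spark or of the proposal: `RemoteShuffleTracker`, `has_map_output`, and `partitions_to_recompute` are hypothetical names, and the set-based bookkeeping is an assumption. The point is that the scheduler asks about map partitions, never about block IDs.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass
class RemoteShuffleTracker:
    """Hypothetical driver-side view of what a remote shuffle service holds."""

    # (shuffle_id, map_partition) pairs the service has confirmed as stored.
    stored: Set[Tuple[int, int]] = field(default_factory=set)

    def is_external(self) -> bool:
        # Catchall indication: shuffle data lives outside the executors.
        return True

    def has_map_output(self, shuffle_id: int, map_partition: int) -> bool:
        # Finer-grained query at the granularity the scheduler knows about.
        return (shuffle_id, map_partition) in self.stored


def partitions_to_recompute(
    tracker: RemoteShuffleTracker,
    shuffle_id: int,
    outputs_by_executor: Dict[str, List[int]],
    lost_executor: str,
) -> List[int]:
    """Return the map partitions that must be rerun after an executor is lost.

    Partitions the service still holds are skipped (recomputation is
    suppressed); partitions it no longer holds, e.g. evicted from a cache,
    are returned so they can be recomputed late.
    """
    candidates = outputs_by_executor.get(lost_executor, [])
    return [p for p in candidates if not tracker.has_map_output(shuffle_id, p)]
```

With map outputs 0 and 1 already held by the service, losing the executor that produced partitions 0, 1, and 2 would only require rerunning partition 2.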
>>>>>> ShuffleManager API
>>>>>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that
>>>>>> the service knows that data is still active. This is one way to
>>>>>> enable time-/job-scoped data because a disaggregated shuffle service
>>>>>> cannot rely on robust communication with Spark and in general has a
>>>>>> distinct lifecycle from the Spark deployment(s) it talks to. This
>>>>>> would likely take the form of a callback on ShuffleManager itself,
>>>>>> but there are other approaches.
>>>>>> Add lifecycle hooks to shuffle readers and writers (e.g., to
>>>>>> close/recycle connections/streams/file handles as well as provide
>>>>>> commit semantics). SPARK-25299 adds commit semantics to the internal
>>>>>> data storage layer, but this is applicable to all shuffle managers at
>>>>>> a higher level and should apply equally to the ShuffleWriter.
>>>>>> Do not require ShuffleManagers to expose ShuffleBlockResolvers where
>>>>>> they are not needed. Ideally, this would be an implementation detail
>>>>>> of the shuffle manager itself. If there is substantial overlap
>>>>>> between the SortShuffleManager and other implementations, then the
>>>>>> storage details can be abstracted at the appropriate level.
>>>>>> (SPARK-25299 does not currently change this.)
>>>>>> Do not require MapStatus to include BlockManager IDs where they are
>>>>>> not relevant. This is captured by ShuffleBlockInfo including an
>>>>>> optional BlockManagerId in SPARK-25299. However, this change should
>>>>>> be lifted to the MapStatus level so that it applies to all
>>>>>> ShuffleManagers. Alternatively, use a more general data-location
>>>>>> abstraction than BlockManagerId. This gives the shuffle manager more
>>>>>> flexibility and the scheduler more information with respect to data
>>>>>> residence.
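As an illustration of the heartbeat item in this section, here is a minimal lease-style sketch of what the service side might do with keep-alives. It is an assumption-laden example in Python, not Spark code: `ShuffleLeaseRegistry`, its methods, and the TTL policy are all hypothetical, and the clock is passed in explicitly so the behavior is easy to follow.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ShuffleLeaseRegistry:
    """Hypothetical service-side registry of live shuffles, keyed by shuffle ID."""

    ttl_seconds: float
    _last_seen: Dict[int, float] = field(default_factory=dict)

    def heartbeat(self, shuffle_id: int, now: float) -> None:
        # Called (e.g., from a driver-side callback) while the shuffle is in use.
        self._last_seen[shuffle_id] = now

    def is_active(self, shuffle_id: int) -> bool:
        return shuffle_id in self._last_seen

    def expire(self, now: float) -> List[int]:
        """Drop and return shuffles whose last heartbeat is older than the TTL."""
        dead = sorted(
            sid for sid, seen in self._last_seen.items()
            if now - seen > self.ttl_seconds
        )
        for sid in dead:
            del self._last_seen[sid]
        return dead
```

Because expiry depends only on the last heartbeat, the service can reclaim data even if the Spark application disappears without sending any shutdown notification.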
>>>>>> Serialization
>>>>>> Allow serializers to be used more flexibly and efficiently. For
>>>>>> example, have serializers support writing an arbitrary number of
>>>>>> objects into an existing OutputStream or ByteBuffer. This enables
>>>>>> objects to be serialized to direct buffers where doing so makes
>>>>>> sense. More importantly, it allows arbitrary metadata/framing data to
>>>>>> be wrapped around individual objects cheaply. Right now, that’s only
>>>>>> possible at the stream level. (There are hacks around this, but this
>>>>>> would enable more idiomatic use in efficient shuffle
>>>>>> implementations.)
>>>>>> Have serializers indicate whether they are deterministic. This
>>>>>> provides much of the value of a shuffle service because it means that
>>>>>> reducers do not need to spill to disk when reading/merging/combining
>>>>>> inputs--the data can be grouped by the service, even without the
>>>>>> service understanding data types or byte representations. Alternative
>>>>>> (less preferable since it would break Java serialization, for
>>>>>> example): require all serializers to be deterministic.
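The two serialization points can be shown together in a small sketch: per-object framing written into an existing stream, and grouping by raw key bytes, which only works if the serializer is deterministic. This is an illustration in Python under stated assumptions, not Spark's serializer API: canonical JSON stands in for a deterministic serializer, and the frame layout and function names are hypothetical.

```python
import io
import json
import struct
from collections import defaultdict

# Frame header: partition id, key length, value length (big-endian uint32 each).
HEADER = struct.Struct(">III")


def serialize(obj) -> bytes:
    # Stand-in deterministic serializer: equal values always yield equal bytes.
    return json.dumps(obj, sort_keys=True, separators=(",", ":")).encode("utf-8")


def write_record(stream, partition_id: int, key, value) -> None:
    """Append one framed record to an existing binary stream."""
    k, v = serialize(key), serialize(value)
    stream.write(HEADER.pack(partition_id, len(k), len(v)))
    stream.write(k)
    stream.write(v)


def read_records(stream):
    """Yield (partition_id, key_bytes, value_bytes) without deserializing."""
    while True:
        header = stream.read(HEADER.size)
        if not header:
            return
        if len(header) < HEADER.size:
            raise ValueError("truncated frame header")
        partition_id, key_len, value_len = HEADER.unpack(header)
        yield partition_id, stream.read(key_len), stream.read(value_len)


def group_by_key_bytes(stream):
    """Group values by (partition, key bytes), as a type-unaware service could."""
    groups = defaultdict(list)
    for partition_id, key_bytes, value_bytes in read_records(stream):
        groups[(partition_id, key_bytes)].append(value_bytes)
    return dict(groups)
```

If the serializer were not deterministic, two equal keys could produce different bytes and land in different groups, which is why the service would need that guarantee before merging on the reducer's behalf.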
>>>>>> --
>>>>>> - Ben
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>> --
>>> John Zhuge

John Zhuge
