spark-dev mailing list archives

From Ben Sidhom <sid...@google.com.INVALID>
Subject Re: Enabling fully disaggregated shuffle on Spark
Date Wed, 20 Nov 2019 17:10:01 GMT
That sounds great!

On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jzhuge@apache.org> wrote:

> That will be great. Please send us the invite.
>
> On Wed, Nov 20, 2019 at 8:56 AM bo yang <bobyangbo@gmail.com> wrote:
>
>> Cool, thanks Ryan, John, Amogh for the reply! Great to see you
>> interested! Felix will have a Spark Scalability & Reliability Sync
>> meeting on Dec 4 1pm PST. We could discuss more details there. Do you want
>> to join?
>>
>> On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <amoghm@qubole.com> wrote:
>>
>>> We at Qubole are also looking at disaggregating shuffle on Spark. Would
>>> love to collaborate and share learnings.
>>>
>>> Regards,
>>> Amogh
>>>
>>> On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jzhuge@apache.org> wrote:
>>>
>>>> Great work, Bo! Would love to hear the details.
>>>>
>>>>
>>>> On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rblue@netflix.com.invalid>
>>>> wrote:
>>>>
>>>>> I'm interested in remote shuffle services as well. I'd love to hear
>>>>> about what you're using in production!
>>>>>
>>>>> rb
>>>>>
>>>>> On Tue, Nov 19, 2019 at 2:43 PM bo yang <bobyangbo@gmail.com> wrote:
>>>>>
>>>>>> Hi Ben,
>>>>>>
>>>>>> Thanks for the write-up! This is Bo from Uber. I am on Felix's team
>>>>>> in Seattle, working on disaggregated shuffle (we call it remote
>>>>>> shuffle service, RSS, internally). We have had RSS in production for
>>>>>> a while and learned a lot along the way (we tried quite a few
>>>>>> techniques to improve remote shuffle performance). We could share our
>>>>>> learnings with the community, and we would also like to hear
>>>>>> feedback and suggestions on how to further improve remote shuffle
>>>>>> performance. We could discuss in more detail if you or others are
>>>>>> interested.
>>>>>>
>>>>>> Best,
>>>>>> Bo
>>>>>>
>>>>>> On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <sidhom@google.com.invalid>
>>>>>> wrote:
>>>>>>
>>>>>>> I would like to start a conversation about extending the Spark
>>>>>>> shuffle manager surface to support fully disaggregated shuffle
>>>>>>> implementations. This is closely related to the work in SPARK-25299
>>>>>>> <https://issues.apache.org/jira/browse/SPARK-25299>, which is
>>>>>>> focused on refactoring the shuffle manager API (and in particular,
>>>>>>> SortShuffleManager) to use a pluggable storage backend. The
>>>>>>> motivation for that SPIP is further enabling Spark on Kubernetes.
>>>>>>>
>>>>>>>
>>>>>>> The motivation for this proposal is enabling fully externalized
>>>>>>> (disaggregated) shuffle service implementations. (Facebook’s Cosco
>>>>>>> shuffle
>>>>>>> <https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service>
>>>>>>> is one example of such a disaggregated shuffle service.) These
>>>>>>> changes allow the bulk of the shuffle to run in a remote service
>>>>>>> such that minimal state resides in executors and local disk spill is
>>>>>>> minimized. The net effect is increased job stability and performance
>>>>>>> improvements in certain scenarios. These changes should work well
>>>>>>> with, or be complementary to, SPARK-25299. Some or all points may be
>>>>>>> merged into that issue as appropriate.
>>>>>>>
>>>>>>>
>>>>>>> Below is a description of each component of this proposal. These
>>>>>>> changes can ideally be introduced incrementally. I would like to
>>>>>>> gather feedback and gauge interest from others in the community to
>>>>>>> collaborate on this. There are likely more points that would be
>>>>>>> useful to disaggregated shuffle services. We can outline a more
>>>>>>> concrete plan after gathering enough input. A working session could
>>>>>>> help us kick off this joint effort, maybe something in the
>>>>>>> mid-January to mid-February timeframe (depending on interest and
>>>>>>> availability). I’m happy to host at our Sunnyvale, CA offices.
>>>>>>>
>>>>>>>
>>>>>>> Proposal
>>>>>>>
>>>>>>> Scheduling and re-executing tasks
>>>>>>>
>>>>>>> Allow coordination between the service and the Spark DAG scheduler
>>>>>>> as to whether a given block/partition needs to be recomputed when a
>>>>>>> task fails or when shuffle block data cannot be read. Having such
>>>>>>> coordination is important, e.g., for suppressing recomputation after
>>>>>>> aborted executors or for forcing late recomputation if the service
>>>>>>> internally acts as a cache. One catchall solution is to have the
>>>>>>> shuffle manager provide an indication of whether shuffle data is
>>>>>>> external to executors (or nodes). Another option: allow the shuffle
>>>>>>> manager (likely on the driver) to be queried for the existence of
>>>>>>> shuffle data for a given executor ID (or perhaps map task, reduce
>>>>>>> task, etc.). Note that this is at the level of data the scheduler is
>>>>>>> aware of (i.e., map/reduce partitions) rather than block IDs, which
>>>>>>> are internal details for some shuffle managers.
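To make the two options above concrete, here is a rough sketch in Python (for brevity, not Spark's Scala/Java). Everything in it is an assumption for illustration: `ShuffleLocationRegistry`, `has_map_output`, and `maps_to_recompute` are hypothetical names, not existing Spark APIs. It combines the "is the data external?" flag with a per-map-output existence query.

```python
from dataclasses import dataclass, field


@dataclass
class ShuffleLocationRegistry:
    """Hypothetical driver-side view of where map outputs live."""
    external: bool  # True if shuffle data lives outside executors
    # (shuffle_id, map_id) -> executor that produced the output
    produced_by: dict = field(default_factory=dict)
    # Outputs the remote service reports as gone (e.g. cache eviction)
    evicted: set = field(default_factory=set)

    def register(self, shuffle_id, map_id, executor_id):
        self.produced_by[(shuffle_id, map_id)] = executor_id
        self.evicted.discard((shuffle_id, map_id))

    def has_map_output(self, shuffle_id, map_id, lost_executors=frozenset()):
        key = (shuffle_id, map_id)
        if key not in self.produced_by or key in self.evicted:
            return False
        if self.external:
            # Data survives executor loss when it is held by the service.
            return True
        return self.produced_by[key] not in lost_executors


def maps_to_recompute(registry, shuffle_id, lost_executors):
    """Map partitions the scheduler would need to re-run."""
    return sorted(
        m for (s, m) in registry.produced_by
        if s == shuffle_id
        and not registry.has_map_output(s, m, lost_executors)
    )
```

With `external=False`, losing an executor forces its map outputs to be recomputed; with `external=True`, recomputation is suppressed unless the service itself reports the output as evicted.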
>>>>>>>
>>>>>>> ShuffleManager API
>>>>>>>
>>>>>>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that
>>>>>>> the service knows that data is still active. This is one way to
>>>>>>> enable time-/job-scoped data because a disaggregated shuffle service
>>>>>>> cannot rely on robust communication with Spark and in general has a
>>>>>>> distinct lifecycle from the Spark deployment(s) it talks to. This
>>>>>>> would likely take the form of a callback on ShuffleManager itself,
>>>>>>> but there are other approaches.
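One way to picture the keep-alive idea is a lease table on the service side. This is only a sketch under assumptions: `ShuffleLeaseTable`, its TTL policy, and the explicit `now` argument are hypothetical and not part of Spark or SPARK-25299. Each heartbeat extends a lease, and data whose lease has lapsed becomes eligible for cleanup.

```python
class ShuffleLeaseTable:
    """Hypothetical service-side table of time-scoped shuffle data."""

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.expires_at = {}  # shuffle_id -> time at which the lease lapses

    def heartbeat(self, shuffle_id, now):
        # Called (directly or indirectly) from the Spark side while the
        # shuffle output is still needed; each call extends the lease.
        self.expires_at[shuffle_id] = now + self.ttl

    def expire(self, now):
        # Drop and return shuffles that have not been kept alive.
        dead = sorted(s for s, t in self.expires_at.items() if t <= now)
        for s in dead:
            del self.expires_at[s]
        return dead
```

Passing `now` in explicitly keeps the sketch deterministic; a real service would presumably read a monotonic clock and also need to handle driver restarts.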
>>>>>>>
>>>>>>>
>>>>>>> Add lifecycle hooks to shuffle readers and writers (e.g., to
>>>>>>> close/recycle connections/streams/file handles as well as provide
>>>>>>> commit semantics). SPARK-25299 adds commit semantics to the internal
>>>>>>> data storage layer, but this is applicable to all shuffle managers
>>>>>>> at a higher level and should apply equally to the ShuffleWriter.
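As an illustration of what writer-level lifecycle hooks with commit semantics could look like, here is a minimal Python sketch. `StagedShuffleWriter` and its `commit`/`abort`/`close` methods are hypothetical names chosen for this example, and a plain list stands in for the remote service.

```python
class StagedShuffleWriter:
    """Hypothetical writer: records become visible only on commit."""

    def __init__(self, committed):
        self.committed = committed  # list standing in for remote storage
        self.staged = []
        self.closed = False

    def write(self, record):
        if self.closed:
            raise ValueError("writer is closed")
        self.staged.append(record)

    def commit(self):
        # Publish everything staged by this task attempt.
        self.committed.extend(self.staged)
        self.staged = []

    def abort(self):
        # Discard partial output from a failed attempt.
        self.staged = []

    def close(self):
        # Release connections/streams/file handles here.
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        try:
            if exc_type is None:
                self.commit()
            else:
                self.abort()
        finally:
            self.close()
        return False  # never swallow the task's exception
```

A task that finishes normally commits on exit; one that raises aborts, so a failed attempt leaves nothing behind, and `close` runs in both cases.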
>>>>>>>
>>>>>>>
>>>>>>> Do not require ShuffleManagers to expose ShuffleBlockResolvers where
>>>>>>> they are not needed. Ideally, this would be an implementation detail
>>>>>>> of the shuffle manager itself. If there is substantial overlap
>>>>>>> between the SortShuffleManager and other implementations, then the
>>>>>>> storage details can be abstracted at the appropriate level.
>>>>>>> (SPARK-25299 does not currently change this.)
>>>>>>>
>>>>>>>
>>>>>>> Do not require MapStatus to include blockmanager IDs where they are
>>>>>>> not relevant. This is captured by ShuffleBlockInfo
>>>>>>> <https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj>
>>>>>>> including an optional BlockManagerId in SPARK-25299. However, this
>>>>>>> change should be lifted to the MapStatus level so that it applies to
>>>>>>> all ShuffleManagers. Alternatively, use a more general data-location
>>>>>>> abstraction than BlockManagerId. This gives the shuffle manager more
>>>>>>> flexibility and the scheduler more information with respect to data
>>>>>>> residence.
>>>>>>>
>>>>>>> Serialization
>>>>>>>
>>>>>>> Allow serializers to be used more flexibly and efficiently. For
>>>>>>> example, have serializers support writing an arbitrary number of
>>>>>>> objects into an existing OutputStream or ByteBuffer. This enables
>>>>>>> objects to be serialized to direct buffers where doing so makes
>>>>>>> sense. More importantly, it allows arbitrary metadata/framing data
>>>>>>> to be wrapped around individual objects cheaply. Right now, that’s
>>>>>>> only possible at the stream level. (There are hacks around this, but
>>>>>>> this would enable more idiomatic use in efficient shuffle
>>>>>>> implementations.)
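A small sketch of per-object framing, using Python's standard `pickle` and `struct` modules as stand-ins for a Spark serializer. The frame layout (partition ID plus payload length) and the names `write_framed`/`read_framed` are assumptions made up for this example.

```python
import io
import pickle
import struct

# Hypothetical frame header: partition id and payload length,
# each a big-endian unsigned 32-bit integer.
_HEADER = struct.Struct(">II")


def write_framed(out, partition_id, obj):
    """Serialize one object into an existing stream, with its own frame."""
    payload = pickle.dumps(obj)
    out.write(_HEADER.pack(partition_id, len(payload)))
    out.write(payload)


def read_framed(inp):
    """Yield (partition_id, object) pairs until the stream is exhausted."""
    while True:
        header = inp.read(_HEADER.size)
        if not header:
            return
        if len(header) < _HEADER.size:
            raise EOFError("truncated frame header")
        partition_id, length = _HEADER.unpack(header)
        payload = inp.read(length)
        if len(payload) < length:
            raise EOFError("truncated frame payload")
        yield partition_id, pickle.loads(payload)


buf = io.BytesIO()
write_framed(buf, 3, ("k", 1))
write_framed(buf, 7, {"x": [1, 2]})
buf.seek(0)
frames = list(read_framed(buf))
```

Because each object carries its own header, a service could route a frame by partition ID without deserializing the payload, which is the kind of use the paragraph above has in mind.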
>>>>>>>
>>>>>>>
>>>>>>> Have serializers indicate whether they are deterministic. This
>>>>>>> provides much of the value of a shuffle service because it means
>>>>>>> that reducers do not need to spill to disk when
>>>>>>> reading/merging/combining inputs--the data can be grouped by the
>>>>>>> service, even without the service understanding data types or byte
>>>>>>> representations. Alternative (less preferable since it would break
>>>>>>> Java serialization, for example): require all serializers to be
>>>>>>> deterministic.
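To show why determinism matters, here is a toy Python sketch. JSON with sorted keys stands in for a deterministic serializer, and `serialize_key` and `group_by_key_bytes` are hypothetical names. Equal keys always produce equal bytes, so the service can group records purely on bytes.

```python
import json
from collections import defaultdict


def serialize_key(key):
    # Deterministic by construction: sorted keys, fixed separators.
    return json.dumps(key, sort_keys=True, separators=(",", ":")).encode("utf-8")


def group_by_key_bytes(records):
    """Group (key_bytes, value_bytes) pairs without decoding either side."""
    groups = defaultdict(list)
    for key_bytes, value_bytes in records:
        groups[key_bytes].append(value_bytes)
    return dict(groups)


a = serialize_key({"user": 1, "region": "us"})
b = serialize_key({"region": "us", "user": 1})  # same key, other field order
c = serialize_key({"user": 2})
groups = group_by_key_bytes([(a, b"1"), (b, b"2"), (c, b"3")])
```

With a serializer that does not guarantee byte-identical output for equal keys, `a` and `b` could differ and the same logical key would land in two groups, which is why the service would need the serializer to declare this property.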
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> - Ben
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>>
>>>> --
>>>> John Zhuge
>>>>
>>>
>
> --
> John Zhuge
>


-- 
-Ben
