spark-dev mailing list archives

From Saisai Shao <sai.sai.s...@gmail.com>
Subject Re: Enabling fully disaggregated shuffle on Spark
Date Wed, 04 Dec 2019 09:57:40 GMT
Hi Ben and Felix, I'm also interested in this. Would you please add me to
the invite? Thanks a lot.

Best regards,
Saisai

On Mon, Dec 2, 2019 at 11:34 PM Greg Lee <lihaowhu@gmail.com> wrote:

> Hi Felix & Ben,
>
> This is Li Hao from Baidu, on the same team as Linhong.
>
> As mentioned in Linhong’s email, an independent disaggregated shuffle
> service is also our solution, and a direction we continue to explore, for
> improving the stability of Hadoop MR and Spark in production. We would
> love to hear about this topic from the community and share our experience.
>
> Please add me to this event, thanks.
>
> Best Regards
> Li Hao
>
> On Fri, Nov 29, 2019 at 5:09 PM Liu,Linhong <liulinhong@baidu.com> wrote:
>
>> Hi Felix & Ben,
>>
>> This is Linhong from Baidu, based in Beijing. We also use a disaggregated
>> shuffle service internally (we call it DCE). We launched it in production
>> 3 years ago for Hadoop shuffle. Last year we migrated Spark shuffle to the
>> same DCE shuffle service, and stability improved a lot (we can handle more
>> than 100T shuffle now).
>>
>> It would be nice to have a Spark shuffle API that supports fully
>> disaggregated shuffle, and my team and I would be very glad to share our
>> experience and help on this topic.
>>
>> So, if it’s possible, please add me to this event.
>>
>>
>>
>> Thanks,
>>
>> Liu, Linhong
>>
>>
>>
>> *From: *Aniket Mokashi <aniket486@gmail.com>
>> *Date: *Thursday, November 21, 2019 at 2:12 PM
>> *To: *Felix Cheung <felixcheung_m@hotmail.com>
>> *Cc: *Ben Sidhom <sidhom@google.com.invalid>, John Zhuge <
>> jzhuge@apache.org>, bo yang <bobyangbo@gmail.com>, Amogh Margoor <
>> amoghm@qubole.com>, Ryan Blue <rblue@netflix.com>, Spark Dev List <
>> dev@spark.apache.org>, Christopher Crosbie <crosbiec@google.com>,
>> Griselda Cuevas <gris@google.com>, Holden Karau <holden@pigscanfly.ca>,
>> Mayank Ahuja <mahuja@qubole.com>, Kalyan Sivakumar <kalyans@qubole.com>,
>> "alfozan@fb.com" <alfozan@fb.com>, Felix Cheung <felixc@uber.com>, Matt
>> Cheah <mcheah@palantir.com>, "Yifei Huang (PD)" <yifeih@palantir.com>
>> *Subject: *Re: Enabling fully disaggregated shuffle on Spark
>>
>>
>>
>> Felix - please add me to this event.
>>
>>
>>
>> Ben - should we move this proposal to a doc and open it up for
>> edits/comments?
>>
>>
>>
>> On Wed, Nov 20, 2019 at 5:37 PM Felix Cheung <felixcheung_m@hotmail.com>
>> wrote:
>>
>> Great!
>>
>>
>>
>> Due to a number of constraints I won’t be sending the link directly here,
>> but please reply to me and I will add you.
>>
>>
>>
>>
>> ------------------------------
>>
>> *From:* Ben Sidhom <sidhom@google.com.INVALID>
>> *Sent:* Wednesday, November 20, 2019 9:10:01 AM
>> *To:* John Zhuge <jzhuge@apache.org>
>> *Cc:* bo yang <bobyangbo@gmail.com>; Amogh Margoor <amoghm@qubole.com>;
>> Ryan Blue <rblue@netflix.com>; Ben Sidhom <sidhom@google.com.invalid>;
>> Spark Dev List <dev@spark.apache.org>; Christopher Crosbie <
>> crosbiec@google.com>; Griselda Cuevas <gris@google.com>; Holden Karau <
>> holden@pigscanfly.ca>; Mayank Ahuja <mahuja@qubole.com>; Kalyan
>> Sivakumar <kalyans@qubole.com>; alfozan@fb.com <alfozan@fb.com>; Felix
>> Cheung <felixc@uber.com>; Matt Cheah <mcheah@palantir.com>; Yifei Huang
>> (PD) <yifeih@palantir.com>
>> *Subject:* Re: Enabling fully disaggregated shuffle on Spark
>>
>>
>>
>> That sounds great!
>>
>>
>>
>> On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jzhuge@apache.org> wrote:
>>
>> That will be great. Please send us the invite.
>>
>>
>>
>> On Wed, Nov 20, 2019 at 8:56 AM bo yang <bobyangbo@gmail.com> wrote:
>>
>> Cool, thanks Ryan, John, and Amogh for the replies! Great to see you're
>> interested! Felix will have a Spark Scalability & Reliability Sync
>> meeting on Dec 4 at 1pm PST. We could discuss more details there. Do you
>> want to join?
>>
>>
>>
>> On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <amoghm@qubole.com> wrote:
>>
>> We at Qubole are also looking at disaggregating shuffle on Spark. Would
>> love to collaborate and share learnings.
>>
>>
>>
>> Regards,
>>
>> Amogh
>>
>>
>>
>> On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jzhuge@apache.org> wrote:
>>
>> Great work, Bo! Would love to hear the details.
>>
>>
>>
>>
>>
>> On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rblue@netflix.com.invalid>
>> wrote:
>>
>> I'm interested in remote shuffle services as well. I'd love to hear about
>> what you're using in production!
>>
>>
>>
>> rb
>>
>>
>>
>> On Tue, Nov 19, 2019 at 2:43 PM bo yang <bobyangbo@gmail.com> wrote:
>>
>> Hi Ben,
>>
>>
>>
>> Thanks for the write-up! This is Bo from Uber. I am on Felix's team in
>> Seattle, working on disaggregated shuffle (internally we call it remote
>> shuffle service, RSS). We have had RSS in production for a while and
>> learned a lot along the way (we tried quite a few techniques to improve
>> remote shuffle performance). We could share our learnings with the
>> community, and would also like to hear feedback/suggestions on how to
>> further improve remote shuffle performance. We could chat in more detail
>> if you or other people are interested.
>>
>>
>>
>> Best,
>>
>> Bo
>>
>>
>>
>> On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <sidhom@google.com.invalid>
>> wrote:
>>
>> I would like to start a conversation about extending the Spark shuffle
>> manager surface to support fully disaggregated shuffle implementations.
>> This is closely related to the work in SPARK-25299
>> <https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on
>> refactoring the shuffle manager API (and in particular, SortShuffleManager)
>> to use a pluggable storage backend. The motivation for that SPIP is further
>> enabling Spark on Kubernetes.
>>
>>
>>
>> The motivation for this proposal is enabling fully externalized
>> (disaggregated) shuffle service implementations. (Facebook’s Cosco
>> shuffle
>> <https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service>
>> is one example of such a disaggregated shuffle service.) These changes
>> allow the bulk of the shuffle to run in a remote service such that minimal
>> state resides in executors and local disk spill is minimized. The net
>> effect is increased job stability and performance improvements in certain
>> scenarios. These changes should work well with, or be complementary to,
>> SPARK-25299. Some or all points may be merged into that issue as
>> appropriate.
>>
>>
>>
>> Below is a description of each component of this proposal. These changes
>> can ideally be introduced incrementally. I would like to gather feedback
>> and gauge interest from others in the community to collaborate on this.
>> There are likely more points that would be useful to disaggregated shuffle
>> services. We can outline a more concrete plan after gathering enough input.
>> A working session could help us kick off this joint effort; maybe something
>> in the mid-January to mid-February timeframe (depending on interest and
>> availability). I’m happy to host at our Sunnyvale, CA offices.
>>
>>
>> Proposal
>>
>> Scheduling and re-executing tasks
>>
>> Allow coordination between the service and the Spark DAG scheduler as to
>> whether a given block/partition needs to be recomputed when a task fails or
>> when shuffle block data cannot be read. Having such coordination is
>> important, e.g., for suppressing recomputation after aborted executors or
>> for forcing late recomputation if the service internally acts as a cache.
>> One catchall solution is to have the shuffle manager provide an indication
>> of whether shuffle data is external to executors (or nodes). Another
>> option: allow the shuffle manager (likely on the driver) to be queried for
>> the existence of shuffle data for a given executor ID (or perhaps map task,
>> reduce task, etc.). Note that this is at the level of data the scheduler is
>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>> internal details for some shuffle managers.
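
A minimal sketch of the two options described above, written in Python
purely for illustration (Spark's real shuffle interfaces are Scala/Java).
Every name here (ShuffleOutputTracker, data_is_external, has_map_output,
partitions_to_recompute) is hypothetical and not part of Spark or of
SPARK-25299. The tracker works at the map-partition level, not block IDs.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Set


@dataclass
class ShuffleOutputTracker:
    """Hypothetical driver-side view of which map outputs are still readable."""

    # The "catchall" indication: True when shuffle data lives outside executors.
    data_is_external: bool = True
    # shuffle_id -> map partition indices whose output is still available.
    _available: Dict[int, Set[int]] = field(default_factory=dict)

    def register_map_output(self, shuffle_id: int, map_partition: int) -> None:
        self._available.setdefault(shuffle_id, set()).add(map_partition)

    def mark_lost(self, shuffle_id: int, map_partition: int) -> None:
        # No-op if the shuffle or partition was never registered.
        self._available.get(shuffle_id, set()).discard(map_partition)

    def has_map_output(self, shuffle_id: int, map_partition: int) -> bool:
        return map_partition in self._available.get(shuffle_id, set())


def partitions_to_recompute(
    tracker: ShuffleOutputTracker,
    shuffle_id: int,
    map_partitions: Iterable[int],
    executor_lost: bool,
) -> List[int]:
    """Map partitions (those that ran on the affected executor) to rerun."""
    partitions = list(map_partitions)
    if executor_lost and not tracker.data_is_external:
        # Executor-local shuffle output died with the executor: rerun all.
        return sorted(partitions)
    # External data: rerun only what the shuffle manager reports as missing.
    return sorted(p for p in partitions if not tracker.has_map_output(shuffle_id, p))
```

With external data, losing an executor triggers recomputation only for the
partitions the tracker reports as missing.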
>>
>> ShuffleManager API
>>
>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
>> service knows that data is still active. This is one way to enable
>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>> on robust communication with Spark and in general has a distinct lifecycle
>> from the Spark deployment(s) it talks to. This would likely take the form
>> of a callback on ShuffleManager itself, but there are other approaches.
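
One way to picture the keep-alive idea is a lease table on the service
side. The sketch below is an assumption-laden illustration in Python, not
a proposed Spark API: ShuffleLeaseTable, ttl_seconds and the injectable
clock are all hypothetical names.

```python
import time
from typing import Callable, Dict, List


class ShuffleLeaseTable:
    """Hypothetical service-side table of keep-alive leases, one per shuffle."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be exercised in tests
        self._last_seen: Dict[int, float] = {}

    def heartbeat(self, shuffle_id: int) -> None:
        """Record that the Spark side still needs this shuffle's data."""
        self._last_seen[shuffle_id] = self._clock()

    def expired(self) -> List[int]:
        """Shuffle IDs whose last heartbeat is older than the TTL."""
        now = self._clock()
        return sorted(s for s, seen in self._last_seen.items()
                      if now - seen > self._ttl)

    def evict_expired(self) -> List[int]:
        """Forget expired shuffles; the service may then delete their data."""
        gone = self.expired()
        for shuffle_id in gone:
            del self._last_seen[shuffle_id]
        return gone
```

Because expiry depends only on the last heartbeat, the service can clean up
after a Spark application that disappears without saying goodbye.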
>>
>>
>>
>> Add lifecycle hooks to shuffle readers and writers (e.g., to
>> close/recycle connections/streams/file handles as well as provide commit
>> semantics). SPARK-25299 adds commit semantics to the internal data storage
>> layer, but this is applicable to all shuffle managers at a higher level and
>> should apply equally to the ShuffleWriter.
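
As a rough illustration of writer-level lifecycle hooks with commit
semantics, here is a Python sketch. RemoteShuffleWriter and its methods are
hypothetical, and a plain list stands in for the remote service.

```python
from typing import Any, List


class RemoteShuffleWriter:
    """Hypothetical writer with explicit commit/abort/close hooks."""

    def __init__(self, sink: List[Any]):
        self._sink = sink              # stands in for the remote shuffle service
        self._pending: List[Any] = []  # records buffered until commit
        self.closed = False

    def write(self, record: Any) -> None:
        self._pending.append(record)

    def commit(self) -> None:
        """Publish all buffered records to the sink in one step."""
        self._sink.extend(self._pending)
        self._pending.clear()

    def abort(self) -> None:
        """Discard buffered records, e.g. after a task failure."""
        self._pending.clear()

    def close(self) -> None:
        """Release connections/streams; safe to call more than once."""
        self.closed = True

    def __enter__(self) -> "RemoteShuffleWriter":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        try:
            if exc_type is None:
                self.commit()
            else:
                self.abort()
        finally:
            self.close()
        return False  # never swallow the task's exception
```

A failed task leaves nothing visible to readers, and close() runs whether
the task succeeds or fails.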
>>
>>
>>
>> Do not require ShuffleManagers to expose ShuffleBlockResolvers where they
>> are not needed. Ideally, this would be an implementation detail of the
>> shuffle manager itself. If there is substantial overlap between the
>> SortShuffleManager and other implementations, then the storage details can
>> be abstracted at the appropriate level. (SPARK-25299 does not currently
>> change this.)
>>
>>
>>
>> Do not require MapStatus to include blockmanager IDs where they are not
>> relevant. This is captured by ShuffleBlockInfo
>> <https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj>
>> including an *optional* BlockManagerId in SPARK-25299. However, this
>> change should be lifted to the MapStatus level so that it applies to all
>> ShuffleManagers. Alternatively, use a more general data-location
>> abstraction than BlockManagerId. This gives the shuffle manager more
>> flexibility and the scheduler more information with respect to data
>> residence.
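
To make the "optional location" idea concrete, a small Python sketch
follows. MapStatusSketch and DataLocation are hypothetical stand-ins and do
not mirror Spark's actual MapStatus or BlockManagerId classes.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class DataLocation:
    """Hypothetical, more general stand-in for BlockManagerId."""
    kind: str     # e.g. "executor" or "remote-service"
    address: str  # host:port, service URI, etc.


@dataclass(frozen=True)
class MapStatusSketch:
    """Hypothetical map status whose location is optional."""
    map_partition: int
    partition_sizes: Tuple[int, ...]
    # None means the shuffle manager resolves the location itself.
    location: Optional[DataLocation] = None

    def lost_with_executor(self, executor_address: str) -> bool:
        """Only executor-resident output is lost when that executor dies."""
        loc = self.location
        return (loc is not None
                and loc.kind == "executor"
                and loc.address == executor_address)
```

A status with no location, or one that points at a remote service, is never
invalidated by an executor loss, which is the extra information the
scheduler gains about data residence.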
>>
>> Serialization
>>
>> Allow serializers to be used more flexibly and efficiently. For example,
>> have serializers support writing an arbitrary number of objects into an
>> existing OutputStream or ByteBuffer. This enables objects to be serialized
>> to direct buffers where doing so makes sense. More importantly, it allows
>> arbitrary metadata/framing data to be wrapped around individual objects
>> cheaply. Right now, that’s only possible at the stream level. (There are
>> hacks around this, but this would enable more idiomatic use in efficient
>> shuffle implementations.)
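
The per-object framing described above can be sketched as follows. This is
Python for illustration only: pickle stands in for a Spark serializer, and
the frame layout (partition ID plus payload length) is an assumption, not
something specified in the proposal.

```python
import io
import pickle
import struct
from typing import Any, BinaryIO, Iterator, Tuple

# Assumed frame header: (partition id, payload length), big-endian uint32s.
_HEADER = struct.Struct(">II")


def write_framed(out: BinaryIO, partition_id: int, obj: Any) -> None:
    """Serialize one object into an existing stream, wrapped in a frame."""
    payload = pickle.dumps(obj)
    out.write(_HEADER.pack(partition_id, len(payload)))
    out.write(payload)


def read_framed(inp: BinaryIO) -> Iterator[Tuple[int, Any]]:
    """Yield (partition id, object) pairs until the stream is exhausted."""
    while True:
        header = inp.read(_HEADER.size)
        if not header:
            return
        if len(header) < _HEADER.size:
            raise ValueError("truncated frame header")
        partition_id, length = _HEADER.unpack(header)
        payload = inp.read(length)
        if len(payload) < length:
            raise ValueError("truncated frame payload")
        yield partition_id, pickle.loads(payload)
```

Because each object carries its own header, a service could route a frame
by partition ID without deserializing the payload.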
>>
>>
>>
>> Have serializers indicate whether they are deterministic. This provides
>> much of the value of a shuffle service because it means that reducers do
>> not need to spill to disk when reading/merging/combining inputs--the data
>> can be grouped by the service, even without the service understanding data
>> types or byte representations. Alternative (less preferable since it would
>> break Java serialization, for example): require all serializers to be
>> deterministic.
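
A sketch of why a determinism flag matters, again in Python and under
stated assumptions: both serializer classes and the `deterministic`
attribute are hypothetical, with JSON standing in for a real Spark
serializer. The service groups values by comparing opaque key bytes only.

```python
import json
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple


class SortedJsonSerializer:
    """Hypothetical serializer: equal keys always produce equal bytes."""
    deterministic = True

    def dumps(self, obj: Any) -> bytes:
        text = json.dumps(obj, sort_keys=True, separators=(",", ":"))
        return text.encode("utf-8")


class InsertionOrderJsonSerializer:
    """Equal dicts may serialize differently, so bytes are not comparable."""
    deterministic = False

    def dumps(self, obj: Any) -> bytes:
        return json.dumps(obj).encode("utf-8")


def group_by_key_bytes(
    serializer: Any, records: Iterable[Tuple[Any, Any]]
) -> Dict[bytes, List[Any]]:
    """Service-side grouping that never interprets the key bytes."""
    if not serializer.deterministic:
        raise ValueError("byte-level grouping needs a deterministic serializer")
    groups: Dict[bytes, List[Any]] = defaultdict(list)
    for key, value in records:
        groups[serializer.dumps(key)].append(value)
    return dict(groups)
```

With the deterministic serializer, two equal keys built in different
insertion orders land in the same group; with the other one the service
must refuse, because byte equality no longer implies key equality.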
>>
>>
>>
>>
>>
>> --
>>
>> - Ben
>>
>>
>>
>>
>> --
>>
>> Ryan Blue
>>
>> Software Engineer
>>
>> Netflix
>>
>>
>>
>>
>> --
>>
>> John Zhuge
>>
>>
>>
>>
>> --
>>
>> -Ben
>>
>>
>>
>>
>> --
>>
>> "...:::Aniket:::... Quetzalco@tl"
>>
>
