spark-dev mailing list archives

From Mark Hamstra <m...@clearstorydata.com>
Subject Re: HDFS as Shuffle Service
Date Thu, 28 Apr 2016 18:08:47 GMT
Yes, replicated and distributed shuffle materializations are a key
requirement to maintain performance in a fully elastic cluster where
Executors aren't just reallocated across an essentially fixed number of
Worker nodes, but rather the number of Workers itself is dynamic.
Retaining the file interface to those shuffle materializations while also
using HDFS for the spark.local.dirs has a certain amount of attraction, but
I also wonder whether a typical HDFS deployment is really sufficient to
handle this kind of elastic cluster scaling.  For instance, assuming
HDFS is co-located on worker nodes, if after a workload burst your cluster
dynamically changes from 10000 workers to 1000, will the typical HDFS
replication factor be sufficient to retain access to the shuffle files in
HDFS, or will we instead be seeing numerous FetchFailure exceptions, Tasks
recomputed or Stages aborted, etc., so that the net effect is not all that
much different than if the shuffle files had not been relocated to HDFS and
the Executors or ShuffleService instances had just disappeared along with
the worker nodes?
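
The scale-down scenario above can be sized with a rough back-of-the-envelope
calculation. The sketch below is only an illustration under simplifying
assumptions that are not part of the original message: replicas are placed
uniformly at random on distinct nodes (real HDFS placement is rack-aware),
the removed workers are chosen uniformly at random, and no re-replication
happens before they disappear. The function names are hypothetical. Under
those assumptions, going from 10000 workers to 1000 with a replication
factor of 3 loses every replica of a given block with probability of
about 0.73.

```python
from math import comb


def prob_block_lost(total_nodes: int, remaining_nodes: int, replication: int) -> float:
    """Probability that all replicas of one block sat on removed nodes.

    Assumes (hypothetically) uniform random replica placement on distinct
    nodes, uniform random node removal, and no re-replication in between.
    """
    if not 0 <= remaining_nodes <= total_nodes:
        raise ValueError("remaining_nodes must be between 0 and total_nodes")
    if not 1 <= replication <= total_nodes:
        raise ValueError("replication must be between 1 and total_nodes")
    removed = total_nodes - remaining_nodes
    # Ways to put every replica on a removed node / ways to place the replicas.
    return comb(removed, replication) / comb(total_nodes, replication)


def prob_all_map_outputs_survive(p_block_lost: float, num_map_outputs: int) -> float:
    """Chance that a reducer can fetch every map output it needs.

    Treats each map output as one block and treats losses as independent,
    which is only an approximation.
    """
    return (1.0 - p_block_lost) ** num_map_outputs


if __name__ == "__main__":
    p = prob_block_lost(total_nodes=10_000, remaining_nodes=1_000, replication=3)
    print(f"P(block lost)             = {p:.3f}")
    print(f"P(100 map outputs all ok) = {prob_all_map_outputs_survive(p, 100):.3e}")
```

With a graceful decommission that lets HDFS re-replicate before nodes leave,
the picture would be much better; the sketch only covers the abrupt case.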

On Thu, Apr 28, 2016 at 10:46 AM, Michael Gummelt <mgummelt@mesosphere.io>
wrote:

> > Why would you run the shuffle service on 10K nodes but Spark executors
> > on just 100 nodes? wouldn't you also run that service just on the 100
> > nodes?
>
> We have to start the service beforehand, out of band, and we don't know a
> priori where the Spark executors will land.  Those 100 executors could land
> on any of the 10K nodes.
>
> > What does plumbing it through HDFS buy you in comparison?
>
> It drops the shuffle service requirement, which is HUGE.  It means Spark
> can completely vacate the machine when it's not in use, which is crucial
> for a large, multi-tenant cluster.  ShuffledRDDs can now read the map files
> from HDFS, rather than the ancestor executors, which means we can shut
> executors down immediately after the shuffle files are written.
>
> > There's some additional overhead and if anything you lose some control
> > over locality, in a context where I presume HDFS itself is storing data on
> > much more than the 100 Spark nodes.
>
> Write locality would be sacrificed, but the descendent executors were
> already doing a remote read (they have to read from multiple ancestor
> executors), so there's no additional cost in read locality.  In fact, if we
> take advantage of HDFS's favored node feature, we could make it likely that
> all map files for a given partition land on the same node, so the
> descendent executor would never have to do a remote read!  We'd effectively
> shift the remote IO from read side to write side, for theoretically no
> change in performance.
>
> In summary:
>
> Advantages:
> - No shuffle service dependency (increased utilization, decreased
> management cost)
> - Shut executors down immediately after shuffle files are written, rather
> than waiting for a timeout (increased utilization)
> - HDFS is HA, so shuffle files survive a node failure, which isn't true
> for the shuffle service (decreased latency during failures)
> - Potential ability to parallelize shuffle file reads if we write a new
> shuffle iterator (decreased latency)
>
> Disadvantages:
> - Increased write latency (but potentially not if we implement it
> efficiently.  See above).
> - Would need some sort of GC on HDFS shuffle files
>
>
>
>
>
> On Thu, Apr 28, 2016 at 1:36 AM, Sean Owen <sowen@cloudera.com> wrote:
>
>> Why would you run the shuffle service on 10K nodes but Spark executors
>> on just 100 nodes? wouldn't you also run that service just on the 100
>> nodes?
>>
>> What does plumbing it through HDFS buy you in comparison? There's some
>> additional overhead and if anything you lose some control over
>> locality, in a context where I presume HDFS itself is storing data on
>> much more than the 100 Spark nodes.
>>
>> On Thu, Apr 28, 2016 at 1:34 AM, Michael Gummelt <mgummelt@mesosphere.io>
>> wrote:
>> >> Are you suggesting to have shuffle service persist and fetch data with
>> >> hdfs, or skip shuffle service altogether and just write to hdfs?
>> >
>> > Skip shuffle service altogether.  Write to HDFS.
>> >
>> > Mesos environments tend to be multi-tenant, and running the shuffle
>> > service on all nodes could be extremely wasteful.  If you're running a
>> > 10K node cluster, and you'd like to run a Spark job that consumes 100
>> > nodes, you would have to run the shuffle service on all 10K nodes out of
>> > band of Spark (e.g. marathon).  I'd like a solution for dynamic
>> > allocation that doesn't require this overhead.
>> >
>> > I'll look at SPARK-1529.
>> >
>> > On Wed, Apr 27, 2016 at 10:24 AM, Steve Loughran
>> > <stevel@hortonworks.com> wrote:
>> >>
>> >>
>> >> > On 27 Apr 2016, at 04:59, Takeshi Yamamuro <linguin.m.s@gmail.com>
>> >> > wrote:
>> >> >
>> >> > Hi, all
>> >> >
>> >> > See SPARK-1529 for related discussion.
>> >> >
>> >> > // maropu
>> >>
>> >>
>> >> I'd not seen that discussion.
>> >>
>> >> I'm actually curious about why there's a 15% diff in performance between
>> >> Java NIO and Hadoop FS APIs, and, if it is the case (Hadoop still uses
>> >> the pre-NIO libraries), *has anyone thought of just fixing the Hadoop
>> >> Local FS codepath?*
>> >>
>> >> It's not like anyone hasn't filed JIRAs on that ... it's just that
>> >> nothing has ever got to a state where it was considered ready to adopt,
>> >> where "ready" means: passes all unit and load tests against Linux, Unix,
>> >> Windows filesystems. There have been some attempts, but they never quite
>> >> got much engagement or support, especially as NIO wasn't there properly
>> >> until Java 7, and Hadoop was stuck on Java 6 support until 2015. That's
>> >> no longer a constraint: someone could do the work, using the existing
>> >> JIRAs as starting points.
>> >>
>> >>
>> >> If someone did do this in RawLocalFS, it'd be nice if the patch also
>> >> allowed you to turn off CRC creation and checking.
>> >>
>> >> That's not only part of the overhead, it means that flush() doesn't, not
>> >> until you reach the end of a CRC32 block ... so breaking what few
>> >> durability guarantees POSIX offers.
>> >>
>> >>
>> >>
>> >
>> >
>> >
>> > --
>> > Michael Gummelt
>> > Software Engineer
>> > Mesosphere
>>
>
>
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>
