spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Xiangrui Meng (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-24374) SPIP: Support Barrier Scheduling in Apache Spark
Date Mon, 04 Jun 2018 21:46:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-24374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16500900#comment-16500900
] 

Xiangrui Meng commented on SPARK-24374:
---------------------------------------

[~leftnoteasy] Thanks for your input! 

{quote}
This JIRA is trying to solve the gang scheduling problem of ML applications, however, gang
scheduling should be handled by underlying resource scheduler instead of Spark. Because Spark
with non-standalone deployment has no control of how to do resource allocation.
{quote}

Agree that the overall resource provision should be handled by the resource manager. I think
this is true for standalone as well. However, inside Spark, we still need to fine-grained
job scheduling to allocate tasks. For example, a skewed join might hold some task slots for
quite long and hence the tasks from next stage have to wait to start all together. Ideally,
Spark should be able to talk to the resource manager for better elasticity.

{quote}
If the proposed API is like to implement gang-scheduling by using gather-and-hold pattern,
existing Spark API should be good enough – just to request resources until it reaches target
#containers. Application needs to wait in both cases.
{quote}

The Spark API is not good enough for two reasons:

1) The all-reduce patten could be implemented by a single gather and broadcast, but driver
that gathers the message would become the bottleneck when the message is big (~20 million
features) or there are too many nodes. This is why we started with SPARK-1485 (all-reduce)
but ended up at SPARK-2174 (tree-reduce).

2) If we ask user program to set a barrier and wait for all tasks to be ready. It won't work
in case of failures, because Spark will only retry the failed task instead of all. This requires
significant code changes on users side to handle the failure scenario.

{quote}
MPI needs launched processes to contact their master so master can launch slaves and make
them to interconnect to each other (phone-home). Application needs to implement logics to
talk to different RMs.
{quote}

[~jiangxb1987] made a prototype of this scenario, not on YARN but on standalone to help discuss
the design. In the barrier stage, users can easily get node infos of all tasks. So the MPI
setup could be simplified. We will take a look at mpich2-yarn code base. Is there a design
doc there? So we can quickly get the high-level design choices.

{quote}
One potential benefit I can think about embedding app to Spark is, applications could directly
read from memory of Spark tasks.
{quote}

I'm preparing a SPIP doc for accelerating the data exchange between Spark and 3rd-party frameworks.
I would treat it as an orthogonal issue here. Fast data exchange would help the model inference
use case a lot. For training, we might just need a standard data interface to simplify data
conversions.

> SPIP: Support Barrier Scheduling in Apache Spark
> ------------------------------------------------
>
>                 Key: SPARK-24374
>                 URL: https://issues.apache.org/jira/browse/SPARK-24374
>             Project: Spark
>          Issue Type: Epic
>          Components: ML, Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Xiangrui Meng
>            Assignee: Xiangrui Meng
>            Priority: Major
>              Labels: SPIP
>         Attachments: SPIP_ Support Barrier Scheduling in Apache Spark.pdf
>
>
> (See details in the linked/attached SPIP doc.)
> {quote}
> The proposal here is to add a new scheduling model to Apache Spark so users can properly
embed distributed DL training as a Spark stage to simplify the distributed training workflow.
For example, Horovod uses MPI to implement all-reduce to accelerate distributed TensorFlow
training. The computation model is different from MapReduce used by Spark. In Spark, a task
in a stage doesn’t depend on any other tasks in the same stage, and hence it can be scheduled
independently. In MPI, all workers start at the same time and pass messages around. To embed
this workload in Spark, we need to introduce a new scheduling model, tentatively named “barrier
scheduling”, which launches tasks at the same time and provides users enough information
and tooling to embed distributed DL training. Spark can also provide an extra layer of fault
tolerance in case some tasks failed in the middle, where Spark would abort all tasks and restart
the stage.
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message