spark-dev mailing list archives

From "Hegner, Travis" <>
Subject Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.
Date Thu, 15 Dec 2016 18:38:46 GMT
Thanks for the response Jörn,

This patch is intended only for spark standalone.

My understanding of the YARN cgroup support is that it only limits cpu, rather than allocates
it based on the priority or shares system. This could be old documentation that I'm remembering,
however. Another issue with YARN is that it has a lot more overhead than standalone mode,
and always seemed a bit less responsive in general. Lastly, I remember struggling greatly
with yet another resource abstraction layer (as if spark doesn't have enough already): it
still statically allocated cores (albeit virtual ones), and it was much more cumbersome to
find a proper balance of resources to request for an app.

My experience in trying to accomplish something like this in Mesos was always met with frustration
because the system still statically allocated cores away to be reserved by individual apps.
Trying to adjust the priority of individual applications was only possible by increasing the
core count, further starving other apps of available cores. It was impossible to give a priority
lower than the default to an app. The cpu.shares parameter was abstracted away as a multiple
of the number of requested cores, which had a double down effect on the app: not only was
it given more cores, it was also given a higher priority to run on them. Perhaps this has
changed in more recent versions, but this was my experience when testing it.

I'm not familiar with a spark scheduler for kubernetes, unless you mean to launch a standalone
cluster in containers with kubernetes? In that case, this patch would simply divvy up the
resources allocated to the spark-worker container among each of its executors, based on the
shares that each executor is given. This is similar to how my current environment works; I'm
just not using kubernetes as a container launcher. I found kubernetes was quite limiting in
the way we wanted our network to be structured, and it also seemed quite difficult to get
new functionality exposed in the form of their yaml API system.

My goal with this patch is to essentially eliminate the static allocation of cpu cores
altogether. Give each app time on the cpu equal to the number of shares it has as a percentage of
the total pool.



From: Jörn Franke <>
Sent: Thursday, December 15, 2016 12:48
To: Hegner, Travis
Cc: Apache Spark Dev
Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.


What about YARN or Mesos used in combination with Spark? They also have cgroups. Or a Kubernetes
etc. deployment.

On 15 Dec 2016, at 17:37, Hegner, Travis wrote:

Hello Spark Devs,

I have finally completed a mostly working proof of concept. I do not want to create a pull
request for this code, as I don't believe it's production worthy at the moment. My intent
is to better communicate what I'd like to accomplish. Please review the following patch:

What the code does:

Currently, it exposes two options: "spark.cgroups.enabled", which defaults to false, and "spark.executor.shares",
which defaults to None. When cgroups mode is enabled, a single executor is created on each
worker, with access to all cores. The worker will create a parent cpu cgroup (on first executor
launch) called "spark-worker" to house any executors that it launches. Each executor is put
into its own cgroup named with the app id, under the parent cgroup. The cpu.shares parameter
is set to the value in "spark.executor.shares"; if this is "None", it inherits the value from
the parent cgroup.
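
A minimal sketch of the cgroup layout described above, written in Python for brevity rather than taken from the patch. It assumes a cgroup v1 cpu controller, where a cgroup is a directory, `cpu.shares` holds the weight, and writing a PID to `cgroup.procs` moves that process into the group. The function name `place_executor`, its parameters, and the example mount path are hypothetical.

```python
import os


def place_executor(cgroup_root, app_id, pid, shares=None, parent="spark-worker"):
    """Create <cgroup_root>/<parent>/<app_id> and move the executor pid into it.

    cgroup_root is assumed to be the cpu controller mount,
    for example /sys/fs/cgroup/cpu on many cgroup v1 systems.
    """
    app_dir = os.path.join(cgroup_root, parent, app_id)
    # Also creates the parent cgroup the first time an executor is launched.
    os.makedirs(app_dir, exist_ok=True)

    if shares is not None:
        # Relative weight only; it matters when cgroups compete for CPU.
        with open(os.path.join(app_dir, "cpu.shares"), "w") as f:
            f.write(str(shares))
    # When shares is None, the value the kernel assigned at creation is left as is.

    # Writing the PID to cgroup.procs moves the whole process (all its threads).
    with open(os.path.join(app_dir, "cgroup.procs"), "w") as f:
        f.write(str(pid))
    return app_dir
```

On a real system this needs write permission on the cgroup mount, which is why the worker currently has to run as root or in a privileged container.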

Tested on Ubuntu 16.04 (docker containers), kernel 4.4.0-53-generic. I have not run unit tests.
I do not know if/how cgroups v2 (kernel 4.5) is going to change this code base, but it looks
like the kernel interface is the same for the most part.

I was able to launch a spark shell which consumed all cores in the cluster, but sat idle.
I was then able to launch an application (client deploy-mode) which was also allocated all
cores in the cluster, and ran to completion unhindered. Each of the executors on each worker
was properly placed into its respective cgroup, which in turn had the correct cpu.shares
value allocated.

What the code still needs:

* Documentation (assuming the community moves forward with some kind of implementation)

* Sometimes the cgroups get destroyed after app completion, sometimes they don't. (need to
put the `.destroy()` call in a `finally` block, or in the `maybeCleanupApplication()` method;
what do you think?)

* Proper handling of the driver's resources when running `--deploy-mode cluster`

* Better web UI indication of cgroup mode or core sharing (currently just shows up as an over
allocation of cores per worker)

* Better environment/os/platform detection and testing (I won't be surprised if there is something
broken if trying to run this on a different OS)

* Security/permissions for cgroups if running worker as non-root (perhaps creating the parent
cgroup with correct permissions before launching the worker is all that is necessary)

  - running the worker in a container currently requires --privileged mode (I haven't figured
out if/what capability makes cgroups writable, or if it's possible to use a new cgroup mount)

* More user defined options

  - cgroup root path (currently hard coded)

  - driver cpu.shares (for cluster deploy-mode: would require a specially named cgroup...
s"$appId-driver"? default same #shares as executor? default double shares?)

  - parent cpu.shares (currently os default)

  - parent cgroup name (currently hard coded)
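
For the cleanup item in the list above, one way to make removal unconditional is to tie the cgroup's lifetime to a `try`/`finally` scope. The following is a minimal Python sketch of that pattern, not the patch's Scala code; `app_cgroup` is a hypothetical name, and it assumes cgroup v1 semantics, where a cgroup is removed with `rmdir` once no tasks remain in it.

```python
import os
from contextlib import contextmanager


@contextmanager
def app_cgroup(parent_dir, app_id):
    """Create a per-application cgroup directory and always remove it on exit."""
    path = os.path.join(parent_dir, app_id)
    os.makedirs(path, exist_ok=True)
    try:
        yield path
    finally:
        # Runs whether the body finished normally or raised.
        # On a real cgroup mount, rmdir fails with an OSError while tasks
        # are still attached, so the executor must have exited first.
        os.rmdir(path)
```

A caller would launch and wait for the executor inside `with app_cgroup(parent, app_id) as path:`, so the directory is gone afterwards even if the executor fails.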

I tried to structure the initial concept to make it easy to add support for more cgroup features
(cpuset, mem, etc...), should the community feel there is value in adding them. Linux cgroups
are an extremely powerful resource allocation and isolation tool, and this patch is only scratching
the surface of their general capabilities. Of course, as Mr. Loughran points out, expanding
into these features will require more code maintenance, but not enough that we should shy
away from it.


I personally believe that any multi-node resource allocation system should offload as much
of the scheduling and resource allocation as possible to the underlying kernel within the
node level. Each node's own kernel is the best equipped place to manage those resources. Only
the node's kernel can allocate a few seconds worth of cpu to the low priority app, while the
high priority app is waiting on disk I/O, and instantly give it back to the high priority
app when it needs it, with (near) real-time granularity.

The multi-node system should set up a proper framework to give each node's kernel the information
it needs to allocate the resources correctly. Naturally, the system should allow resource
reservations, and even limits, for the purposes of meeting and testing for SLAs and worst
case scenarios as well. Linux cgroups are capable of doing those things in a near real-time

With a proper convention of priorities/shares for applications within an organization, I believe
that everyone can get better throughput out of their hardware, at any cluster size. But, alas,
*that* is not a problem I'm trying to solve currently.


Sorry that the patch is pretty rough still, as I'm still getting my head wrapped around spark's
code base structure. Looking forward to any feedback.



From: Hegner, Travis <<>>
Sent: Tuesday, December 6, 2016 10:49
To: Steve Loughran
Cc: Shuai Lin; Apache Spark Dev
Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.


I appreciate your experience and insight when dealing with large clusters at the data-center
scale. I'm also well aware of the complex nature of schedulers, and that it is an area of
ongoing research being done by people/companies with many more resources than I have. This
might explain my apprehension in even calling this idea a *scheduler*: I wanted to avoid this
exact kind of debate over what I want to accomplish. This is also why I mentioned that this
idea will mostly benefit users with small clusters.

I've used many of the big-name "cluster schedulers" (YARN, Mesos, and Kubernetes) and the
main thing that they have in common is that they don't work well for my use case. Those systems
are designed for large scale 1000+ node clusters, and become painful to manage in the small
cluster range. Most of the tools that we've attempted to use don't work well for us, so we've
written several of our own:

It can be most easily stated by the fact that *we are not* Google, Facebook, or Amazon: we
don't have a *data-center* of servers to manage, we barely have half of a rack. *We are not
trying to solve the problem that you are referring to*. We are operating at a level that if
we aren't meeting SLAs, then we could just buy another server to add to the cluster. I imagine
that we are not alone in that fact either; I've seen that many of the questions on SO and
on the user list are from others operating at a level similar to ours.

I understand that pre-emption isn't inherently a bad thing, and that these multi-node systems
typically handle it gracefully. However, if idle CPU is expensive, then how much more does
wasted CPU cost when a nearly complete task is pre-empted and has to be started over? Fortunately
for me, that isn't a problem that I have to solve at the moment.

>Instead? Use a multi-user cluster scheduler and spin up different spark instances for
the different workloads

See my above comment on how well these cluster schedulers work for us. I have considered the
avenue of multiple spark clusters, and in reality the infrastructure we have set up would
allow me to do this relatively easily. In fact, in my environment, this is a similar solution
to what I'm proposing, just managed one layer up the stack and with less flexibility. I am
trying to avoid this solution however because it does require more overhead and maintenance.
What if I want two spark apps to run on the same cluster at the same time, sharing the available
CPU capacity equally? I can't accomplish that easily with multiple spark clusters. Also, we
are a 1 to 2 man operation at this point; I don't have teams of ops people to task with managing
as many spark clusters as I feel like launching.

>FWIW, it's often memory consumption that's most problematic here.

Perhaps in the use-cases you have experience with, but not currently in mine. In fact, my
initial proposal is not yet changing the allocation of memory as a resource. This would still
be statically allocated in a FIFO manner as long as memory is available on the cluster, the
same way it is now.

>I would strongly encourage you to avoid this topic

Thanks for the suggestion, but I will choose how I spend my time. If I can find a simple solution
to a problem that I face, and I'm willing to share that solution, I'd hope one would encourage
that instead.

Perhaps I haven't yet clearly communicated what I'm trying to do. In short, *I am not trying
to write a scheduler*: I am trying to slightly (and optionally) tweak the way executors are
allocated and launched, so that I can more intuitively and more optimally utilize my small
spark cluster.



From: Steve Loughran <<>>
Sent: Tuesday, December 6, 2016 06:54
To: Hegner, Travis
Cc: Shuai Lin; Apache Spark Dev
Subject: Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

This is essentially what the cluster schedulers do: allow different people to submit work
with different credentials and priority; cgroups & equivalent to limit granted resources
to requested ones. If you have pre-emption enabled, you can even have one job kill work off
the others. Spark does recognise pre-emption failures and doesn't treat it as a sign of problems
in the executor, that is: it doesn't over-react.

Cluster scheduling is one of the cutting edge bits of datacentre-scale computing. If you
are curious about what is state of the art, look at the Morning Paper
for coverage last week of MS and google work there. YARN, Mesos, Borg, whatever Amazon use:
at scale it's not just meeting SLAs, it's about how much idle CPU costs, and how expensive
even a 1-2% drop in throughput would be.

I would strongly encourage you to avoid this topic, unless you want to dive deep into the whole
world of cluster scheduling, the debate over centralized vs decentralized, the ideological
one of "should services ever get allocated RAM/CPU in times of low overall load?", the challenge
of swap, or more specifically, "how do you throttle memory consumption", as well as what to
do when the IO load of a service is actually incurred on a completely different host from
the one your work is running on.

There's also a fair amount of engineering work; to get a hint download the Hadoop tree and
look at hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux
for the cgroup support, and then hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl
for the native code needed alongside this. Then consider that it's not just a matter of writing
something similar, it's getting an OSS project to actually commit to maintaining such code
after you provide that initial contribution.

Instead? Use a multi-user cluster scheduler and spin up different spark instances for the
different workloads, with different CPU & memory limits, queue priorities, etc. Other
people have done the work, written the tests, deployed it in production, met their own SLAs
*and are therefore committed to maintaining this stuff*.


On 5 Dec 2016, at 15:36, Hegner, Travis wrote:

My apologies, in my excitement of finding a rather simple way to accomplish the scheduling
goal I have in mind, I hastily jumped straight into a technical solution, without explaining
that goal, or the problem it's attempting to solve.

You are correct that I'm looking for an additional running mode for the standalone scheduler.
Perhaps you could/should classify it as a different scheduler, but I don't want to give the
impression that this will be as difficult to implement as most schedulers are. Initially,
from a memory perspective, we would still allocate in a FIFO manner. This new scheduling mode
(or new scheduler, if you'd rather) would mostly benefit any users with small-ish clusters,
both on-premise and cloud based. Essentially, my end goal is to be able to run multiple *applications*
simultaneously with each application having *access* to the entire core count of the cluster.

I have a very cpu intensive application that I'd like to run weekly. I have a second application
that I'd like to run hourly. The hourly application is more time critical (higher priority),
so I'd like it to finish in a small amount of time. If I allow the first app to run with all
cores (this takes several days on my 64 core cluster), then nothing else can be executed when
running with the default FIFO scheduler. All of the cores have been allocated to the first
application, and it will not release them until it is finished. Dynamic allocation does not
help in this case, as there is always a backlog of tasks to run until the first application
is nearing the end anyway. Naturally, I could just limit the number of cores that the first
application has access to, but then I have idle cpu time when the second app is not running,
and that is not optimal. Secondly in that case, the second application only has access to
the *leftover* cores that the first app has not allocated, and will take a considerably longer
amount of time to run.

You could also imagine a scenario where a developer has a spark-shell running without specifying
the number of cores they want to utilize (whether intentionally or not). As I'm sure you know,
the default is to allocate the entire cluster to this application. The cores allocated to
this shell are unavailable to other applications, even if they are just sitting idle while
a developer is getting their environment set up to run a very big job interactively. Other
developers that would like to launch interactive shells are stuck waiting for the first one
to exit their shell.

My proposal would eliminate this static nature of core counts and allow as many simultaneous
applications to be running as the cluster memory (still statically partitioned, at least initially)
will allow. Applications could be configured with a "cpu shares" parameter (just an arbitrary
integer relative only to other applications) which is essentially just passed through to the
linux cgroup cpu.shares setting. Since each executor of an application on a given worker runs
in its own process/jvm, that process could easily be placed into a cgroup created
and dedicated for that application.

Linux cgroups cpu.shares are pretty well documented, but the gist is that processes competing
for cpu time are allocated a percentage of time equal to their share count as a percentage
of all shares in that level of the cgroup hierarchy. If two applications are both scheduled
on the same core with the same weight, each will get to utilize 50% of the time on that core.
This is all built into the kernel, and the only thing the spark worker has to do is create
a cgroup for each application, set the cpu.shares parameter, and assign the executors for
that application to the new cgroup. If multiple executors are running on a single worker,
for a single application, the cpu time available to that application is divided among each
of those executors equally. The default for cpu.shares is that they are not limiting in any
way. A process can consume all available cpu time if it would otherwise be idle anyway.

That's the issue that surfaces in google papers: should jobs get idle capacity? Current consensus
is "no". Why not? Because you may end up writing an SLA-sensitive app which just happens to
meet its SLAs in times of light cluster load, but precisely when the cluster is busy, it
suddenly slows down, leading to stress all round, in the "why is this service suddenly unusable"
kind of stress. Instead you keep the cluster busy with low priority preemptible work, use
labels to allocate specific hosts to high-SLA apps, etc.

Another benefit to passing cpu.shares directly to the kernel (as opposed to some abstraction)
is that cpu share allocations are heterogeneous to all processes running on a machine. An
admin could have very fine grained control over which processes get priority access to cpu
time, depending on their needs.

To continue my personal example above, my long running cpu intensive application could utilize
100% of all cluster cores if they are idle. Then my time sensitive app could be launched with
nine times the priority and the linux kernel would scale back the first application to 10%
of all cores (completely seamlessly and automatically: no pre-emption, just fewer time slices
of cpu allocated by the kernel to the first application), while the second application gets
90% of all the cores until it completes.
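
The arithmetic behind this example can be checked with a small model. It is a simplification that assumes every runnable application is CPU-bound on every core and that only cgroups with runnable work compete for time; the function name `core_equivalents`, the app names, and the share values 1024 and 9 * 1024 are illustrative choices, with only the 1:9 ratio and the 64-core cluster taken from the text.

```python
def core_equivalents(shares, runnable, total_cores):
    """Split total_cores among the runnable apps in proportion to cpu.shares.

    shares:   app name -> cpu.shares value of that app's cgroup
    runnable: names of the apps that currently have CPU-bound work
    """
    # Idle cgroups do not compete, so their shares are left out of the total.
    active = {app: s for app, s in shares.items() if app in runnable}
    total = sum(active.values())
    return {app: total_cores * s / total for app, s in active.items()}


shares = {"weekly": 1024, "hourly": 9 * 1024}

# Weekly job alone: nothing competes, so it may use the whole cluster.
print(core_equivalents(shares, {"weekly"}, 64))

# Both running: the 1:9 weights split the CPU time 10% / 90%.
print(core_equivalents(shares, {"weekly", "hourly"}, 64))
```

On the 64-core cluster mentioned earlier, the second call works out to about 6.4 core-equivalents for the weekly job and 57.6 for the hourly one. The first call returns all 64 to the weekly job, which is the "shares are not limits" behaviour described above.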

FWIW, it's often memory consumption that's most problematic here. If one process starts to
swap, it hurts everything else. But Java isn't that good at handling limited heap/memory size;
you have to spec that heap up front.

The only downside that I can think of currently is that this scheduling mode would create
an increase in context switching on each host. This issue is somewhat mitigated by still statically
allocating memory however, since there wouldn't typically be an exorbitant number of applications
running at once.

In my opinion, this would allow the most optimal usage of cluster resources. Linux cgroups
allow you to control access to more than just cpu shares. You can apply the same concept to
other resources (memory, disk io). You can also set up hard limits so that an application
will never get more than is allocated to it. I know that those limitations are important for
some use cases involving predictability of application execution times. Eventually, this idea
could be expanded to include many more of the features that cgroups provide.

Thanks again for any feedback on this idea. I hope that I have explained it a bit better now.
Can anyone else see value in it?

I'm not saying "don't get involved in the scheduling problem"; I'm trying to show just how
complex it gets in a large system. Before you begin to write a line of code, I'd recommend

-you read as much of the published work as you can, including the google and microsoft papers,
Facebook's FairScheduler work, etc, etc.
-have a look at the actual code inside those schedulers whose source is public, that's YARN
and Mesos.
-try using these schedulers for your own workloads.

really: scheduling work across a datacentre is a complex problem that is still considered a place
for cutting-edge research. Avoid unless you want to do that.

