spark-dev mailing list archives

From Matthias Boehm <>
Subject Re: Fair scheduler pool leak
Date Sat, 07 Apr 2018 21:32:45 GMT
No, these pools are not created per job but per parfor worker, and are
thus used to execute many jobs. For all scripts with a single
top-level parfor, this is equivalent to static initialization. However,
yes, we create these pools dynamically on demand to avoid unnecessary
initialization and to handle scenarios of nested parfor.
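To make the per-worker scheme concrete, here is a minimal plain-Python model (no Spark involved). It assumes, following Imran's description further down the thread, that the scheduler's root pool creates a pool with default (FIFO) settings the first time a job names it and never removes it. `PoolRegistry`, `parfor_worker_pool`, `run_parfor` and the `parfor-worker-<id>` naming scheme are hypothetical. In real Spark code a thread selects its pool with `sc.setLocalProperty("spark.scheduler.pool", name)`.

```python
import threading


class PoolRegistry:
    """Hypothetical stand-in for Spark's root pool: a pool is created on the
    first job that names it and is never removed afterwards."""

    def __init__(self):
        self._lock = threading.Lock()
        self.pools = {"default": {"mode": "FIFO", "jobs": 0}}

    def submit_job(self, pool_name):
        with self._lock:
            # Unknown names get a fresh pool with default (FIFO) settings.
            pool = self.pools.setdefault(pool_name, {"mode": "FIFO", "jobs": 0})
            pool["jobs"] += 1


def parfor_worker_pool(worker_id):
    # Hypothetical naming scheme: one pool per parfor worker, not per job.
    return f"parfor-worker-{worker_id}"


def run_parfor(registry, num_workers, jobs_per_worker):
    def worker(worker_id):
        pool = parfor_worker_pool(worker_id)  # chosen once, reused for all jobs
        for _ in range(jobs_per_worker):
            registry.submit_job(pool)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


registry = PoolRegistry()
run_parfor(registry, num_workers=4, jobs_per_worker=100)
print(len(registry.pools))  # 4 worker pools + "default" = 5
```

Because the worker pool names are reused, 400 jobs touch only four pools, and running the same parfor again with the same worker ids adds no new pools.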

At the end of the day, we just want to configure fair scheduling
programmatically, without the need for additional configuration files,
which are a hassle for a library that is meant to work out of the box.
Simply setting 'spark.scheduler.mode' to FAIR does not do the trick
because we end up with a single default fair scheduler pool in FIFO
mode, which is equivalent to plain FIFO scheduling. Providing a way to
set the scheduling mode of the default pool would be awesome.
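Until such an option exists, one possible workaround (not discussed in this thread, and untested here) is for the library to write the allocation file itself at startup and point `spark.scheduler.allocation.file` at it. The sketch below only generates the XML in the format described in Spark's job-scheduling documentation (`<allocations>`, `<pool name="...">`, `schedulingMode`, `weight`, `minShare`). It assumes that a pool named `default` declared in this file takes the place of the built-in FIFO default pool, which should be verified against the Spark version in use. The helper name `fair_allocation_xml` is hypothetical.

```python
import os
import tempfile
import xml.etree.ElementTree as ET


def fair_allocation_xml(pools):
    """Build a fair-scheduler allocation file as a string.

    `pools` maps pool name -> (schedulingMode, weight, minShare).
    """
    root = ET.Element("allocations")
    for name, (mode, weight, min_share) in pools.items():
        pool = ET.SubElement(root, "pool", name=name)
        ET.SubElement(pool, "schedulingMode").text = mode
        ET.SubElement(pool, "weight").text = str(weight)
        ET.SubElement(pool, "minShare").text = str(min_share)
    return ET.tostring(root, encoding="unicode")


# Assumption: declaring "default" here overrides the built-in FIFO default pool.
xml_text = fair_allocation_xml({"default": ("FAIR", 1, 0)})

# Write the file to a temporary location owned by the library.
fd, path = tempfile.mkstemp(suffix=".xml")
with os.fdopen(fd, "w") as f:
    f.write('<?xml version="1.0"?>\n' + xml_text)

spark_conf = {
    "spark.scheduler.mode": "FAIR",
    "spark.scheduler.allocation.file": path,
}
print(spark_conf)
```

With PySpark, these pairs could be applied via `SparkConf().setAll(spark_conf.items())` before the context is created; the temporary file then has to outlive the context.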

Regarding why fair scheduling showed generally better performance for
out-of-core datasets, I don't have a good answer. My guess was
isolated job scheduling and better locality of in-memory partitions.


On Sat, Apr 7, 2018 at 8:50 AM, Mark Hamstra <> wrote:
> Sorry, but I'm still not understanding this use case. Are you somehow
> creating additional scheduling pools dynamically as Jobs execute? If so,
> that is a very unusual thing to do. Scheduling pools are intended to be
> statically configured -- initialized, living and dying with the Application.
> On Sat, Apr 7, 2018 at 12:33 AM, Matthias Boehm <> wrote:
>> Thanks for the clarification, Imran; that helped. I was mistakenly
>> assuming that these pools are removed via weak references, as the
>> ContextCleaner does for RDDs, broadcasts, accumulators, etc. For
>> the time being, we'll just work around it, but I'll file a
>> nice-to-have improvement JIRA. Also, you're right: we do see these
>> warnings, but they're usually hidden at the ERROR log level or buried
>> in the overwhelming output at the INFO level.
>> Just to give some context: we use these scheduler pools in SystemML's
>> parallel for loop construct (parfor), which allows combining data- and
>> task-parallel computation. If the data fits into the remote memory
>> budget, the optimizer may decide to execute the entire loop as a
>> single Spark job (with groups of iterations mapped to Spark tasks). If
>> the data is too large and non-partitionable, the parfor loop is
>> executed as a multi-threaded operator in the driver, and each worker
>> might spawn several data-parallel Spark jobs in the context of the
>> worker's scheduler pool, for operations that don't fit into the
>> driver.
>> We decided to use these fair scheduler pools (with fair scheduling
>> across pools and FIFO within each pool) instead of the default FIFO
>> scheduler because it gave us better and more robust performance back
>> in the Spark 1.x line. This was especially true for concurrent jobs
>> over shared input data (e.g., for hyperparameter tuning) and when the
>> data size exceeded aggregate memory. The only downside was that we had
>> to guard against scenarios where concurrent jobs would lazily pull a
>> shared RDD into cache, because that led to thread contention on the
>> executors' block managers and spuriously replicated in-memory
>> partitions.
>> Regards,
>> Matthias
>> On Fri, Apr 6, 2018 at 8:08 AM, Imran Rashid <> wrote:
>> > Hi Matthias,
>> >
>> > This doesn't look possible now. It may be worth filing an improvement
>> > JIRA for it.
>> >
>> > But I'm trying to understand what you're trying to do a little better.
>> > So you intentionally have each thread create a new unique pool when it
>> > submits a job? So that pool will just get the default pool
>> > configuration, and you will see lots of these messages in your logs?
>> >
>> > What is the use case for creating pools this way?
>> >
>> > Also, if I understand correctly, it doesn't even matter if the thread
>> > dies: that pool will still stay around, as the rootPool will retain a
>> > reference to it (the pools aren't really tied to specific threads).
>> >
>> > Imran
>> >
>> > On Thu, Apr 5, 2018 at 9:46 PM, Matthias Boehm <> wrote:
>> >>
>> >> Hi all,
>> >>
>> >> for concurrent Spark jobs spawned from the driver, we use Spark's fair
>> >> scheduler pools, which are set and unset in a thread-local manner by
>> >> each worker thread. Typically (for rather long jobs), this works very
>> >> well. Unfortunately, in an application with lots of very short
>> >> parallel sections, we see thousands of these pools remaining in the
>> >> Spark UI, which indicates some kind of leak. Each worker cleans up its
>> >> local property by setting it to null, but not all pools are properly
>> >> removed. I've checked and reproduced this behavior with Spark 2.1-2.3.
>> >>
>> >> Now my question: Is there a way to explicitly remove these pools,
>> >> either globally, or locally while the thread is still alive?
>> >>
>> >> Regards,
>> >> Matthias
>> >>
>> >
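The behaviour in the original report, and Imran's explanation of it, can be reproduced with a small plain-Python model (no Spark involved). Each short parallel section starts worker threads that set a thread-local pool name, submit a job, and then clear the property, mirroring `sc.setLocalProperty("spark.scheduler.pool", name)` followed by `sc.setLocalProperty("spark.scheduler.pool", null)` in Scala. `RootPool`, `worker` and `parallel_section` are hypothetical; `RootPool` simply keeps every pool it has ever created, as Imran describes, so clearing the thread-local name never removes anything from it.

```python
import threading


class RootPool:
    """Hypothetical model of the scheduler's root pool: pools are created on
    demand by name and kept for the lifetime of the application."""

    def __init__(self):
        self._lock = threading.Lock()
        self.pools = {"default"}

    def submit_job(self, pool_name):
        with self._lock:
            # No pool property set -> the job goes to the default pool.
            self.pools.add(pool_name or "default")


local_props = threading.local()  # models the per-thread "spark.scheduler.pool"


def worker(root, pool_name):
    local_props.pool = pool_name  # set the thread-local pool
    try:
        root.submit_job(local_props.pool)
    finally:
        local_props.pool = None  # unset: clears only the thread-local value


def parallel_section(root, section, num_workers):
    # Each section uses fresh threads and fresh, unique pool names.
    threads = [
        threading.Thread(target=worker, args=(root, f"pool-{section}-{i}"))
        for i in range(num_workers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


root = RootPool()
for section in range(250):  # many very short parallel sections
    parallel_section(root, section, num_workers=4)
print(len(root.pools))  # "default" plus 1000 leaked pools = 1001
```

The threads are gone after each section, yet every uniquely named pool survives; only reusing a bounded set of names, or an explicit removal API, keeps the count from growing.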
