spark-user mailing list archives

From "Shao, Saisai" <saisai.s...@intel.com>
Subject RE: oome from blockmanager
Date Fri, 22 Nov 2013 09:38:27 GMT
Hi Aaron,

I've also run into the same problem: shuffle has a lot of overhead for a large number of partitions.
I think it is an important issue when processing large data.

In my case I have 2000 mappers and 2000 reducers. I dumped the memory of an executor and found
that byte arrays take about 80% of total JVM memory; they are referenced by FastBufferedOutputStream
instances created by DiskBlockObjectWriter. It seems that there are very many instances of DiskBlockObjectWriter,
and each DiskBlockObjectWriter has a 100KB buffer for its FastBufferedOutputStream by default.
These buffers persist through the task execution period and cannot be garbage collected
until the task is finished.

My cluster has 6 nodes, with 40GB memory and 24 cores per node. I tried with 5000 partitions,
and this easily hits OOM.
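
A rough back-of-the-envelope estimate (a sketch only, assuming one 100KB buffer per reducer per
concurrently running map task, as seen in the heap dump above) shows why this runs out of memory:

    // Hedged estimate of per-node shuffle buffer memory; all figures are taken
    // from the numbers above and are illustrative, not measured.
    object ShuffleBufferEstimate {
      def main(args: Array[String]): Unit = {
        val coresPerNode = 24           // concurrent map tasks per node
        val reducers     = 5000         // shuffle output partitions
        val bufferBytes  = 100L * 1024  // default FastBufferedOutputStream buffer

        val perNodeBytes = coresPerNode.toLong * reducers * bufferBytes
        println(f"~${perNodeBytes / math.pow(1024, 3)}%.1f GB of buffers per node")
        // prints ~11.4 GB -- a large slice of a 40GB node before any user data is held
      }
    }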

The dilemma is that my application needs a groupByKey transformation, which requires a small
partition size, but a small partition size leads to a larger number of partitions, which also
consumes lots of memory.

Thanks
Jerry

From: Aaron Davidson [mailto:ilikerps@gmail.com]
Sent: Friday, November 22, 2013 2:54 PM
To: user@spark.incubator.apache.org
Subject: Re: oome from blockmanager

Thanks for your feedback; I think this is a very important issue on the usability front. One
thing to consider is that at some data size, one simply needs larger or more nodes. m1.large
is essentially the smallest EC2 instance size that can run a Spark job of any reasonable size.
That's not an excuse for an OOM, really -- one should generally just see (heavily) degraded
performance instead of actually failing the job. Additionally, the number of open files scales
with the number of reducers in Spark, unlike, say, MapReduce, where each mapper writes to only
one file, at the cost of later sorting the entire thing. This unfortunately means that adding
nodes isn't really a full solution in your case, since each one would still try to have
36k compressed output streams open.

The short-term solutions have already been discussed: decrease the number of reducers (and
mappers, if you need them to be tied) or potentially turn off compression if Snappy is holding
too much buffer space. A third option would actually be to decrease the number of executors
per node to 1, since that would double the available memory and roughly halve the usage. Clearly
either of the latter two solutions will produce a significant slowdown, while the first should
keep the same or better performance. While Spark is good at handling a large number of partitions,
there is still some cost to schedule every task, as well as to store and forward the metadata
for every shuffle block (which grows with R * M), so the ideal partition size is one that
fits exactly into memory without OOMing -- although this is of course an unrealistic situation
to aim for.
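
To make those knobs concrete, here is a rough sketch of how they might be applied (property and
script names are from the 0.8-era docs as I recall them; the master URL, paths, and partition
count are placeholders, so please double-check against your deployment):

    // Hedged sketch of the short-term tuning options discussed above.
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    object ShuffleTuningSketch {
      def main(args: Array[String]): Unit = {
        // Option 2: turn off shuffle compression if Snappy buffers dominate the heap.
        System.setProperty("spark.shuffle.compress", "false")

        // Option 3 (outside the job): run a single executor per node, e.g. by setting
        // SPARK_WORKER_INSTANCES=1 in spark-env.sh on a standalone cluster.

        val sc = new SparkContext("spark://master:7077", "shuffle-tuning-sketch")
        val pairs = sc.textFile("hdfs:///path/to/input").map(line => (line.take(8), line))

        // Option 1: explicitly cap the number of reducers instead of inheriting
        // the (large) partition count of the upstream RDD.
        val grouped = pairs.groupByKey(2000)
        grouped.saveAsTextFile("hdfs:///path/to/output")
      }
    }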

The longer-term solutions include algorithms which degrade gracefully instead of OOMing (although
that addresses too-large partitions rather than too-small ones, where the metadata and buffering
become the issue) and potentially adopting a more Map-Reducey style of shuffling where we would
only need to write to 1 file per executor at a time, at some significant processing and disk
bandwidth cost. I am currently investigating shuffle file performance, and thanks to your feedback
here, I'll additionally investigate the memory overheads inherent in shuffling as well.


On Thu, Nov 21, 2013 at 10:20 PM, Stephen Haberman <stephen.haberman@gmail.com> wrote:

> More significant in shuffling data is the number of reducers
Makes sense.

> so the lower bound on the number of reducers is 1.1TB/8GB = 138
This seems slightly optimistic. My math would be: m1.large = 7.5gb total, leave
2gb to OS/worker/etc., split the remaining 5.5gb between 2 executors = 2.75gb each, then assume
Spark needs 20% or so for metadata/overhead, leaving ~2gb actually available to each
executor to put our working data in memory.

But the 1.1tb of data is compressed, say with a 50% reduction. And we wrap a
case class around each line to abstract away the parsing logic, and, as you say,
Java instances will be a good deal bigger than the raw data they encapsulate.
Maybe 3x bigger? So, 2gb / 2 (compression) / 3 (object overhead) = ~300mb of raw data per
partition that, once uncompressed and loaded as Java objects, would likely fit in RAM.

1.1tb/.3gb = 3666 reducers.
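
For concreteness, the same arithmetic as a small sketch (every factor -- heap split, overhead,
compression ratio, object bloat -- is a rough assumption from the estimates above):

    // Reproduces the partition-count estimate above; all factors are assumptions.
    object ReducerEstimate {
      def main(args: Array[String]): Unit = {
        val perExecutorGb    = (7.5 - 2.0) / 2 * 0.8  // ~2.2gb, rounded to ~2gb in the text
        val compressionRatio = 2.0                    // uncompressed ~2x compressed size
        val objectBloat      = 3.0                    // Java objects vs. raw bytes
        val inputTb          = 1.1

        val rawGbPerPartition = perExecutorGb / compressionRatio / objectBloat  // ~0.3gb
        val reducers = math.ceil(inputTb * 1000 / rawGbPerPartition).toInt
        println(s"~$reducers reducers")  // ~3000; rounding down to 0.3gb gives the ~3666 above
      }
    }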

Perhaps I'm being pessimistic, but an 8gb partition size seems way high. Are
other Spark users really using partitions this large?

I'll admit our current value of 64mb is probably way low. We had seen a
lot of OOMEs when first using Spark, due to having too many partitions
(one per file loaded from S3). When writing our "auto coalesce" logic,
I didn't know a good partition size to shoot for, but had read that
HDFS used 64mb blocks.

I thought we'd get the most parity with regular Spark/HDFS users by
using the same value, so that's what we went with. Perhaps this was
a bad assumption?
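
For what it's worth, the auto-coalesce logic amounts to something like this hypothetical helper
(names and the 64mb default are illustrative; this isn't our actual utility code):

    // Hypothetical sketch of an "auto coalesce" helper: pick a partition count
    // from the total input size and a target partition size, then shrink to it.
    import org.apache.spark.rdd.RDD

    object AutoCoalesce {
      def toTargetSize[T](rdd: RDD[T], totalInputBytes: Long,
                          targetPartitionBytes: Long = 64L * 1024 * 1024): RDD[T] = {
        val desired = math.max(1, math.ceil(totalInputBytes.toDouble / targetPartitionBytes).toInt)
        // Only shrink: coalescing without a shuffle cannot increase the partition count.
        if (desired < rdd.partitions.length) rdd.coalesce(desired) else rdd
      }
    }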

> So a key question for you is, how many reducers did you use in this
> task?
18,000. Yes, I know that seems naive.

As an explanation, we prefer for our reports to not have/require any
manual partitioning hints from the programmer. Our theory is that, once
the data is loaded and we make a good guesstimate about partitioning
(which is handled by a utility library that knows our goal partition
size), the report logic itself just shouldn't care.

So, in this case, the report is just cogrouping the 18k partition RDD with
another RDD, and since we don't have spark.default.parallelism set, the
resulting RDD is also 18k partitions.

To us, this seems like the only safe default behavior; if the map-side RDD was
correctly partitioned into 18k, and any fewer partitions would (in theory) risk
OOMEs, then the reduce-side RDD should have at least the same number of partitions,
because, for a cogroup, it will hold data from multiple RDDs, not just the
biggest upstream RDD.
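
To illustrate the inheritance (a sketch only; the RDD names and the 2000 override are made up):

    // Without spark.default.parallelism set, cogroup's default partitioner picks up
    // the largest parent's partition count, so an 18k-partition input produces an
    // 18k-partition result unless an explicit count is passed.
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    object CogroupPartitions {
      def report(bigRdd: RDD[(String, String)],       // e.g. 18,000 partitions
                 otherRdd: RDD[(String, String)]) = {
        val inherited  = bigRdd.cogroup(otherRdd)       // ~18,000 reduce-side partitions
        val overridden = bigRdd.cogroup(otherRdd, 2000) // a hard-coded override we'd rather avoid
        (inherited, overridden)
      }
    }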

We would like to avoid having the report hard-code partition size
overrides into a few/all of its cogroup calls--how would the report know
what value to hard code? What date range is it currently being run for?
How much data is really there for this run?

Also, I'm generally cautious about dropping the number of partitions
too low, because my impression is that Spark excels at/prefers lots of
small tasks, since its architecture allows it to schedule/move/recover
them quickly.

> I'll also be very interested so see any heap dumps
Sure! I followed up with Aaron offlist.

- Stephen

