spark-user mailing list archives

From Aaron Davidson <ilike...@gmail.com>
Subject Re: oome from blockmanager
Date Fri, 22 Nov 2013 22:21:30 GMT
Jerry, I need to correct what I said about the 100KB for
each FastBufferedOutputStream -- this is actually a Spark buffer, not a
compression buffer. The size can be configured using the
"spark.shuffle.file.buffer.kb" System property, and it defaults to 100. I
am still curious whether you're using compression, or seeing more than 48k
DiskBlockObjectWriters, which would account for the remaining memory used.
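
For reference, a minimal sketch of overriding that property (using the System
property style, set before the SparkContext is created; the value and master
URL below are only placeholders):

    import org.apache.spark.SparkContext

    // Shrink the per-DiskBlockObjectWriter buffer; the default is 100 (KB).
    System.setProperty("spark.shuffle.file.buffer.kb", "25")
    val sc = new SparkContext("local[4]", "shuffle-buffer-sketch")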


On Fri, Nov 22, 2013 at 9:05 AM, Aaron Davidson <ilikerps@gmail.com> wrote:

> Great, thanks for the feedback. It sounds like you're using the LZF
> compression scheme -- switching to Snappy should use significantly less
> buffer space per DiskBlockObjectWriter, but this doesn't really
> solve the underlying problem. In general I've been thinking of "Spark
> nodes" as having high memory and a moderate number of cores, but with 24
> cores and 40GB of memory, each core really doesn't get that much memory
> individually, despite every one needing its own set of
> DiskBlockObjectWriters.
>
> One thing that is a little odd is that with your numbers, you should have
> 2000 (reducers) * 24 (cores) = 48k DiskBlockObjectWriters. These should
> only require a total of 4.8GB for the entire node, though, rather than 80%
> of your JVM memory. Were you seeing significantly more than 48k
> DiskBlockObjectWriters?
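>
> Spelled out (rough arithmetic only, using your numbers and the 100KB default
> buffer per writer):
>
>     val writers     = 2000 * 24                 // reducers * cores = 48,000
>     val bufferBytes = writers * 100L * 1024     // ~4.8GB of buffers node-wide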
>
>
> On Fri, Nov 22, 2013 at 1:38 AM, Shao, Saisai <saisai.shao@intel.com> wrote:
>
>>  Hi Aaron,
>>
>>
>>
>> I've also hit the same problem: shuffle takes a lot of overhead for a large
>> number of partitions. I think it is an important issue when processing large
>> data sets.
>>
>>
>>
>> In my case I have 2000 mappers and 2000 reducers. I dumped the memory of an
>> executor and found that byte arrays take about 80% of total JVM memory; they
>> are referenced by FastBufferedOutputStream instances created by
>> DiskBlockObjectWriter. It seems that there are very many instances of
>> DiskBlockObjectWriter, and each one has a 100KB buffer for its
>> FastBufferedOutputStream by default. These buffers persist through the task
>> execution period and cannot be garbage collected until the task finishes.
>>
>>
>>
>> My cluster has 6 nodes, with 40GB of memory and 24 cores per node. I tried
>> with 5000 partitions, and this easily hit OOM.
>>
>>
>>
>> The dilemma is that my application needs a groupByKey transformation, which
>> requires a small partition size, but a small partition size leads to a larger
>> number of partitions, which also consumes lots of memory.
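>>
>> Concretely, the only knob I have is the partition count passed to groupByKey
>> (a rough sketch; "pairs" and the 2000 are just placeholders for my job):
>>
>>     // Fewer partitions -> bigger groups per reducer (OOM risk in groupByKey);
>>     // more partitions -> more DiskBlockObjectWriters and buffer memory.
>>     val grouped = pairs.groupByKey(2000)   // pairs: an RDD[(K, V)] we already have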
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* Aaron Davidson [mailto:ilikerps@gmail.com]
>> *Sent:* Friday, November 22, 2013 2:54 PM
>> *To:* user@spark.incubator.apache.org
>> *Subject:* Re: oome from blockmanager
>>
>>
>>
>> Thanks for your feedback; I think this is a very important issue on the
>> usability front. One thing to consider is that at some data size, one
>> simply needs larger or more nodes. m1.large is essentially the smallest ec2
>> instance size that can run a Spark job of any reasonable size. That's not
>> an excuse for an OOM, really -- one should generally just see (heavily)
>> degraded performance instead of actually failing the job. Additionally, in
>> Spark the number of open files scales with the number of reducers, unlike,
>> say, MapReduce, where each mapper writes to only one file, at the cost of
>> later sorting the entire thing. This unfortunately means that
>> adding nodes isn't really a full solution in your case, since each one
>> would try to have 36k compressed output streams open.
>>
>>
>>
>> The short term solutions have already been discussed: decrease the number
>> of reducers (and mappers, if you need them to be tied) or potentially turn
>> off compression if Snappy is holding too much buffer space. A third option
>> would actually be to decrease the number of executors per node to 1, since
>> that would double the available memory and roughly halve the usage. Clearly
>> either of the latter two solutions will produce a significant slowdown,
>> while the first should keep the same or better performance. While Spark is
>> good at handling a large number of partitions, there is still some cost to
>> schedule every task, as well as to store and forward the metadata for every
>> shuffle block (which grows with R * M), so the ideal partition size is one
>> that fits exactly into memory without OOMing -- although this is of course
>> an unrealistic situation to aim for.
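>>
>> As a rough sketch of the compression knobs (property names as of the current
>> releases, set before the SparkContext is created; values are only examples):
>>
>>     // Switch LZF -> Snappy to reduce per-stream buffer space ...
>>     System.setProperty("spark.io.compression.codec",
>>       "org.apache.spark.io.SnappyCompressionCodec")
>>     // ... or turn shuffle compression off entirely, trading CPU/IO for heap.
>>     // System.setProperty("spark.shuffle.compress", "false")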
>>
>>
>>
>> The longer term solutions include algorithms which degrade gracefully
>> instead of OOMing (although that would address too-large partitions rather
>> than too-small ones, where the metadata and buffering become the issue) and
>> to potentially adopt a more Map-Reducey style of shuffling
>> where we would only need to write to 1 file per executor at a time, with
>> some significant processing and disk bandwidth cost. I am currently
>> investigating shuffle file performance, and thanks to your feedback here,
>> I'll also investigate the memory overheads inherent in shuffling.
>>
>>
>>
>>
>>
>> On Thu, Nov 21, 2013 at 10:20 PM, Stephen Haberman
>> <stephen.haberman@gmail.com> wrote:
>>
>>
>> > More significant in shuffling data is the number of reducers
>>
>> Makes sense.
>>
>>
>> > so the lower bound on the number of reducers is 1.1TB/8GB = 138
>>
>> This seems slightly optimistic. My math would be: m1.large = 7.5gb total,
>> leave
>> 2gb to OS/worker/etc., split 5.5gb between 2 executors = 2.75gb, plus say
>> Spark
>> will need 20% or so as metadata/overhead, so ~2gb actually available to
>> each
>> executor to put our working data in memory.
>>
>> But the 1.1tb of data is compressed, say with a 50% reduction. And we
>> wrap a
>> case class around each line to abstract away the parsing logic, and, as
>> you say,
>> Java instances will be a good deal bigger than the raw data they
>> encapsulate.
>> Maybe 3x bigger? So, 2gb / 2 / 3 = ~300mb of raw data that, once
>> uncompressed
>> and loaded as Java objects, would likely fit in RAM.
>>
>> 1.1tb/.3gb = 3666 reducers.
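>>
>> Spelled out (just restating the arithmetic above, rough numbers only):
>>
>>     val usableGb     = 2.0              // per executor, from the estimate above
>>     val rawPerPartGb = usableGb / 2 / 3 // ~0.33gb of compressed input, after 2x
>>                                         // decompression and ~3x object overhead
>>     val reducers     = 1100 / 0.3       // ~3666 reducers for 1.1tb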
>>
>> Perhaps I'm being pessimistic, but an 8gb partition size seems way high.
>> Are
>> other Spark users really using partitions this large?
>>
>> I'll admit our current value of 64mb is probably way low. We had seen a
>> lot of OOMEs when first using Spark, due to having too many partitions
>> (one per file loaded from S3). When writing our "auto coalesce" logic,
>> I didn't know a good partition size to shoot for, but had read that
>> HDFS used 64mb blocks.
>>
>> I thought we'd get the most parity with regular Spark/HDFS users by
>> using the same value, so that's what we went with. Perhaps this was
>> a bad assumption?
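>>
>> (Conceptually, the auto-coalesce is something like the sketch below; the
>> names are invented and 64mb is just the target mentioned above.)
>>
>>     val targetBytes   = 64L * 1024 * 1024   // 64mb, per the HDFS block size
>>     val numParts      = math.max(1, (totalInputBytes / targetBytes).toInt)
>>     val repartitioned = rdd.coalesce(numParts)   // totalInputBytes and rdd come
>>                                                  // from our loading utility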
>>
>>
>> > So a key question for you is, how many reducers did you use in this
>> > task?
>>
>> 18,000. Yes, I know that seems naive.
>>
>> As an explanation, we prefer for our reports to not have/require any
>> manual partitioning hints from the programmer. Our theory is that, once
>> the data is loaded and we make a good guesstimate about partitioning
>> (which is handled by a utility library that knows our goal partition
>> size), the report logic itself just shouldn't care.
>>
>> So, in this case, the report is just cogrouping the 18k partition RDD with
>> another RDD, and since we don't have spark.default.parallelism set, the
>> resulting RDD is also 18k partitions.
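>>
>> (Roughly, that looks like the following; the RDD names are made up.)
>>
>>     // With spark.default.parallelism unset, the cogroup result inherits its
>>     // partition count from its inputs (~18k here); overriding it would mean
>>     // hard-coding a number into the report, which we'd rather avoid.
>>     val result = bigRdd.cogroup(otherRdd)         // ~18,000 partitions
>>     // val result = bigRdd.cogroup(otherRdd, n)   // explicit override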
>>
>> To us, this seems like the only safe default behavior; if the map-side
>> RDD was
>> correctly partitioned into 18k, and any fewer partitions would (in
>> theory) risk
>> OOMEs, then the reduce-side RDD should have the same number of partitions,
>> because it will have, for a cogroup, data from multiple RDDs, not just the
>> biggest upstream RDD.
>>
>> We would like to avoid having the report hard-code partition size
>> overrides into a few/all of its cogroup calls--how would the report know
>> what value to hard code? What date range is it currently being run for?
>> How much data is really there for this run?
>>
>> Also, I'm generally cautious about dropping the number of partitions
>> too low, because my impression is that Spark excels at/prefers lots of
>> small tasks, since its architecture allows it to schedule/move/recover
>> them quickly.
>>
>>
>> > I'll also be very interested so see any heap dumps
>>
>> Sure! I followed up with Aaron offlist.
>>
>> - Stephen
>>
>>
>>
>
>
