spark-user mailing list archives

From Christopher Nguyen <...@adatao.com>
Subject Re: Spark Memory Bounds
Date Wed, 28 May 2014 13:43:52 GMT
Keith, please see inline.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Tue, May 27, 2014 at 7:22 PM, Keith Simmons <keith@pulse.io> wrote:

> A dash of both.  I want to know enough that I can "reason about", rather
> than "strictly control", the amount of memory Spark will use.  If I have a
> big data set, I want to understand how I can design it so that Spark's
> memory consumption falls below my available resources.  Or alternatively,
> if it's even possible for Spark to process a data set over a certain size.
>  And if I run into memory problems, I want to know which knobs to turn, and
> how turning those knobs will affect memory consumption.
>

In practice, to avoid OOME, a key dial we use is the size (or inversely, the
number) of the partitions of your dataset. Clearly there is some "blow-up
factor" F such that, e.g., if you start out with 128MB on-disk data
partitions, each one consumes 128F MB of memory while it is being processed,
counting both Spark's own usage and that of your closure code. Knowing this,
you would want to size the partitions such that
AvailableMemoryInMBPerWorker / NumberOfCoresPerWorker > 128F. To arrive at
F, you could do some back-of-the-envelope modeling, and/or run the job and
observe empirically.
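
As a rough illustration of the sizing rule above, here is a minimal Python
sketch. The function names and the example numbers (8 GB per worker, 4 cores,
F = 4, a 100 GB dataset) are hypothetical and chosen only for illustration;
F itself still has to come from your own modeling or measurement, and the
sketch assumes evenly sized partitions.

```python
import math


def max_partition_mb(worker_memory_mb, cores_per_worker, blowup_factor):
    """Exclusive upper bound on the on-disk partition size, in MB.

    Rearranges the rule
        worker_memory_mb / cores_per_worker > partition_mb * blowup_factor
    to solve for partition_mb.
    """
    if worker_memory_mb <= 0 or cores_per_worker <= 0 or blowup_factor <= 0:
        raise ValueError("all inputs must be positive")
    return worker_memory_mb / cores_per_worker / blowup_factor


def min_partitions(dataset_mb, worker_memory_mb, cores_per_worker, blowup_factor):
    """Smallest partition count that keeps evenly sized partitions under the bound."""
    limit = max_partition_mb(worker_memory_mb, cores_per_worker, blowup_factor)
    # Strict inequality: dataset_mb / n < limit  =>  n > dataset_mb / limit.
    return math.floor(dataset_mb / limit) + 1


if __name__ == "__main__":
    # Hypothetical cluster: 8192 MB per worker, 4 cores per worker, F = 4.
    print(max_partition_mb(8192, 4, 4.0))            # 512.0
    # Hypothetical 100 GB dataset on that cluster.
    print(min_partitions(100 * 1024, 8192, 4, 4.0))  # 201
```

With these made-up numbers, 128MB partitions sit comfortably under the 512MB
bound. If F turns out to be larger than estimated, the bound shrinks
proportionally and the partition count has to go up.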


>
> It's my understanding that between certain key stages in a Spark DAG (i.e.
> group by stages), Spark will serialize all data structures necessary to
> continue the computation at the next stage, including closures.  So in
> theory, per machine, Spark only needs to hold the transient memory required
> to process the partitions assigned to the currently active tasks.  Is my
> understanding correct?  Specifically, once a key/value pair is serialized
> in the shuffle stage of a task, are the references to the raw java objects
> released before the next task is started?
>

Yes, that is correct in non-cached mode. At the same time, Spark also
optionally does something else, which is to keep the data structures (RDDs)
persistent in memory (*). As such, it is possible for partitions that are not
being actively worked on to be consuming memory. Spark will spill these
to local disk if they take up more memory than it is allowed to use. So
the key thing to worry about is less what Spark does (apart from
overhead and, yes, the possibility of bugs that need to be fixed), and more
what your closure code does with JVM memory as a whole. If in doubt,
refer back to the "blow-up factor" model described above.

(*) This is a fundamentally differentiating feature of Spark over a range
of other "in-memory" architectures that focus on raw data or transient
caches, which serve non-equivalent purposes when viewed from the application
level. It allows for very fast access to ready-to-consume high-level data
structures, as long as available RAM permits.
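
To make the bookkeeping concrete, the following Python sketch combines the
A + B + C breakdown from the original question (quoted further down) with the
blow-up factor. It is a simplified model under stated assumptions, not a
description of Spark's internals: it assumes the heap left over after
spark.storage.memoryFraction and spark.shuffle.memoryFraction is what
transient task memory can use, it ignores JVM and Spark overhead, and all
function names and numbers are hypothetical.

```python
def transient_budget_mb(heap_mb, storage_fraction, shuffle_fraction):
    """Heap (MB) left for transient task memory (C) in this simplified model.

    storage_fraction and shuffle_fraction stand in for the values of
    spark.storage.memoryFraction (A) and spark.shuffle.memoryFraction (B).
    """
    reserved = storage_fraction + shuffle_fraction
    if heap_mb <= 0 or not 0 <= reserved < 1:
        raise ValueError("heap must be positive and fractions must sum to < 1")
    return heap_mb * (1.0 - reserved)


def estimated_transient_mb(dataset_mb, num_partitions, concurrent_tasks,
                           blowup_factor=1.0):
    """Estimate of C: partition size * tasks in flight * blow-up factor F."""
    if num_partitions <= 0 or concurrent_tasks <= 0 or blowup_factor <= 0:
        raise ValueError("partitions, tasks and blow-up factor must be positive")
    return dataset_mb / num_partitions * concurrent_tasks * blowup_factor


def fits(dataset_mb, num_partitions, concurrent_tasks, blowup_factor,
         heap_mb, storage_fraction, shuffle_fraction):
    """True if the estimated transient memory stays strictly under the budget."""
    budget = transient_budget_mb(heap_mb, storage_fraction, shuffle_fraction)
    needed = estimated_transient_mb(dataset_mb, num_partitions,
                                    concurrent_tasks, blowup_factor)
    return needed < budget


if __name__ == "__main__":
    # Hypothetical setup: 8192 MB heap, half for cached RDDs, a quarter for shuffle.
    print(transient_budget_mb(8192, 0.5, 0.25))                      # 2048.0
    # Hypothetical 100 GB dataset, 4 tasks in flight, F = 4.
    print(fits(100 * 1024, 800, 4, 4.0, 8192, 0.5, 0.25))            # False
    print(fits(100 * 1024, 1600, 4, 4.0, 8192, 0.5, 0.25))           # True
```

Doubling the partition count halves the estimate of C, which is the "knob"
effect discussed in this thread. Memory allocated by closure code beyond what
F captures is invisible to this model.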


>
>
> On Tue, May 27, 2014 at 6:21 PM, Christopher Nguyen <ctn@adatao.com> wrote:
>
>> Keith, do you mean "bound" as in (a) strictly control to some
>> quantifiable limit, or (b) try to minimize the amount used by each task?
>>
>> If "a", then that is outside the scope of Spark's memory management,
>> which you should think of as an application-level (that is, above JVM)
>> mechanism. In this scope, Spark "voluntarily" tracks and limits the amount
>> of memory it uses for explicitly known data structures, such as RDDs. What
>> Spark cannot do is, e.g., control or manage the amount of JVM memory that a
>> given piece of user code might take up. For example, I might write some
>> closure code that allocates a large array of doubles unbeknownst to Spark.
>>
>> If "b", then your thinking is in the right direction, although quite
>> imperfect, because of things like the example above. We often experience
>> OOME if we're not careful with job partitioning. What I think Spark needs
>> to evolve to is at least to include a mechanism for application-level hints
>> about task memory requirements. We might work on this and submit a PR for
>> it.
>>
>> --
>> Christopher T. Nguyen
>> Co-founder & CEO, Adatao <http://adatao.com>
>> linkedin.com/in/ctnguyen
>>
>>
>>
>> On Tue, May 27, 2014 at 5:33 PM, Keith Simmons <keith@pulse.io> wrote:
>>
>>> I'm trying to determine how to bound my memory use in a job working with
>>> more data than can simultaneously fit in RAM.  From reading the tuning
>>> guide, my impression is that Spark's memory usage is roughly the following:
>>>
>>> (A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient memory
>>> used by all currently running tasks
>>>
>>> I can bound A with spark.storage.memoryFraction and I can bound B with
>>> spark.shuffle.memoryFraction.  I'm wondering how to bound C.
>>>
>>> It's been hinted at a few times on this mailing list that you can reduce
>>> memory use by increasing the number of partitions.  That leads me to
>>> believe that the amount of transient memory is roughly as follows:
>>>
>>> total_data_set_size/number_of_partitions *
>>> number_of_tasks_simultaneously_running_per_machine
>>>
>>> Does this sound right?  In other words, as I increase the number of
>>> partitions, the size of each partition will decrease, and since each task
>>> is processing a single partition and there are a bounded number of tasks in
>>> flight, my memory use has a rough upper limit.
>>>
>>> Keith
>>>
>>
>>
>
