Thanks!  Sounds like my rough understanding was roughly right :)

I definitely understand that cached RDDs can add to the memory requirements.  Luckily, like you mentioned, you can configure Spark to flush them to disk and bound their total size in memory via spark.storage.memoryFraction, so I have a pretty good handle on the overall RDD contribution.

Thanks for all the help.

Keith


On Wed, May 28, 2014 at 6:43 AM, Christopher Nguyen <ctn@adatao.com> wrote:
Keith, please see inline.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao


On Tue, May 27, 2014 at 7:22 PM, Keith Simmons <keith@pulse.io> wrote:
A dash of both.  I want to know enough that I can "reason about", rather than "strictly control", the amount of memory Spark will use.  If I have a big data set, I want to understand how I can design it so that Spark's memory consumption falls below my available resources.  Or alternatively, if it's even possible for Spark to process a data set over a certain size.  And if I run into memory problems, I want to know which knobs to turn, and how turning those knobs will affect memory consumption.

In practice, to avoid OOME, a key dial we use is the size (or inversely, number) of the partitions of your dataset. Clearly there is some "blow-up factor" F such that, e.g., if you start out with 128MB on-disk data partitions, you would consume 128F MB of memory, both by Spark and by your closure code. Knowing this, you would want to size the partitions such that AvailableMemoryInMBPerWorker / NumberOfCoresPerWorker > 128F. To arrive at F, you could do some back-of-the-envelope modeling, and/or run the job and observe empirically.
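
As a rough sketch of the sizing rule above, written in Python: the function names and the numbers are hypothetical and only illustrate the inequality. The blow-up factor F still has to come from your own modeling or measurement.

```python
def max_partition_mb(worker_memory_mb, cores_per_worker, blow_up_factor):
    """Upper limit on on-disk partition size (MB) under the rule
    worker_memory_mb / cores_per_worker > partition_mb * blow_up_factor."""
    if cores_per_worker <= 0 or blow_up_factor <= 0:
        raise ValueError("cores_per_worker and blow_up_factor must be positive")
    return worker_memory_mb / cores_per_worker / blow_up_factor


def partition_fits(partition_mb, worker_memory_mb, cores_per_worker, blow_up_factor):
    """True if one partition per core fits in the worker's memory under this model."""
    return worker_memory_mb / cores_per_worker > partition_mb * blow_up_factor


# Illustrative numbers only: 16 GB per worker, 8 cores, an assumed F of 4.
print(max_partition_mb(16384, 8, 4))       # partitions must stay below this size
print(partition_fits(128, 16384, 8, 4))    # does a 128 MB partition fit?
```

With a measured F in hand, the same check can be rerun whenever the worker memory or core count changes.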
 

It's my understanding that between certain key stages in a Spark DAG (e.g. group by stages), Spark will serialize all data structures necessary to continue the computation at the next stage, including closures.  So in theory, per machine, Spark only needs to hold the transient memory required to process the partitions assigned to the currently active tasks.  Is my understanding correct?  Specifically, once a key/value pair is serialized in the shuffle stage of a task, are the references to the raw Java objects released before the next task is started?

Yes, that is correct in non-cached mode. At the same time, Spark also optionally does something else, which is to keep the data structures (RDDs) persistent in memory (*). As such, it is possible for partitions that are not being actively worked on to be consuming memory. Spark will evict these or spill them to local disk (depending on the storage level) if they take up more memory than it is allowed to use. So the key thing to worry about is less what Spark does (apart from overhead and, yes, the possibility of bugs that need to be fixed), and more what your closure code does with JVM memory as a whole. If in doubt, refer back to the "blow-up factor" model described above.

(*) This is a fundamentally differentiating feature of Spark over a range of other "in-memory" architectures that focus on raw data or transient caches, which serve non-equivalent purposes when viewed from the application level. It allows for very fast access to ready-to-consume high-level data structures, as long as available RAM permits.




On Tue, May 27, 2014 at 6:21 PM, Christopher Nguyen <ctn@adatao.com> wrote:
Keith, do you mean "bound" as in (a) strictly control to some quantifiable limit, or (b) try to minimize the amount used by each task?

If "a", then that is outside the scope of Spark's memory management, which you should think of as an application-level (that is, above JVM) mechanism. In this scope, Spark "voluntarily" tracks and limits the amount of memory it uses for explicitly known data structures, such as RDDs. What Spark cannot do is, e.g., control or manage the amount of JVM memory that a given piece of user code might take up. For example, I might write some closure code that allocates a large array of doubles unbeknownst to Spark.
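
To make that last example concrete, here is a minimal sketch in plain Python. The names (sum_by_bucket, scratch_megabytes) are hypothetical; the first stands in for the kind of per-partition function you might hand to mapPartitions. In PySpark the array would live in the Python worker process rather than the JVM heap, but the point is the same: the allocation happens in user code and is not one of the data structures Spark knows about.

```python
from array import array


def sum_by_bucket(rows, buckets=1_000_000):
    """Hypothetical per-partition function.

    The scratch array is ordinary memory allocated by user code. It is not
    an RDD, so it is not something Spark tracks or limits.
    """
    scratch = array("d", [0.0]) * buckets   # roughly 8 bytes per double
    for key, value in rows:
        scratch[key % buckets] += value
    # Return only the buckets that received data.
    return [(i, total) for i, total in enumerate(scratch) if total != 0.0]


def scratch_megabytes(buckets, concurrent_tasks):
    """Approximate untracked memory when several tasks run this at once,
    assuming 8 bytes per double and one scratch array per task."""
    return 8 * buckets * concurrent_tasks / (1024 * 1024)


print(sum_by_bucket([(0, 1.5), (2, 2.0), (6, 0.5)], buckets=4))
print(scratch_megabytes(1_000_000, 8))
```

The second function shows why this matters per machine: the hidden allocation is multiplied by the number of tasks running concurrently.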

If "b", then your thinking is in the right direction, although quite imperfect, because of things like the example above. We often experience OOME if we're not careful with job partitioning. What I think Spark needs to evolve to is at least to include a mechanism for application-level hints about task memory requirements. We might work on this and submit a PR for it.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao


On Tue, May 27, 2014 at 5:33 PM, Keith Simmons <keith@pulse.io> wrote:
I'm trying to determine how to bound my memory use in a job working with more data than can simultaneously fit in RAM.  From reading the tuning guide, my impression is that Spark's memory usage is roughly the following:

(A) In-memory RDD use + (B) In-memory shuffle use + (C) Transient memory used by all currently running tasks

I can bound A with spark.storage.memoryFraction and I can bound B with spark.shuffle.memoryFraction.  I'm wondering how to bound C.
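
A back-of-the-envelope version of this A + B + C split, sketched in Python. The function name and the numbers are hypothetical, and the model is deliberately simplified: it treats each fraction as a plain share of the executor heap and ignores any safety margins or overhead Spark applies internally.

```python
def memory_budget_mb(executor_heap_mb, storage_fraction, shuffle_fraction):
    """Split an executor heap into the buckets A, B and C described above.

    A and B are the caps implied by the two fractions; C is simply what is
    left over for transient task memory, not a limit that Spark enforces.
    """
    if storage_fraction < 0 or shuffle_fraction < 0:
        raise ValueError("fractions must be non-negative")
    if storage_fraction + shuffle_fraction > 1:
        raise ValueError("fractions must not sum to more than 1")
    a = executor_heap_mb * storage_fraction
    b = executor_heap_mb * shuffle_fraction
    c = executor_heap_mb - a - b
    return {"A_rdd_cache": a, "B_shuffle": b, "C_left_for_tasks": c}


# Illustrative values only, not recommended settings.
print(memory_budget_mb(8192, 0.5, 0.25))
```

Lowering either fraction leaves more headroom for C, at the cost of less cached or shuffle data held in memory.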

It's been hinted at a few times on this mailing list that you can reduce memory use by increasing the number of partitions.  That leads me to believe that the amount of transient memory is roughly as follows:

total_data_set_size/number_of_partitions * number_of_tasks_simultaneously_running_per_machine

Does this sound right?  In other words, as I increase the number of partitions, the size of each partition will decrease, and since each task is processing a single partition and there are a bounded number of tasks in flight, my memory use has a rough upper limit.
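
The estimate above can be sketched in Python as follows. The function names and figures are hypothetical, and the model counts raw data size only; the in-memory representation of a partition may well be larger than its share of the data set.

```python
import math


def transient_memory_mb(total_data_mb, num_partitions, tasks_per_machine):
    """Rough per-machine transient memory: one partition per running task."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return total_data_mb / num_partitions * tasks_per_machine


def min_partitions(total_data_mb, tasks_per_machine, budget_mb):
    """Smallest partition count that keeps the estimate within budget_mb."""
    if budget_mb <= 0:
        raise ValueError("budget_mb must be positive")
    return math.ceil(total_data_mb * tasks_per_machine / budget_mb)


# Illustrative numbers: 100 GB data set, 8 concurrent tasks per machine.
print(transient_memory_mb(102400, 800, 8))   # estimate with 800 partitions
print(min_partitions(102400, 8, 1024))       # partitions needed for a 1 GB budget
```

Doubling the partition count halves the estimate, which is the rough upper limit described above.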

Keith