spark-user mailing list archives

From Igor Pernek <>
Subject Re: Spark memory optimization
Date Mon, 07 Jul 2014 11:31:08 GMT
Thanks guys! Actually, I'm not doing any caching (at least I'm not calling
cache/persist), so do I still need to use the DISK_ONLY storage level?
However, I do use reduceByKey and sortByKey. Mayur, you mentioned that
sortByKey requires the data to fit in memory. Is there any way to work around
this (maybe by increasing the number of partitions or something similar)?
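
To make the partition idea concrete, here is a rough sketch of what passing an explicit partition count to the shuffle operations might look like. The input and output paths, the choice of the first TSV column as the key, and the count of 2000 are all placeholders for illustration, not values from my actual job, and I have not verified that this avoids the memory errors:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits, needed on Spark 1.x

object PartitionSketch {
  def main(args: Array[String]): Unit = {
    // local[16] corresponds to the 16 threads of the single workstation
    val conf = new SparkConf().setMaster("local[16]").setAppName("partition-sketch")
    val sc = new SparkContext(conf)

    // Hypothetical values: adjust to the real data
    val inputPath = "/path/to/tsv-folder/*"
    val outputPath = "/path/to/output"
    val numPartitions = 2000

    val counts = sc.textFile(inputPath, numPartitions) // second argument is a minimum partition count
      .map(_.split("\t"))
      .filter(_.length >= 1)             // skip lines that split into no fields
      .map(fields => (fields(0), 1L))    // assumed key: first column
      .reduceByKey(_ + _, numPartitions) // explicit partition count for the shuffle

    // sortByKey also accepts a partition count for its output
    val sorted = counts.sortByKey(ascending = true, numPartitions = numPartitions)

    sorted.saveAsTextFile(outputPath)
    sc.stop()
  }
}
```

The intent is that more partitions mean less data per task, so each task needs less heap at a time; whether that is enough for sortByKey is exactly what I'm asking.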

What alternative would you suggest if Spark is not the way to go for this
kind of scenario? As mentioned, what I like about Spark is its high-level
abstraction of parallelization. I'm ready to sacrifice speed (as long as the
slowdown is not too big; I'm doing batch processing, nothing real-time)
for code simplicity and readability.

On Fri, Jul 4, 2014 at 3:16 PM, Surendranauth Hiraman <> wrote:
> When using DISK_ONLY, keep in mind that disk I/O is pretty high. Make
> sure you are writing to multiple disks for best operation. And even with
> DISK_ONLY, we've found that there is a minimum threshold for executor RAM
> (spark.executor.memory), which for us seemed to be around 8 GB.
> If you find that, with enough disks, you still have errors/exceptions
> getting the flow to finish, first check iostat to see if disk is the
> Then, you may want to try tuning some or all of the following, which
> affect buffers and timeouts. For us, because we did not have enough disks
> to start out, the I/O bottleneck caused timeouts and other errors. In the
> end, IMHO, it's probably best to solve the problem by adding disks rather
> than by tuning the parameters, because it seemed that the I/O bottlenecks
> eventually backed up the processing.
>         //conf.set("spark.shuffle.consolidateFiles","true")
>         //conf.set("spark.shuffle.file.buffer.kb", "200")   // does doubling this help? should increase in-memory buffer to decrease disk writes
>         //conf.set("spark.reducer.maxMbInFlight", "96")     // does doubling this help? should allow for more simultaneous shuffle data to be read from remotes
>         // because we use disk-only, we should be able to reverse the default memory usage settings
>         //conf.set("spark.shuffle.memoryFraction","0.6") // default 0.3
>         //conf.set("","0.3")   // default 0.6
>         //conf.set("spark.worker.timeout","180")
>         // akka settings
>         //conf.set("spark.akka.threads", "300")   // number of akka actors
>         //conf.set("spark.akka.timeout", "180")   // we saw a problem with smaller numbers
>         //conf.set("spark.akka.frameSize", "100")  // not sure if we need to up this. Default is 10.
>         //conf.set("spark.akka.batchSize", "30")
>         //conf.set("spark.akka.askTimeout", "30") // supposedly this is important for high cpu/io load
>         // block manager
>         //conf.set("",
>         //conf.set("spark.blockManagerHeartBeatMs", "80000")
> On Fri, Jul 4, 2014 at 8:52 AM, Mayur Rustagi <> wrote:
>> I would go with Spark only if you are certain that you are going to
>> scale out in the near future.
>> You can change the default storage level of RDDs to DISK_ONLY; that might
>> remove issues around any RDD leveraging memory. There are some functions,
>> particularly sortByKey, that require data to fit in memory to work, so you
>> may be hitting some of those walls too.
>> Regards
>> Mayur
>> On Fri, Jul 4, 2014 at 2:36 PM, Igor Pernek <> wrote:
>>> Hi all!
>>> I have a folder with 150 GB of txt files (around 700 files, on average
>>> 200 MB each).
>>> I'm using Scala to process the files and calculate some aggregate
>>> statistics in the end. I see two possible approaches to do that:
>>> - manually loop through all the files, do the calculations per file and
>>>   merge the results in the end
>>> - read the whole folder into one RDD, do all the operations on this single
>>>   RDD and let Spark do all the parallelization
>>> I'm leaning towards the second approach as it seems cleaner (no need
>>> for parallelization-specific code), but I'm wondering if my scenario will
>>> fit the constraints imposed by my hardware and data. I have one workstation
>>> with 16 threads and 64 GB of RAM available (so the parallelization will be
>>> strictly local between different processor cores). I might scale the
>>> infrastructure with more machines later on, but for now I would just like
>>> to focus on tuning the settings for this one-workstation scenario.
>>> The code I'm using:
>>> - reads TSV files and extracts meaningful data into
>>>   (String, String, String) triplets
>>> - afterwards performs some filtering, mapping and grouping
>>> - finally reduces the data and calculates some aggregates
>>> I've been able to run this code with a single file (~200 MB of data);
>>> however, I get a java.lang.OutOfMemoryError: GC overhead limit exceeded
>>> and/or a Java out of heap exception when adding more data (the application
>>> breaks with 6 GB of data but I would like to use it with 150 GB of data).
>>> I guess I would have to tune some parameters to make this work. I would
>>> appreciate any tips on how to approach this problem (how to debug
>>> memory demands). I've tried increasing 'spark.executor.memory' and
>>> using a smaller number of cores (the rationale being that each core needs
>>> some heap space), but this didn't solve my problems.
>>> I don't need the solution to be very fast (it can easily run for a few
>>> hours or even days if needed). I'm also not caching any data, just saving
>>> it to the file system in the end. If you think it would be more feasible
>>> to just go with the manual parallelization approach, I could do that as
>>> Thanks,
>>> Igor
