spark-user mailing list archives

From Surendranauth Hiraman <suren.hira...@velos.io>
Subject Re: Spark memory optimization
Date Mon, 07 Jul 2014 12:30:52 GMT
Using persist() is sort of a "hack" or a hint (depending on your
perspective :-)) to make the RDD use disk rather than memory. As I mentioned,
though, the disk I/O has consequences, mainly (I think) the need to make sure
you have enough disks that I/O doesn't become a bottleneck.
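
For illustration, a minimal sketch of that (this assumes an existing
SparkContext named sc; the path and RDD name are placeholders):

    import org.apache.spark.storage.StorageLevel

    // keep this RDD's partitions on disk instead of in executor memory
    val lines = sc.textFile("hdfs:///path/to/input")
    lines.persist(StorageLevel.DISK_ONLY)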

Increasing the number of partitions is, I think, the other common approach
people take, from what I've read.
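
For example, something along these lines (reusing lines from the sketch
above; the partition counts are made up and pairs is a hypothetical RDD of
(String, Long) pairs):

    // either repartition an existing RDD explicitly...
    val spread = lines.repartition(200)

    // ...or pass a partition count straight to the shuffle operations
    val counts = pairs.reduceByKey(_ + _, 200)
    val sorted = counts.sortByKey(ascending = true, numPartitions = 200)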

For alternatives, if your data is in HDFS or you just want to stick with
MapReduce, then the higher-level abstractions on top of M/R you might want
to look at include the following, which in some cases have both Scala and
Java implementations.

Scalding (a Scala API on top of Cascading; it seems to be the most active of
these projects, at least on the surface)
Scoobi
Scrunch (Scala wrapper around Crunch)

There are other parallel distributed frameworks outside of the Hadoop
ecosystem, of course.

-Suren




On Mon, Jul 7, 2014 at 7:31 AM, Igor Pernek <igor@pernek.net> wrote:

> Thanks guys! Actually, I'm not doing any caching (at least I'm not calling
> cache/persist). Do I still need to use the DISK_ONLY storage level?
> However, I do use reduceByKey and sortByKey. Mayur, you mentioned that
> sortByKey requires the data to fit in memory. Is there any way to work
> around this (maybe by increasing the number of partitions or something
> similar)?
>
> What alternative would you suggest if Spark is not the way to go for this
> kind of scenario? As mentioned, what I like about Spark is its high level
> of abstraction over parallelization. I'm ready to sacrifice speed (if the
> slowdown is not too big; I'm doing batch processing, nothing real-time) for
> code simplicity and readability.
>
>
>
> On Fri, Jul 4, 2014 at 3:16 PM, Surendranauth Hiraman <
> suren.hiraman@velos.io> wrote:
> >
> > When using DISK_ONLY, keep in mind that disk I/O is pretty high. Make
> > sure you are writing to multiple disks for best performance. And even with
> > DISK_ONLY, we've found that there is a minimum threshold for executor RAM
> > (spark.executor.memory), which for us seemed to be around 8 GB.
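> >
> > For example (the value is just the floor that worked for us, not a general
> > recommendation):
> >
> >         conf.set("spark.executor.memory", "8g")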
> >
> > If you find that, with enough disks, you still have errors/exceptions
> > getting the flow to finish, first check iostat to see if disk is the
> > bottleneck.
> >
> > Then, you may want to try tuning some or all of the following, which
> > affect buffers and timeouts. For us, because we did not have enough disks
> > to start out, the I/O bottleneck caused timeouts and other errors. In the
> > end, IMHO, it's probably better to solve the problem by adding disks than
> > by tuning the parameters, because the I/O bottlenecks eventually backed up
> > the processing.
> >
> >         //conf.set("spark.shuffle.consolidateFiles", "true")
> >
> >         //conf.set("spark.shuffle.file.buffer.kb", "200")   // does doubling this help? should increase the in-memory buffer to decrease disk writes
> >         //conf.set("spark.reducer.maxMbInFlight", "96")     // does doubling this help? should allow more simultaneous shuffle data to be read from remotes
> >
> >         // because we use disk-only, we should be able to reverse the default memory usage settings
> >         //conf.set("spark.shuffle.memoryFraction", "0.6")   // default 0.3
> >         //conf.set("spark.storage.memoryFraction", "0.3")   // default 0.6
> >
> >         //conf.set("spark.worker.timeout", "180")
> >
> >         // akka settings
> >         //conf.set("spark.akka.threads", "300")     // number of akka actors
> >         //conf.set("spark.akka.timeout", "180")     // we saw a problem with smaller numbers
> >         //conf.set("spark.akka.frameSize", "100")   // not sure if we need to up this; default is 10
> >         //conf.set("spark.akka.batchSize", "30")
> >         //conf.set("spark.akka.askTimeout", "30")   // supposedly this is important for high cpu/io load
> >
> >         // block manager
> >         //conf.set("spark.storage.blockManagerTimeoutIntervalMs", "180000")
> >         //conf.set("spark.blockManagerHeartBeatMs", "80000")
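> >
> > For completeness, a minimal sketch of how a conf like this gets applied
> > (the app name and master URL are placeholders):
> >
> >         import org.apache.spark.{SparkConf, SparkContext}
> >
> >         val conf = new SparkConf().setAppName("my-flow").setMaster("local[16]")
> >         conf.set("spark.shuffle.consolidateFiles", "true")
> >         val sc = new SparkContext(conf)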
> >
> > On Fri, Jul 4, 2014 at 8:52 AM, Mayur Rustagi <mayur.rustagi@gmail.com>
> wrote:
> >>
> >> I would go with Spark only if you are certain that you are going to
> >> scale out in the near future.
> >> You can change the default storage level of the RDD to DISK_ONLY; that
> >> might remove issues around any RDD leveraging memory. There are some
> >> functions, particularly sortByKey, that require the data to fit in memory
> >> to work, so you may be hitting some of those walls too.
> >> Regards
> >> Mayur
> >>
> >> Mayur Rustagi
> >> Ph: +1 (760) 203 3257
> >> http://www.sigmoidanalytics.com
> >> @mayur_rustagi
> >>
> >>
> >>
> >> On Fri, Jul 4, 2014 at 2:36 PM, Igor Pernek <igor@pernek.net> wrote:
> >>>
> >>> Hi all!
> >>>
> >>> I have a folder with 150 GB of txt files (around 700 files, about 200 MB
> >>> each).
> >>>
> >>> I'm using Scala to process the files and calculate some aggregate
> >>> statistics in the end. I see two possible approaches to do that:
> >>>   - manually loop through all the files, do the calculations per file and
> >>>     merge the results in the end
> >>>   - read the whole folder into one RDD, do all the operations on this
> >>>     single RDD and let Spark do all the parallelization
> >>>
> >>> I'm leaning towards the second approach as it seems cleaner (no need for
> >>> parallelization-specific code), but I'm wondering if my scenario will fit
> >>> the constraints imposed by my hardware and data. I have one workstation
> >>> with 16 threads and 64 GB of RAM available (so the parallelization will be
> >>> strictly local, between different processor cores). I might scale the
> >>> infrastructure with more machines later on, but for now I would just like
> >>> to focus on tuning the settings for this one-workstation scenario.
> >>>
> >>> The code I'm using:
> >>>   - reads TSV files and extracts meaningful data into (String, String,
> >>>     String) triplets
> >>>   - afterwards, some filtering, mapping and grouping is performed
> >>>   - finally, the data is reduced and some aggregates are calculated
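> >>>
> >>> Roughly, a sketch of a pipeline with that shape (the paths, the split and
> >>> the key/aggregate logic are all made up for illustration):
> >>>
> >>>         import org.apache.spark.SparkContext._   // pair-RDD operations
> >>>
> >>>         val triplets = sc.textFile("/data/folder/*.tsv")
> >>>           .map(_.split("\t"))
> >>>           .filter(_.length >= 3)
> >>>           .map(f => (f(0), f(1), f(2)))
> >>>         val aggregates = triplets
> >>>           .map { case (a, b, _) => ((a, b), 1L) }   // hypothetical key
> >>>           .reduceByKey(_ + _)
> >>>         aggregates.saveAsTextFile("/data/output")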
> >>>
> >>> I've been able to run this code with a single file (~200 MB of data);
> >>> however, I get a java.lang.OutOfMemoryError: GC overhead limit exceeded
> >>> and/or a Java out-of-heap exception when adding more data (the application
> >>> breaks with 6 GB of data, but I would like to use it with 150 GB of data).
> >>>
> >>> I guess I would have to tune some parameters to make this work. I would
> >>> appreciate any tips on how to approach this problem (how to debug for
> >>> memory demands). I've tried increasing 'spark.executor.memory' and using
> >>> a smaller number of cores (the rationale being that each core needs some
> >>> heap space), but this didn't solve my problems.
> >>>
> >>> I don't need the solution to be very fast (it can easily run for a few
> >>> hours or even days if needed). I'm also not caching any data, just saving
> >>> it to the file system in the end. If you think it would be more feasible
> >>> to just go with the manual parallelization approach, I could do that as
> >>> well.
> >>>
> >>> Thanks,
> >>>
> >>> Igor
> >>
> >>
> >
> >
> >
> > --
> >
> > SUREN HIRAMAN, VP TECHNOLOGY
> > Velos
> > Accelerating Machine Learning
> >
> > 440 NINTH AVENUE, 11TH FLOOR
> > NEW YORK, NY 10001
> > O: (917) 525-2466 ext. 105
> > F: 646.349.4063
> > E: suren.hiraman@velos.io
> > W: www.velos.io
> >
>



-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io
