Thanks guys! Actually, I'm not doing any caching (at least I'm not calling cache/persist). Do I still need to use the DISK_ONLY storage level? I do, however, use reduceByKey and sortByKey. Mayur, you mentioned that sortByKey requires the data to fit in memory. Is there any way to work around this (maybe by increasing the number of partitions or something similar)?
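
To make the partition idea concrete, here is a rough sketch of how a partition count could be chosen and passed to reduceByKey and sortByKey. The 128 MB target per partition is only an assumption, `partitionsFor` is a hypothetical helper rather than a Spark API, and `pairs` stands for an RDD of key/value pairs that is not shown here. Whether this actually avoids the memory problem is exactly the open question.

```scala
// Hypothetical helper (not part of Spark): pick a partition count so that
// each partition holds roughly `targetBytes` of input.
object PartitionSizing {
  def partitionsFor(totalBytes: Long, targetBytes: Long): Int = {
    require(totalBytes >= 0 && targetBytes > 0,
      "totalBytes must be non-negative and targetBytes must be positive")
    // Ceiling division, with at least one partition.
    math.max(1L, (totalBytes + targetBytes - 1) / targetBytes).toInt
  }
}

// Intended use with the pair-RDD operations (needs Spark on the classpath,
// so it is left as a comment here):
//   val n = PartitionSizing.partitionsFor(150L << 30, 128L << 20) // 150 GB input, 128 MB target (assumed)
//   val reduced = pairs.reduceByKey(_ + _, n)
//   val sorted  = reduced.sortByKey(ascending = true, numPartitions = n)
```

Both reduceByKey and sortByKey accept an explicit number of partitions, so smaller partitions would mean less data per task at any one time.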

What alternative would you suggest if Spark is not the way to go for this kind of scenario? As mentioned, what I like about Spark is its high-level abstraction of parallelization. I'm ready to sacrifice speed (if the slowdown is not too big; I'm doing batch processing, nothing real-time) for code simplicity and readability.


On Fri, Jul 4, 2014 at 3:16 PM, Surendranauth Hiraman <suren.hiraman@velos.io> wrote:
>
> When using DISK_ONLY, keep in mind that disk I/O is pretty high. Make sure you are writing to multiple disks for best performance. And even with DISK_ONLY, we've found that there is a minimum threshold for executor RAM (spark.executor.memory), which for us seemed to be around 8 GB.
>
> If you find that, with enough disks, you still have errors/exceptions getting the flow to finish, first check iostat to see if disk is the bottleneck.
>
> Then, you may want to try tuning some or all of the following, which affect buffers and timeouts. For us, because we did not have enough disks to start out, the I/O bottleneck caused timeouts and other errors. In the end, IMHO, it's probably better to solve the problem by adding disks than by tuning the parameters, because it seemed that the I/O bottlenecks eventually backed up the processing.
>
>         //conf.set("spark.shuffle.consolidateFiles","true")
>
>         //conf.set("spark.shuffle.file.buffer.kb", "200")        // does doubling this help? should increase in-memory buffer to decrease disk writes
>         //conf.set("spark.reducer.maxMbInFlight", "96")     // does doubling this help? should allow for more simultaneous shuffle data to be read from remotes
>        
>         // because we use disk-only, we should be able to reverse the default memory usage settings
>         //conf.set("spark.shuffle.memoryFraction","0.6") // default 0.3
>         //conf.set("spark.storage.memoryFraction","0.3")   // default 0.6
>
>         //conf.set("spark.worker.timeout","180")
>        
>         // akka settings
>         //conf.set("spark.akka.threads", "300")   // number of akka actor threads used for communication
>         //conf.set("spark.akka.timeout", "180")   // we saw a problem with smaller numbers
>         //conf.set("spark.akka.frameSize", "100")  // not sure if we need to up this. Default is 10.
>         //conf.set("spark.akka.batchSize", "30")
>         //conf.set("spark.akka.askTimeout", "30") // supposedly this is important for high cpu/io load
>        
>         // block manager
>         //conf.set("spark.storage.blockManagerTimeoutIntervalMs", "180000")
>         //conf.set("spark.blockManagerHeartBeatMs", "80000")
>
> On Fri, Jul 4, 2014 at 8:52 AM, Mayur Rustagi <mayur.rustagi@gmail.com> wrote:
>>
>> I would go with Spark only if you are certain that you are going to scale out in the near future.
>> You can change the default storage level of the RDD to DISK_ONLY; that might remove issues around any RDD leveraging memory. There are some functions, particularly sortByKey, that require the data to fit in memory to work, so you may be hitting some of those walls too.
>> Regards
>> Mayur
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi
>>
>>
>>
>> On Fri, Jul 4, 2014 at 2:36 PM, Igor Pernek <igor@pernek.net> wrote:
>>>
>>> Hi all!
>>>
>>> I have a folder with 150 GB of txt files (around 700 files, each about 200 MB on average).
>>>
>>> I'm using Scala to process the files and calculate some aggregate statistics in the end. I see two possible approaches to do that:
>>> - manually loop through all the files, do the calculations per file and merge the results in the end
>>> - read the whole folder into one RDD, do all the operations on this single RDD and let Spark do all the parallelization
>>>
>>> I'm leaning towards the second approach as it seems cleaner (no need for parallelization-specific code), but I'm wondering if my scenario will fit the constraints imposed by my hardware and data. I have one workstation with 16 threads and 64 GB of RAM available (so the parallelization will be strictly local between different processor cores). I might scale the infrastructure with more machines later on, but for now I would just like to focus on tuning the settings for this one-workstation scenario.
>>>
>>> The code I'm using:
>>> - reads TSV files, and extracts meaningful data to (String, String, String) triplets
>>> - afterwards some filtering, mapping and grouping is performed
>>> - finally, the data is reduced and some aggregates are calculated
>>>
>>> I've been able to run this code with a single file (~200 MB of data); however, I get a java.lang.OutOfMemoryError: GC overhead limit exceeded and/or a Java out of heap exception when adding more data (the application breaks with 6 GB of data, but I would like to use it with 150 GB of data).
>>>
>>> I guess I would have to tune some parameters to make this work. I would appreciate any tips on how to approach this problem (how to debug for memory demands). I've tried increasing 'spark.executor.memory' and using a smaller number of cores (the rationale being that each core needs some heap space), but this didn't solve my problems.
>>>
>>> I don't need the solution to be very fast (it can easily run for a few hours or even days if needed). I'm also not caching any data, just saving it to the file system in the end. If you think it would be more feasible to just go with the manual parallelization approach, I could do that as well.
>>>
>>> Thanks,
>>>
>>> Igor
>>
>>
>
>
>
> --
>                                                            
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@velos.io
> W: www.velos.io
>