spark-user mailing list archives

From Matei Zaharia <matei.zaha...@gmail.com>
Subject Re: map - reduce only with disk
Date Tue, 02 Jun 2015 21:27:56 GMT
Yup, exactly.

All the workers will use local disk in addition to RAM, but one thing you may need to configure
is the directory to use for that. It is set through spark.local.dir. By default it's
/tmp, which on some machines is also in RAM, so that could be a problem. You should set it
to a folder on a real disk, or even better, to a comma-separated list of disks (e.g. /mnt1,/mnt2)
if you have multiple disks.
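
For example (the paths here are just placeholders for whatever mount points you actually have),
in conf/spark-defaults.conf:

    spark.local.dir    /mnt1/spark,/mnt2/spark

The same directories can also be exported as SPARK_LOCAL_DIRS in conf/spark-env.sh, which
should take precedence over spark.local.dir in standalone mode.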

Matei

> On Jun 2, 2015, at 1:03 PM, Octavian Ganea <octavian.ganea@inf.ethz.ch> wrote:
> 
> Thanks a lot! 
> 
> So my understanding is that you call persist only if you need to use the RDD at least
> twice to compute different things. So, if I just need the RDD for a single scan, like in
> a simple flatMap(..).reduceByKey(..).saveAsTextFile(..), how do I force the slaves to use
> the hard disk (in addition to the RAM) when the RAM is full, and not to fail like they do now?
> 
> Thank you! 
> 
> 2015-06-02 21:25 GMT+02:00 Matei Zaharia <matei.zaharia@gmail.com>:
> You shouldn't have to persist the RDD at all; just call flatMap and reduce on it directly.
> If you try to persist it, that will try to load the original data into memory, but here you
> are only scanning through it once.
> 
> Matei
> 
>> On Jun 2, 2015, at 2:09 AM, Octavian Ganea <octavian.ganea@inf.ethz.ch> wrote:
>> 
>> Thanks,
>> 
>> I was actually using reduceByKey, not groupByKey. 
>> 
>> I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey. However,
>> I got the same error as before, namely the error described here:
>> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html
>> 
>> My task is to count the frequencies of pairs of words that occur in a set of documents
>> at least 5 times. I know that this final output is sparse and should comfortably fit in
>> memory. However, the intermediate pairs that are spilled by flatMap might need to be
>> stored on disk, but I don't understand why the persist option does not work and my job fails.
>> 
>> My code:
>> 
>> rdd.persist(StorageLevel.MEMORY_AND_DISK)
>>   .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
>>   .reduceByKey((a, b) => (a + b).toShort)
>>   .filter({ case ((x, y), count) => count >= 5 })
>> 
>> My cluster has 8 nodes, each with 129 GB of RAM and 16 cores. I keep one node for
>> the master and 7 for the workers.
>> 
>> my conf:
>> 
>>     conf.set("spark.cores.max", "128")
>>     conf.set("spark.akka.frameSize", "1024")
>>     conf.set("spark.executor.memory", "115g")
>>     conf.set("spark.shuffle.file.buffer.kb", "1000")
>> 
>> my spark-env.sh:
>>  ulimit -n 200000
>>  SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
>>  SPARK_DRIVER_MEMORY=129G
>> 
>> spark version: 1.1.1
>> 
>> Thank you a lot for your help!
>> 
>> 
>> 2015-06-02 4:40 GMT+02:00 Matei Zaharia <matei.zaharia@gmail.com>:
>> As long as you don't use cache(), these operations will go from disk to disk, and will
>> only use a fixed amount of memory to build some intermediate results. However, note that
>> because you're using groupByKey, that needs the values for each key to all fit in memory
>> at once. In this case, if you're going to reduce right after, you should use reduceByKey,
>> which will be more efficient.
>> 
>> Matei
>> 
>> > On Jun 1, 2015, at 2:21 PM, octavian.ganea <octavian.ganea@inf.ethz.ch> wrote:
>> >
>> > Dear all,
>> >
>> > Does anyone know how I can force Spark to use only the disk when doing a
>> > simple flatMap(..).groupByKey.reduce(_ + _)? Thank you!
>> >
>> >
>> >
>> > --
>> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/map-reduce-only-with-disk-tp23102.html
>> 
>> 
>> 
>> 
>> -- 
>> Octavian Ganea
>> 
>> Research assistant at ETH Zurich
>> octavian.ganea@inf.ethz.ch
>> http://da.inf.ethz.ch/people/OctavianGanea/
> 
> 
> 
> 
> -- 
> Octavian Ganea
> 
> Research assistant at ETH Zurich
> octavian.ganea@inf.ethz.ch
> http://da.inf.ethz.ch/people/OctavianGanea/

