spark-user mailing list archives

From Imran Rashid <iras...@cloudera.com>
Subject Re: flatMap output on disk / flatMap memory overhead
Date Wed, 10 Jun 2015 01:57:33 GMT
I agree with Richard.  It looks like the issue here is shuffling, and
shuffle data is always written to disk, so the problem is definitely not
that all the output of flatMap has to be held in memory.

If at all possible, I'd first suggest upgrading to a newer version of Spark
-- 1.2 alone brought big improvements to shuffle, with sort-based shuffle
becoming the default.
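
(Editor's note: sort-based shuffle already exists in 1.1 behind a config
flag, so it can be tried without upgrading -- a minimal sketch, assuming
the same SparkConf object (`conf`) shown later in this thread:

```scala
// Opt in to sort-based shuffle on Spark 1.1; it became the default in 1.2.
// `conf` is the application's SparkConf, as in the quoted configuration below.
conf.set("spark.shuffle.manager", "sort")
```

With hash-based shuffle, each map task opens one file per reducer, which
can exhaust file handles and buffers at this scale; the sort-based manager
writes a single sorted file per map task instead.)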

On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher <rmarscher@localytics.com>
wrote:

> Are you sure it's memory related? What is the disk utilization and IO
> performance on the workers? The error you posted looks to be related to
> shuffle trying to obtain block data from another worker node and failing to
> do so in a reasonable amount of time. It may still be memory related, but
> I'm not sure that other resources are ruled out yet.
>
> On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea <octavian.ganea@inf.ethz.ch
> > wrote:
>
>> I tried using reduceByKey, without success.
>>
>> I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey.
>> However, I got the same error as before, namely the error described here:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html
>>
>> My task is to count the frequencies of pairs of words that occur at least
>> 5 times in a set of documents. I know that this final output is sparse and
>> should comfortably fit in memory. However, the intermediate pairs emitted
>> by flatMap might need to spill to disk, but I don't understand why the
>> persist option does not work and my job fails.
>>
>> My code:
>>
>> rdd.persist(StorageLevel.MEMORY_AND_DISK)
>>   .flatMap(x => outputPairsOfWords(x)) // emits pairs of type ((word1, word2), 1)
>>   .reduceByKey((a, b) => (a + b).toShort)
>>   .filter { case ((x, y), count) => count >= 5 }
>>
>>
>> My cluster has 8 nodes, each with 129 GB of RAM and 16 cores. I keep one
>> node for the master and use the other 7 as workers.
>>
>> my conf:
>>
>>     conf.set("spark.cores.max", "128")
>>     conf.set("spark.akka.frameSize", "1024")
>>     conf.set("spark.executor.memory", "115g")
>>     conf.set("spark.shuffle.file.buffer.kb", "1000")
>>
>> my spark-env.sh:
>>  ulimit -n 200000
>>  SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit
>> -XX:-UseCompressedOops"
>>  SPARK_DRIVER_MEMORY=129G
>>
>> spark version: 1.1.1
>>
>> Thank you very much for your help!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>
