spark-user mailing list archives

From Puneet Kapoor <puneet.cse.i...@gmail.com>
Subject Re: flatMap output on disk / flatMap memory overhead
Date Sat, 01 Aug 2015 09:41:04 GMT
Hi Octavian,

Just out of curiosity, did you try persisting your RDD in a serialized
format, "MEMORY_AND_DISK_SER" or "MEMORY_ONLY_SER"? That is, changing your
"rdd.persist(MEMORY_AND_DISK)"
to
"rdd.persist(MEMORY_ONLY_SER)"

Regards

On Wed, Jun 10, 2015 at 7:27 AM, Imran Rashid <irashid@cloudera.com> wrote:

> I agree with Richard.  It looks like the issue here is shuffling, and
> shuffle data is always written to disk, so the issue is definitely not that
> all the output of flatMap has to be stored in memory.
>
> If at all possible, I'd first suggest upgrading to a newer version of Spark
> -- even in 1.2 there were big improvements to shuffle, with sort-based
> shuffle as the default.
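>
> If upgrading right away isn't an option, a stopgap sketch (this assumes
> only the sort shuffle that already ships, though not as the default, in
> your 1.1.x build):
>
>     // hash shuffle is the 1.1.x default; sort shuffle writes one sorted
>     // file per map task instead of one file per reducer per map task.
>     conf.set("spark.shuffle.manager", "sort")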
>
> On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher <rmarscher@localytics.com> wrote:
>
>
>> Are you sure it's memory related? What is the disk utilization and IO
>> performance on the workers? The error you posted looks to be related to
>> shuffle trying to obtain block data from another worker node and failing
>> to do so in a reasonable amount of time. It may still be memory related,
>> but I'm not sure that other resources are ruled out yet.
>>
>> On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea <octavian.ganea@inf.ethz.ch> wrote:
>>
>>> I tried using reduceByKey, without success.
>>>
>>> I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey.
>>> However, I got the same error as before, namely the error described here:
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html
>>>
>>> My task is to count the frequencies of word pairs that occur at least 5
>>> times across a set of documents. I know that this final output is sparse
>>> and should comfortably fit in memory. However, the intermediate pairs
>>> emitted by flatMap might need to be stored on disk, but I don't
>>> understand why the persist option does not work and my job fails.
>>>
>>> My code:
>>>
>>>   rdd.persist(StorageLevel.MEMORY_AND_DISK)
>>>     .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
>>>     .reduceByKey((a, b) => (a + b).toShort)
>>>     .filter { case ((x, y), count) => count >= 5 }
>>>
>>>
>>> My cluster has 8 nodes, each with 129 GB of RAM and 16 cores. One node I
>>> keep for the master, 7 nodes for the workers.
>>>
>>> my conf:
>>>
>>>     conf.set("spark.cores.max", "128")
>>>     conf.set("spark.akka.frameSize", "1024")
>>>     conf.set("spark.executor.memory", "115g")
>>>     conf.set("spark.shuffle.file.buffer.kb", "1000")
>>>
>>> my spark-env.sh:
>>>  ulimit -n 200000
>>>  SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit
>>> -XX:-UseCompressedOops"
>>>  SPARK_DRIVER_MEMORY=129G
>>>
>>> spark version: 1.1.1
>>>
>>> Thank you a lot for your help!
>>>
>>>
>>>
>>>
>>
>
