spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Nicholas Chammas <nicholas.cham...@gmail.com>
Subject What should happen if we try to cache more data than the cluster can hold in memory?
Date Fri, 01 Aug 2014 16:24:22 GMT
[Forking this thread.]

According to the Spark Programming Guide
<http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence>,
persisting RDDs with MEMORY_ONLY should not choke if the RDD cannot be held
entirely in memory:

If the RDD does not fit in memory, some partitions will not be cached and
> will be recomputed on the fly each time they're needed. This is the default
> level.


What I’m seeing per the discussion below is that when I try to cache more
data than the cluster can hold in memory, I get:

14/08/01 15:41:23 WARN TaskSetManager: Loss was due to
java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
    at java.nio.CharBuffer.toString(CharBuffer.java:1201)
    at org.apache.hadoop.io.Text.decode(Text.java:350)
    at org.apache.hadoop.io.Text.decode(Text.java:327)
    at org.apache.hadoop.io.Text.toString(Text.java:254)
    at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458)
    at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Trying MEMORY_AND_DISK yields the same error.

So what's the deal? I'm running 1.0.1 on EC2.

Nick


On Thu, Jul 31, 2014 at 5:17 PM, Nicholas Chammas <
nicholas.chammas@gmail.com> wrote:

Davies,
>
> That was it. Removing the call to cache() let the job run successfully,
> but this challenges my understanding of how Spark handles caching data.
>
> I thought it was safe to cache data sets larger than the cluster could
> hold in memory. What Spark would do is cache as much as it could and leave
> the rest for access from disk.
>
> Is that not correct?
>
> Nick
>
> On Thu, Jul 31, 2014 at 5:04 PM, Davies Liu <davies@databricks.com> wrote:
>
> Maybe because you try to cache all the data in memory, but heap of JVM
>> is not big enough.
>>
>> If remove the .cache(), is there still this problem?
>>
>> On Thu, Jul 31, 2014 at 1:33 PM, Nicholas Chammas
>> <nicholas.chammas@gmail.com> wrote:
>> > Hmm, looking at this stack trace a bit more carefully, it looks like the
>> > code in the Hadoop API for reading data from the source choked. Is that
>> > correct?
>> >
>> > Perhaps, there is a missing newline (or two. or more) that make 1 line
>> of
>> > data too much to read in at once? I'm just guessing here. Gonna try to
>> track
>> > this down real quick.
>> >
>> > Btw, I'm seeing this on 1.0.1 as well, so it's not a regression in
>> 1.0.2-rc1
>> > or anything like that.
>> >
>> > Nick
>> >
>> >
>> > On Thu, Jul 31, 2014 at 4:18 PM, Nicholas Chammas
>> > <nicholas.chammas@gmail.com> wrote:
>> >>
>> >> So if I try this again but in the Scala shell (as opposed to the Python
>> >> one), this is what I get:
>> >>
>> >> scala> val a = sc.textFile("s3n://some-path/*.json",
>> >> minPartitions=sc.defaultParallelism * 3).cache()
>> >> a: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at
>> >> <console>:12
>> >>
>> >> scala> a.map(_.length).max
>> >> 14/07/31 20:09:04 WARN LoadSnappy: Snappy native library is available
>> >> 14/07/31 20:10:41 WARN TaskSetManager: Lost TID 22 (task 0.0:22)
>> >> 14/07/31 20:10:41 WARN TaskSetManager: Loss was due to
>> >> java.lang.OutOfMemoryError
>> >> java.lang.OutOfMemoryError: GC overhead limit exceeded
>> >>     at java.util.Arrays.copyOfRange(Arrays.java:2694)
>> >>     at java.lang.String.<init>(String.java:203)
>> >>     at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>> >>     at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>> >>     at org.apache.hadoop.io.Text.decode(Text.java:350)
>> >>     at org.apache.hadoop.io.Text.decode(Text.java:327)
>> >>     at org.apache.hadoop.io.Text.toString(Text.java:254)
>> >>     at
>> >>
>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458)
>> >>     at
>> >>
>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458)
>> >>     at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
>>
>> >>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >>     at
>> >>
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>> >>     at
>> >>
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>> >>     at
>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
>> >>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>> >>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>> >>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> >>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> >>     at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>> >>     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>> >>     at
>> >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>> >>     at
>> >>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >>     at
>> >>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >>     at java.lang.Thread.run(Thread.java:745)
>> >> 14/07/31 20:10:42 ERROR TaskSchedulerImpl: Lost executor 19 on
>> >> ip-10-13-142-142.ec2.internal: OutOfMemoryError
>> >>
>> >> So I guess I need to fiddle with some memory configs? I’m surprised
>> that
>> >> just checking input line length could trigger this.
>> >>
>> >> Nick
>> >>
>> >>
>> >>
>> >> On Wed, Jul 30, 2014 at 8:58 PM, Davies Liu <davies@databricks.com>
>> wrote:
>> >>>
>> >>> The exception in Python means that the worker try to read command from
>> >>> JVM, but it reach
>> >>> the end of socket (socket had been closed). So it's possible that
>> >>> there another exception
>> >>> happened in JVM.
>> >>>
>> >>> Could you change the log level of log4j, then check is there any
>> >>> problem inside JVM?
>> >>>
>> >>> Davies
>> >>>
>> >>> On Wed, Jul 30, 2014 at 9:12 AM, Nicholas Chammas
>> >>> <nicholas.chammas@gmail.com> wrote:
>> >>> > Any clues? This looks like a bug, but I can't report it without
more
>> >>> > precise
>> >>> > information.
>> >>> >
>> >>> >
>> >>> > On Tue, Jul 29, 2014 at 9:56 PM, Nick Chammas
>> >>> > <nicholas.chammas@gmail.com>
>> >>> > wrote:
>> >>> >>
>> >>> >> I’m in the PySpark shell and I’m trying to do this:
>> >>> >>
>> >>> >> a =
>> >>> >>
>> >>> >>
>> sc.textFile('s3n://path-to-handful-of-very-large-files-totalling-1tb/*.json',
>> >>> >> minPartitions=sc.defaultParallelism * 3).cache()
>> >>> >> a.map(lambda x: len(x)).max()
>> >>> >>
>> >>> >> My job dies with the following:
>> >>> >>
>> >>> >> 14/07/30 01:46:28 WARN TaskSetManager: Loss was due to
>> >>> >> org.apache.spark.api.python.PythonException
>> >>> >> org.apache.spark.api.python.PythonException: Traceback (most
recent
>> >>> >> call
>> >>> >> last):
>> >>> >>   File "/root/spark/python/pyspark/worker.py", line 73, in
main
>> >>> >>     command = pickleSer._read_with_length(infile)
>> >>> >>   File "/root/spark/python/pyspark/serializers.py", line 142,
in
>> >>> >> _read_with_length
>> >>> >>     length = read_int(stream)
>> >>> >>   File "/root/spark/python/pyspark/serializers.py", line 337,
in
>> >>> >> read_int
>> >>> >>     raise EOFError
>> >>> >> EOFError
>> >>> >>
>> >>> >>     at
>> >>> >>
>> >>> >>
>> org.apache.spark.api.python.PythonRDD$anon$1.read(PythonRDD.scala:115)
>> >>> >>     at
>> >>> >>
>> >>> >>
>> org.apache.spark.api.python.PythonRDD$anon$1.<init>(PythonRDD.scala:145)
>>
>> >>> >>     at
>> >>> >> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
>> >>> >>     at
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> >>> >>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> >>> >>     at
>> >>> >> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>> >>> >>     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>> >>> >>     at
>> >>> >>
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>> >>> >>     at
>> >>> >>
>> >>> >>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >>> >>     at
>> >>> >>
>> >>> >>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >>> >>     at java.lang.Thread.run(Thread.java:745)
>> >>> >> 14/07/30 01:46:29 ERROR TaskSchedulerImpl: Lost executor 19
on
>> >>> >> ip-10-190-171-217.ec2.internal: remote Akka client disassociated
>> >>> >>
>> >>> >> How do I debug this? I’m using 1.0.2-rc1 deployed to EC2.
>> >>> >>
>> >>> >> Nick
>> >>> >>
>> >>> >>
>> >>> >> ________________________________
>> >>> >> View this message in context: How do you debug a PythonException?
>> >>> >> Sent from the Apache Spark User List mailing list archive at
>> >>> >> Nabble.com.
>> >>> >
>> >>> >
>> >>
>> >>
>> >
>>
> ​
>
​

Mime
View raw message