spark-user mailing list archives

From: learner1014 all <learner1...@gmail.com>
Subject: Re: Spark heap issues
Date: Fri, 06 Dec 2013 18:35:01 GMT
Btw, the node only has 4GB of memory, so does the spark.executor.memory setting make
sense?
Should I instead make it around 2-3GB? Also, how is this parameter different
from SPARK_MEM?
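For reference, here is roughly what I mean by the two options (the 2g value is just my
guess for a 4GB node, not something I have verified; if I understand correctly, SPARK_MEM
is the older environment-variable route that spark.executor.memory is meant to replace):

      // per-application setting, read when the SparkContext is created
      System.setProperty("spark.executor.memory", "2g")   // guess, sized for the 4GB node

      // the older environment-variable route (conf/spark-env.sh):
      // export SPARK_MEM=2g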

Thanks,
Saurabh


On Fri, Dec 6, 2013 at 8:26 AM, learner1014 all <learner1014@gmail.com> wrote:

> Still seeing a whole lot of the following errors:
> java.lang.OutOfMemoryError: Java heap space
> 13/12/05 16:04:13 INFO executor.StandaloneExecutorBackend: Got assigned
> task 553
> 13/12/05 16:04:13 INFO executor.Executor: Running task ID 553
>
> The issue seems to be that the process hangs, as we are probably stuck in
> back-to-back full GC cycles...
> 1536.617: [Full GC 1536.617: [CMS: 707839K->707839K(707840K), 5.0507000
> secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)],
> 5.0507940 secs] [Times: user=4.94 sys=0.00, real=5.05 secs]
> 1541.669: [Full GC 1541.669: [CMS: 707840K->707839K(707840K), 4.5483600
> secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)],
> 4.5484390 secs] [Times: user=4.47 sys=0.00, real=4.55 secs]
> 1546.218: [Full GC 1546.218: [CMS: 707839K->707839K(707840K), 4.5937460
> secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)],
> 4.5938460 secs] [Times: user=4.59 sys=0.00, real=4.60 secs]
> 1550.812: [Full GC 1550.812: [CMS: 707839K->707839K(707840K), 5.3572370
> secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)],
> 5.3573840 secs] [Times: user=5.26 sys=0.01, real=5.35 secs]
> 1556.171: [Full GC 1556.171: [CMS: 707840K->694574K(707840K), 4.1462520
> secs] 1014528K->860511K(1014528K), [CMS Perm : 31955K->31955K(53572K)],
> 4.1463350 secs] [Times: user=4.13 sys=0.00, real=4.15 secs]
> 1560.329: [GC [1 CMS-initial-mark: 694574K(707840K)] 874378K(1014528K),
> 0.4269160 secs] [Times: user=0.41 sys=0.00, real=0.43 secs]
>
>
> I tried the following parameters and they do not seem to help
>
>       System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>       System.setProperty("spark.akka.timeout", "30")  //in seconds
>
>       System.setProperty("spark.executor.memory","15g")
>       System.setProperty("spark.akka.frameSize", "2000")  //in MB
>       System.setProperty("spark.akka.threads","8")
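>
> One thing I notice while re-reading this: the worker and executor JVMs in my original
> mail below are launched with -Xms512m -Xmx512m, and the node itself only has 4GB of
> memory, so I doubt a 15g setting can actually be satisfied. A sketch of what I may try
> instead (the 2g value is just my guess, sized to fit the node, and not verified):
>
>       System.setProperty("spark.executor.memory", "2g")  // guess: fit within the 4GB node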
>
> Thanks
>
>
> On Thu, Dec 5, 2013 at 11:31 PM, purav aggarwal <puravaggarwal123@gmail.com> wrote:
>
>> Try allocating some more resources to your application.
>> You seem to be using 512 MB for your worker node (you can verify that
>> from the master UI).
>>
>> Try putting the following settings into your code and see if they help:
>>
>> System.setProperty("spark.executor.memory","15g")   // Will allocate more
>> memory
>> System.setProperty("spark.akka.frameSize","2000")
>> System.setProperty("spark.akka.threads","16")           // Dependent upon
>> number of cores with your worker machine
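>>
>> (A side note, and only a general sketch rather than something specific to your app:
>> these properties are read when the SparkContext is constructed, so they need to be set
>> before that line, e.g.
>>
>> System.setProperty("spark.executor.memory", "15g")                 // set first
>> val sc = new SparkContext("spark://localhost:7077", "Simple App")  // reads the properties above
>>
>> Otherwise they will not take effect for that context.)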
>>
>>
>> On Fri, Dec 6, 2013 at 1:06 AM, learner1014 all <learner1014@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to do a join operation on an RDD; my input is pipe-delimited
>>> data in 2 files.
>>> One file is 24 MB and the other is 285 MB.
>>> The setup being used is the single-node (server) setup, with SPARK_MEM set to 512m.
>>>
>>> Master
>>> /pkg/java/jdk1.7.0_11/bin/java -cp
>>> :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar
>>> -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
>>> -Dspark.boundedMemoryCache.memoryFraction=0.4
>>> -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
>>> -Djava.library.path= -Xms512m -Xmx512m
>>> org.apache.spark.deploy.master.Master --ip localhost --port 7077
>>> --webui-port 8080
>>>
>>> Worker
>>> /pkg/java/jdk1.7.0_11/bin/java -cp
>>> :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar
>>> -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
>>> -Dspark.boundedMemoryCache.memoryFraction=0.4
>>> -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
>>> -Djava.library.path= -Xms512m -Xmx512m
>>> org.apache.spark.deploy.worker.Worker spark://localhost:7077
>>>
>>>
>>> App
>>> /pkg/java/jdk1.7.0_11/bin/java -cp
>>> :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar:/spark-0.8.0-incubating-bin-cdh4/core/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/repl/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/mllib/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/bagel/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/streaming/target/scala-2.9.3/test-classes
>>> -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
>>> -Dspark.boundedMemoryCache.memoryFraction=0.4
>>> -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
>>> -Xms512M -Xmx512M org.apache.spark.executor.StandaloneExecutorBackend
>>> akka://spark@localhost:33024/user/StandaloneScheduler 1 localhost 4
>>>
>>>
>>> Here is the code
>>> import org.apache.spark.SparkContext
>>> import org.apache.spark.SparkContext._
>>> import org.apache.spark.storage.StorageLevel
>>>
>>> object SimpleApp {
>>>
>>>       def main (args: Array[String]) {
>>>
>>>
>>> System.setProperty("spark.local.dir","/spark-0.8.0-incubating-bin-cdh4/tmp");
>>>       System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>       System.setProperty("spark.akka.timeout", "30")  //in seconds
>>>
>>>       val dataFile2 = "/tmp_data/data1.txt"
>>>       val dataFile1 = "/tmp_data/data2.txt"
>>>       val sc = new SparkContext("spark://localhost:7077", "Simple App",
>>> "/spark-0.8.0-incubating-bin-cdh4",
>>>       List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))
>>>
>>>       val data10 = sc.textFile(dataFile1, 128)
>>>       val data11 = data10.map(x => x.split("|"))
>>>       val data12 = data11.map( x  =>  (x(1).toInt -> x) )
>>>
>>>
>>>       val data20 = sc.textFile(dataFile2, 128)
>>>       val data21 = data20.map(x => x.split("|"))
>>>       val data22 = data21.map(x => (x(1).toInt -> x))
>>>
>>>
>>>       val data3 = data12.join(data22, 128)
>>>       val data4 = data3.distinct(4)
>>>       val numAs = data10.count()
>>>       val numBs = data20.count()
>>>       val numCs = data3.count()
>>>       val numDs = data4.count()
>>>       println("Lines in 1: %s, Lines in 2: %s Lines in 3: %s Lines in 4: %s".format(numAs, numBs, numCs, numDs))
>>>       data4.foreach(println)
>>>       }
>>> }
>>>
>>> I see the following errors
>>> 13/12/04 10:53:55 WARN storage.BlockManagerMaster: Error sending message
>>> to BlockManagerMaster in 1 attempts
>>> java.util.concurrent.TimeoutException: Futures timed out after [10000]
>>> milliseconds
>>>         at akka.dispatch.DefaultPromise.ready(Future.scala:870)
>>>         at akka.dispatch.DefaultPromise.result(Future.scala:874)
>>>         at akka.dispatch.Await$.result(Future.scala:74)
>>>
>>> and
>>> 13/12/04 10:53:55 ERROR executor.Executor: Exception in task ID 517
>>> java.lang.OutOfMemoryError: Java heap space
>>>         at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
>>>         at
>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:282)
>>>         at
>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:262)
>>>         at
>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>         at
>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
>>>         at
>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
>>>         at
>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>         at
>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:106)
>>>         at
>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:101)
>>>         at
>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>
>>> Lots of them, actually...
>>>
>>>
>>> To give some additional information: I used just single columns in both
>>> files and passed them through this program, and encountered the same issue -
>>> out-of-memory and other errors.
>>>
>>> What did work was removal of the following lines:
>>>
>>>       val data21 = data20.map(x => x.split("|"))
>>>       val data22 = data21.map(x => (x(1).toInt -> x))
>>>
>>> which were replaced by:
>>>       val data22 = data20.map(x => (x.toInt -> x))
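>>>
>>> (A guess on my side, which I have not verified: split() takes a regular expression,
>>> and "|" as a regex matches the empty string, so x.split("|") may be breaking each line
>>> into single characters instead of splitting on the pipe. An escaped variant would be:
>>>
>>>       val data21 = data20.map(x => x.split("\\|"))   // sketch only, not tested
>>>
>>> but I have not tried whether that changes the memory behaviour.)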
>>>
>>> However, as soon as I add additional columns this is of course not going
>>> to work.
>>> So can someone explain this? Any suggestions are most welcome, and any
>>> help is appreciated.
>>> Thanks
>>>
>>
>>
>
