spark-user mailing list archives

From Ewen Cheslack-Postava <...@ewencp.org>
Subject Re: newbie : java.lang.OutOfMemoryError: Java heap space
Date Thu, 09 Jan 2014 01:44:28 GMT
From the stack trace, it looks like the driver program is dying while 
trying to serialize data out to the workers. My guess is that whatever 
machine you're running from has a relatively small default maximum heap 
size, and trying to broadcast the 49MB file is causing it to run out of 
memory. I don't know whether Spark avoids reserializing broadcast data 
for each worker, but if it doesn't, that would make this explanation even 
more plausible, since you might have many copies of the data in memory at 
once.
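
If you want to sanity-check that theory, the driver's actual max heap is 
easy to read from inside spark-shell (plain JVM API, nothing 
Spark-specific):

    scala> Runtime.getRuntime.maxMemory / (1024 * 1024)  // max heap in MB

If that number is small relative to the broadcast map (remember the 
deserialized Map will be larger than the 49MB on disk), that's your 
culprit.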

Are you using the java -Xmx flag anywhere to set the JVM's max heap size? 
It looks like you're using spark-shell, for which I think you can still 
use the SPARK_MEM environment variable.
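
For example, something like this before launching the shell (4g is just 
an illustrative value; pick whatever comfortably fits the broadcast map 
plus serialization overhead):

    SPARK_MEM=4g ./spark-shell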

(You could also do the join() through Spark instead of doing it manually 
by broadcasting the smaller input. That would keep the "large" 49MB data 
set out of the driver entirely and probably avoid any memory issues 
there. That said, your broadcast approach may well be the better one if 
you want to avoid shuffling the 17GB data set and this join is the 
entirety of your analysis task.)
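
In case it's useful, here's a rough, untested sketch of what that join() 
route could look like, reusing your XAMap and paths from below. Note that 
join() is an inner join, so keys missing from the small side are silently 
dropped instead of throwing the way bc.value(...) would:

    // Keep the 49MB side as an RDD: no collect, no broadcast.
    val XYPairs = sc.textFile("pathTo49MB").map { line =>
      val arr = line.split("\t")
      (arr(0), arr(1))
    }

    // join() shuffles both sides by key and yields (key, (xaValue, xyValue)).
    val joined = XAMap.join(XYPairs).map { case (_, (xaVal, xyVal)) =>
      (xyVal, xaVal)  // same (XY-value, XA-value) shape as your version
    }
    joined.saveAsTextFile("path-to-output")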

-Ewen

> Vipul Pandey <vipandey@gmail.com>
> January 8, 2014 3:54 PM
> Any idea, anyone? This seems like a pretty basic requirement, and I'm 
> sure a minor config change might get it to work.
> I'd appreciate any pointers, as I've been blocked on this since last night.
>
> btw, the Spark version is 0.8.0
>
> Prashant Sharma <scrapcodes@gmail.com>
> January 7, 2014 11:31 PM
> Which Spark version?
>
> -- 
> Prashant
>
> Vipul Pandey <vipandey@gmail.com>
> January 7, 2014 11:13 PM
> Hi,
>
> Disclaimer: Newbie (well, a returning user)
>
> Setup:
> 20 nodes
> -Dspark.executor.memory=40g, essentially tons of space for my use case
>
>
> Pretty straightforward join between two inputs:
> - 17GB (distributed across 10 equally sized 1.7GB files)
> - 49MB (1 file)
> I just need to join on the keys and write out the values from both 
> sides as tuples.
> ==================================================================
>
>     val XA = sc.textFile("path-to-17GB") // assume this to be a tab-separated key-value pair
>     val XAMap = XA.map(x => {
>       val arr = x.split("\t")
>       (arr(0), arr(1))
>     })
>
>     val XY = sc.textFile("pathTo49MB") // assume it to be a tab-separated key-value pair
>     val XYMap = XY.map(x => {
>       val arr = x.split("\t")
>       (arr(0), arr(1))
>     }).collect.toMap
>
>     val bc = sc.broadcast(XYMap)
>
>     val joined = XAMap.map(v => {
>       (bc.value(v._1), v._2)
>     })
>
>     joined.saveAsTextFile("path-to-output")
> ==================================================================
>
> When I try to save the text file, it throws an OOME and my shell quits. 
> Any clues?
>
> scala>     joined.saveAsTextFile("path-to-output")
> Uncaught error from thread [spark-akka.actor.default-dispatcher-4] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:2786)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:94)
> at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1823)
> at java.io.ObjectOutputStream.write(ObjectOutputStream.java:670)
> at org.apache.spark.scheduler.ResultTask.writeExternal(ResultTask.scala:116)
> at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1429)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1398)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
> at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:27)
> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:47)
> at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:78)
> at org.apache.spark.scheduler.cluster.ClusterTaskSetManager.resourceOffer(ClusterTaskSetManager.scala:375)
> at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(ClusterScheduler.scala:215)
> at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3.apply(ClusterScheduler.scala:212)
> at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3.apply(ClusterScheduler.scala:209)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> at scala.Enumeration$ValueSet.foreach(Enumeration.scala:234)
> at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3.apply(ClusterScheduler.scala:209)
> at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3.apply(ClusterScheduler.scala:209)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.cluster.ClusterScheduler.resourceOffers(ClusterScheduler.scala:209)
> at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor.makeOffers(StandaloneSchedulerBackend.scala:113)
> at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor$$anonfun$receive$1.apply(StandaloneSchedulerBackend.scala:91)
> at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor$$anonfun$receive$1.apply(StandaloneSchedulerBackend.scala:64)
> at akka.actor.Actor$class.apply(Actor.scala:318)
> at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor.apply(StandaloneSchedulerBackend.scala:47)
> at akka.actor.ActorCell.invoke(ActorCell.scala:626)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:197)
>
