I’ve found my problem.

 

I’ve got a DAG with two consecutive “updateStateByKey” functions.

When I only process (map/foreachRDD/JavaEsSpark) the state of the last “updateStateByKey” function, I get a StackOverflowError after a while (too long a lineage).

 

But when I also do some processing (foreachRDD/rdd.take) on the first “updateStateByKey”, then there is no problem.

 

Does this make sense? Probably the “long lineage” problem.

 

But why should I have such a “lineage problem” when Spark claims to be an “abstract/high-level” architecture? Why should I have to worry about “long lineage”? It seems a contradiction with the abstract/high-level (functional programming) approach when I have to know/consider how Spark does things internally.
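For what it’s worth, the repeated ObjectInputStream/$colon$colon.readObject frames in the traces below show the mechanism: Java serialization walks an object chain recursively, one group of stack frames per link, and a lineage is exactly such a chain that grows by one link per batch. A minimal standalone sketch of the same effect, with no Spark involved (class name and depth values are illustrative only):

```java
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;

public class DeepChainDemo {

    // One serializable node per "step"; default serialization walks
    // the next-pointers recursively, one frame group per link.
    static class Node implements Serializable {
        final Node next;
        Node(Node next) { this.next = next; }
    }

    // True if serializing a chain of this depth overflows the stack.
    static boolean overflows(int depth) {
        Node head = null;
        for (int i = 0; i < depth; i++) {
            head = new Node(head);
        }
        try (ObjectOutputStream out =
                 new ObjectOutputStream(OutputStream.nullOutputStream())) {
            out.writeObject(head); // recursion depth ~ chain length
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (StackOverflowError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("depth 10      overflows: " + overflows(10));
        System.out.println("depth 100,000 overflows: " + overflows(100_000));
    }
}
```

Spark hits the same recursion when it serializes a state RDD together with its entire attached lineage, which is why truncating the lineage (checkpointing) makes the error go away.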

From: Rishabh Wadhawan [mailto:rishabhwad@gmail.com]
Sent: Thursday, June 2, 2016 06:06
To: Yash Sharma <yash360@gmail.com>
CC: Ted Yu <yuzhihong@gmail.com>; Matthew Young <taigetco@gmail.com>; Michel Hubert <michelh@phact.nl>; user@spark.apache.org
Subject: Re: StackOverflow in Spark

 

A StackOverflowError is generated when the DAG gets too long, as many transformations accumulate over many iterations. Please use checkpointing to persist the state and break the lineage to get away from this stack overflow error. Look into the checkpoint function.

Thanks

Hope it helps. Let me know if you need any more help.
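Concretely, that could look like the sketch below (Spark Streaming 1.x Java API; the stream, state type, and update-function names are placeholders, not taken from the original code):

```java
JavaStreamingContext ssc =
    new JavaStreamingContext(conf, Durations.seconds(10));

// Mandatory for updateStateByKey: the state RDDs are periodically
// saved to reliable storage and their lineage is truncated there,
// so the DAG stops growing batch after batch.
ssc.checkpoint("hdfs:///checkpoints/my-app");

JavaPairDStream<String, MyState> first  = events.updateStateByKey(updateFunc1);
JavaPairDStream<String, MyState> second = first.updateStateByKey(updateFunc2);

// Optionally shorten the checkpoint interval on the stateful stream
// (the default is a multiple of the batch interval):
second.checkpoint(Durations.seconds(60));
```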

On Jun 1, 2016, at 8:18 PM, Yash Sharma <yash360@gmail.com> wrote:

 

Not sure if it's related, but I got a similar stack overflow error some time back while reading files and converting them to parquet.

Stack trace-
16/06/02 02:23:54 INFO YarnAllocator: Driver requested a total number of 32769 executor(s).
16/06/02 02:23:54 INFO ExecutorAllocationManager: Requesting 16384 new executors because tasks are backlogged (new desired total will be 32769)
16/06/02 02:23:54 INFO YarnAllocator: Will request 24576 executor containers, each with 5 cores and 22528 MB memory including 2048 MB overhead
16/06/02 02:23:55 WARN ApplicationMaster: Reporter thread fails 2 time(s) in a row.
java.lang.StackOverflowError
        at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
        at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:24)
        at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)

On Thu, Jun 2, 2016 at 12:58 PM, Ted Yu <yuzhihong@gmail.com> wrote:

Looking at Michel's stack trace, it seems to be a different issue.


On Jun 1, 2016, at 7:45 PM, Matthew Young <taigetco@gmail.com> wrote:

Hi,

 

It's related to a bug fixed in Spark; see JIRA ticket SPARK-6847.

 

Matthew Yang

 

On Wed, May 25, 2016 at 7:48 PM, Michel Hubert <michelh@phact.nl> wrote:

 

Hi,

I have a Spark application which generates StackOverflowError exceptions after 30+ minutes.

 

Does anyone have any ideas?

16/05/25 10:48:51 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 55449.0 (TID 5584, host81440-cld.opentsp.com): java.lang.StackOverflowError
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)

Driver stacktrace:

        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1933)
        at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:67)
        at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:54)
        at org.elasticsearch.spark.rdd.EsSpark$.saveJsonToEs(EsSpark.scala:90)
        at org.elasticsearch.spark.rdd.api.java.JavaEsSpark$.saveJsonToEs(JavaEsSpark.scala:62)
        at org.elasticsearch.spark.rdd.api.java.JavaEsSpark.saveJsonToEs(JavaEsSpark.scala)



---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org