spark-user mailing list archives

From Sean Owen <so...@cloudera.com>
Subject Re: Spark shell and StackOverFlowError
Date Mon, 31 Aug 2015 14:41:00 GMT
It's not clear; that error is different still and somehow suggests
you're serializing a stream somewhere. I'd look at what's inside
bcItemsIdx as that is not shown here.
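Sean's suggestion to look at what is inside `bcItemsIdx` can be made mechanical. On the driver, Java-serialize each candidate value and read the class name from the exception. Below is a minimal sketch in plain Scala. It assumes the closure is written with standard `java.io.ObjectOutputStream`, since Spark's default closure serializer is Java serialization. `SerializationProbe` and `probeSerializable` are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationProbe {
  // Hypothetical debugging helper: tries default Java serialization and returns
  // Right(serialized size in bytes) or Left(name of the non-serializable class).
  def probeSerializable(obj: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(obj)
      out.flush()
      Right(bytes.size())
    } catch {
      // With default JVM settings the message is just the offending class name,
      // e.g. "java.io.ObjectInputStream" in the error quoted below.
      case e: NotSerializableException => Left(e.getMessage)
    } finally {
      out.close()
    }
  }
}
```

For example, a tuple holding an array of IDs and a `java.io.ByteArrayInputStream` comes back as `Left("java.io.ByteArrayInputStream")`. In the shell, running the probe on `bcItemsIdx.value` and on the other values the closure mentions should point at the object that holds the stream. It is a debugging aid, not a fix.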

On Mon, Aug 31, 2015 at 3:34 PM, Ashish Shrowty
<ashish.shrowty@gmail.com> wrote:
> Sean,
>
> Thanks for your comments. What I was really trying to do was to transform an
> RDD[(userid,itemid,ratings)] into a RowMatrix so that I can do some column
> similarity calculations while exploring the data before building some
> models. To do that, I first need to convert the user and item ids into
> their respective indexes, which I intended to do by passing an array into the
> closure. That is where I got stuck with this StackOverflowError while trying to
> figure out where it is happening. The actual error I got was slightly different
> (Caused by: java.io.NotSerializableException: java.io.ObjectInputStream). I
> started investigating this issue, which led me to the earlier code snippet that
> I had posted. This is again because of the bcItemsIdx variable being passed
> into the closure. The code below works if I don't pass in the variable and
> simply use a constant like 10 in its place. The code thus far:
>
> import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
>
> // rdd below is RDD[(String,String,Double)]
> // bcItemsIdx below is Broadcast[Array[String]], an array of item ids
> val gRdd = rdd.map{ case (user, item, rating) => (user, (item, rating)) }.groupByKey
> val idxRdd = gRdd.zipWithIndex
> val cm = new CoordinateMatrix(
>     idxRdd.flatMap[MatrixEntry](e => {
>         e._1._2.map(item => {
>                  // <- Passing in the index here gives the serialization error
>                  MatrixEntry(e._2, bcItemsIdx.value.indexOf(item._1), item._2)
>                  // MatrixEntry(e._2, 10, item._2) // <- This works
>         })
>     })
> )
> val rm = cm.toRowMatrix
> val simMatrix = rm.columnSimilarities()
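`columnSimilarities()` on a `RowMatrix` computes cosine similarities between its columns, which are the item columns here. To check the output on a handful of rows, the same quantity can be computed directly from `(row, col, value)` triples. The plain-Scala sketch below mirrors only the exact, no-threshold computation. It does not reproduce MLlib's sampling-based `columnSimilarities(threshold)` variant. `ColumnCosine` and `cosineBetweenColumns` are hypothetical helpers, not MLlib APIs.

```scala
object ColumnCosine {
  // Cosine similarity between columns j and k of a sparse matrix given as
  // (row, col, value) entries, the same shape as MLlib's MatrixEntry.
  // Assumes at most one entry per (row, col) pair.
  def cosineBetweenColumns(entries: Seq[(Long, Long, Double)], j: Long, k: Long): Double = {
    // Collect one column as a map from row index to value.
    def column(c: Long): Map[Long, Double] =
      entries.collect { case (r, `c`, v) => r -> v }.toMap
    val colJ = column(j)
    val colK = column(k)
    // Dot product; rows missing from column k contribute zero.
    val dot = colJ.iterator.map { case (r, v) => v * colK.getOrElse(r, 0.0) }.sum
    val normJ = math.sqrt(colJ.values.map(v => v * v).sum)
    val normK = math.sqrt(colK.values.map(v => v * v).sum)
    if (normJ == 0.0 || normK == 0.0) 0.0 else dot / (normJ * normK)
  }
}
```

For two users where user 0 rated items 0 and 1 and user 1 rated only item 0 (all ratings 1.0), the similarity of columns 0 and 1 is 1/sqrt(2).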
>
> I would like to make this work in the Spark shell as I am still exploring
> the data. Let me know if there is an alternate way of constructing the
> RowMatrix.
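Here is one alternative for the ID-to-index step. It is an untested suggestion, not a known fix for the error. Build the item index once as a `Map[String, Int]`: lookups take constant time, whereas `Array.indexOf` scans the whole array for every rating. Then emit `(row, col, value)` triples per user. The sketch below runs on local Scala collections. In Spark, the map would presumably be broadcast with `sc.broadcast` and read through `.value` inside `flatMap`, the same way `bcItemsIdx` is used above. `IdIndexing`, `buildItemIndex`, and `toEntries` are hypothetical names.

```scala
object IdIndexing {
  // Assign each distinct item ID a dense column index, in first-seen order.
  def buildItemIndex(ratings: Seq[(String, String, Double)]): Map[String, Int] =
    ratings.map(_._2).distinct.zipWithIndex.toMap

  // Group ratings by user, give each user a row index, and emit
  // (row, col, rating) triples shaped like MLlib's MatrixEntry.
  def toEntries(ratings: Seq[(String, String, Double)],
                itemIndex: Map[String, Int]): Seq[(Long, Long, Double)] = {
    // Sort users so that row indexes are stable between runs.
    val byUser = ratings.groupBy(_._1).toSeq.sortBy(_._1)
    byUser.zipWithIndex.flatMap { case ((_, userRatings), row) =>
      userRatings.map { case (_, item, rating) =>
        (row.toLong, itemIndex(item).toLong, rating)
      }
    }
  }
}
```

With an unknown item ID, `itemIndex(item)` throws. `indexOf` would instead silently return -1, so a bad ID surfaces immediately rather than as a negative column index.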
>
> Thanks and appreciate all the help!
>
> Ashish
>
> On Mon, Aug 31, 2015 at 3:41 AM Sean Owen <sowen@cloudera.com> wrote:
>>
>> Yeah I see that now. I think it fails immediately because the map
>> operation does try to clean and/or verify the serialization of the
>> closure upfront.
>>
>> I'm not quite sure what is going on, but I think it's some strange
>> interaction between how you're building up the list and what the
>> resulting representation happens to be like, and how the closure
>> cleaner works, which can't be perfect. The shell also introduces an
>> extra layer of issues.
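A closure can drag in more than the value it mentions. Reading a field such as `a` compiles to `this.a`, so the function captures the whole enclosing instance. The spark-shell wraps each line in generated objects, so a similar chain may plausibly pull `lst` into the closure. That is an assumption, not something established in this thread. The plain-Scala sketch below assumes Scala 2.12 or 2.13, where function literals are serializable lambdas. `Session`, `ClosureCheck`, and `isJavaSerializable` are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for state that is NOT serializable, like a REPL
// wrapper or a driver-side object holding a large collection.
class Session {
  val a = 10

  // Reads the field a, so the lambda captures `this` (the whole Session).
  def capturingField: Int => Int = x => if (a == 10) 1 else 0

  // Copies the field into a local val first; only the Int is captured.
  def capturingLocal: Int => Int = {
    val localA = a
    x => if (localA == 10) 1 else 0
  }
}

object ClosureCheck {
  // Eagerly Java-serializes a function, roughly like the upfront check Sean
  // describes (an approximation, not Spark's actual closure cleaner code).
  def isJavaSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Copying the needed value into a local `val` before building the closure is a common workaround for this pattern. Whether it helps in the shell case discussed here is untested.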
>>
>> For example, the slightly more canonical approaches work fine:
>>
>> import scala.collection.mutable.MutableList
>> val lst = MutableList[(String,String,Double)]()
>> // note: :+ returns a new list, so lst itself is left empty by this loop
>> (0 to 10000).foreach(i => lst :+ ("10", "10", i.toDouble))
>>
>> or just
>>
>> val lst = (0 to 10000).map(i => ("10", "10", i.toDouble))
>>
>> If you just need this to work, maybe those are better alternatives anyway.
>> You can also check whether it works without the shell, as I suspect
>> that's a factor.
>>
>> It's not an error in Spark per se; it's saying that some object's default
>> Java serialization graph is very deep. It looks like the code you wrote,
>> plus the closure cleaner, ends up pulling in some huge linked list and
>> serializing it the direct and unhelpful way.
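The deep-graph effect Sean describes can be reproduced without Spark. Default Java serialization writes a linked structure by recursing once per node, so stack depth grows with the list length. An array of the same records is written element by element at constant depth. The sketch below assumes default JVM stack settings. `DeepGraph`, `Node`, `roundTrip`, and `linkedDepthOverflows` are hypothetical names, and the length at which the overflow appears depends on the thread stack size (`-Xss`).

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object DeepGraph {
  // Hypothetical singly linked node with no custom serialization: default
  // Java serialization follows `next` recursively, one nesting level per node.
  final class Node(val value: Double, val next: Node) extends Serializable

  def linkedList(n: Int): Node =
    (0 until n).foldLeft(null: Node)((tail, i) => new Node(i.toDouble, tail))

  private def write(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.toByteArray
  }

  // Serializes and deserializes a value with default Java serialization.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(write(obj)))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // True if serializing a linked list of n nodes overflows the stack.
  def linkedDepthOverflows(n: Int): Boolean =
    try { write(linkedList(n)); false }
    catch { case _: StackOverflowError => true }
}
```

With default settings, `DeepGraph.linkedDepthOverflows(1000000)` would be expected to return `true`. The overflow shows the same repeating `writeObject0` / `writeOrdinaryObject` / `writeSerialData` / `defaultWriteFields` frames as the stack trace further down this thread. An array of 10,000 tuples, by contrast, round-trips normally.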
>>
>> If you have an idea about exactly why it's happening, you can open a
>> JIRA, but arguably this is something that would be nice to have just work
>> rather than something to do with Spark per se. Or have a look at other
>> issues related to closures and the shell; you may find this is related to
>> other known behavior.
>>
>>
>> On Sun, Aug 30, 2015 at 8:08 PM, Ashish Shrowty
>> <ashish.shrowty@gmail.com> wrote:
>> > Sean, does the code below work for you in the Spark shell? Ted got the
>> > same error:
>> >
>> > val a=10
>> > val lst = MutableList[(String,String,Double)]()
>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >
>> > -Ashish
>> >
>> >
>> > On Sun, Aug 30, 2015 at 2:52 PM Sean Owen <sowen@cloudera.com> wrote:
>> >>
>> >> I'm not sure how to reproduce it; this code does not produce an error
>> >> in master.
>> >>
>> >> On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty
>> >> <ashish.shrowty@gmail.com> wrote:
>> >> > Do you think I should create a JIRA?
>> >> >
>> >> >
>> >> > On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yuzhihong@gmail.com> wrote:
>> >> >>
>> >> >> I got StackOverFlowError as well :-(
>> >> >>
>> >> >> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty
>> >> >> <ashish.shrowty@gmail.com>
>> >> >> wrote:
>> >> >>>
>> >> >>> Yep, I tried that too earlier. It doesn't make a difference. Are you
>> >> >>> able to replicate it on your side?
>> >> >>>
>> >> >>>
>> >> >>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yuzhihong@gmail.com>
>> >> >>> wrote:
>> >> >>>>
>> >> >>>> I see.
>> >> >>>>
>> >> >>>> What about using the following in place of variable a?
>> >> >>>>
>> >> >>>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>> >> >>>>
>> >> >>>> Cheers
>> >> >>>>
>> >> >>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty
>> >> >>>> <ashish.shrowty@gmail.com> wrote:
>> >> >>>>>
>> >> >>>>> @Sean - Agreed that there is no action, but I still get the
>> >> >>>>> StackOverflowError; it's very weird.
>> >> >>>>>
>> >> >>>>> @Ted - Variable a is just an Int (val a = 10). The error happens
>> >> >>>>> when I try to pass a variable into the closure. The example you
>> >> >>>>> have above works fine since no variable is being passed into the
>> >> >>>>> closure from the shell.
>> >> >>>>>
>> >> >>>>> -Ashish
>> >> >>>>>
>> >> >>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yuzhihong@gmail.com>
>> >> >>>>> wrote:
>> >> >>>>>>
>> >> >>>>>> Using Spark shell :
>> >> >>>>>>
>> >> >>>>>> scala> import scala.collection.mutable.MutableList
>> >> >>>>>> import scala.collection.mutable.MutableList
>> >> >>>>>>
>> >> >>>>>> scala> val lst = MutableList[(String,String,Double)]()
>> >> >>>>>> lst: scala.collection.mutable.MutableList[(String, String, Double)] = MutableList()
>> >> >>>>>>
>> >> >>>>>> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> >> >>>>>>
>> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >>>>>> <console>:27: error: not found: value a
>> >> >>>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >>>>>>                                           ^
>> >> >>>>>>
>> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
>> >> >>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27
>> >> >>>>>>
>> >> >>>>>> scala> rdd.count()
>> >> >>>>>> ...
>> >> >>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at <console>:30, took 0.478350 s
>> >> >>>>>> res1: Long = 10000
>> >> >>>>>>
>> >> >>>>>> Ashish:
>> >> >>>>>> Please refine your example to mimic more closely what your code
>> >> >>>>>> actually did.
>> >> >>>>>>
>> >> >>>>>> Thanks
>> >> >>>>>>
>> >> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <sowen@cloudera.com>
>> >> >>>>>> wrote:
>> >> >>>>>>>
>> >> >>>>>>> That can't cause any error, since there is no action in your first
>> >> >>>>>>> snippet. Even calling count on the result doesn't cause an error.
>> >> >>>>>>> You must be executing something different.
>> >> >>>>>>>
>> >> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty
>> >> >>>>>>> <ashish.shrowty@gmail.com>
>> >> >>>>>>> wrote:
>> >> >>>>>>> > I am running the Spark shell (1.2.1) in local mode and I have a
>> >> >>>>>>> > simple RDD[(String,String,Double)] with about 10,000 objects in
>> >> >>>>>>> > it. I get a StackOverflowError each time I try to run the
>> >> >>>>>>> > following code (the code itself is just representative of other
>> >> >>>>>>> > logic where I need to pass in a variable). I tried broadcasting
>> >> >>>>>>> > the variable too, but no luck. I must be missing something basic
>> >> >>>>>>> > here:
>> >> >>>>>>> >
>> >> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>))
>> >> >>>>>>> > val a=10
>> >> >>>>>>> > rdd.map(r => if (a==10) 1 else 0)
>> >> >>>>>>> > This throws:
>> >> >>>>>>> >
>> >> >>>>>>> > java.lang.StackOverflowError
>> >> >>>>>>> >     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>> >> >>>>>>> >     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>> >> >>>>>>> >     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >> >>>>>>> >     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >> >>>>>>> >     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> >> >>>>>>> >     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>> >> >>>>>>> >     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >> >>>>>>> >     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >> >>>>>>> >     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> >> >>>>>>> > ...
>> >> >>>>>>> >
>> >> >>>>>>> > More experiments. This works:
>> >> >>>>>>> >
>> >> >>>>>>> > val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
>> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >>>>>>> >
>> >> >>>>>>> > But the code below doesn't, and throws the StackOverflowError:
>> >> >>>>>>> >
>> >> >>>>>>> > import scala.collection.mutable.MutableList
>> >> >>>>>>> > val lst = MutableList[(String,String,Double)]()
>> >> >>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >>>>>>> >
>> >> >>>>>>> > Any help appreciated!
>> >> >>>>>>> >
>> >> >>>>>>> > Thanks,
>> >> >>>>>>> > Ashish
>> >> >>>>>>> >
>> >> >>>>>>> > --
>> >> >>>>>>> > View this message in context:
>> >> >>>>>>> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
>> >> >>>>>>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >> >>>>>>> >
>> >> >>>>>>> > ---------------------------------------------------------------------
>> >> >>>>>>> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >> >>>>>>> > For additional commands, e-mail: user-help@spark.apache.org
>> >> >>>>>>> >
>> >> >>>>>>>
>> >> >>>>>>
>> >> >>>>
>> >> >>
>> >> >
