spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Archit Thakur <archit279tha...@gmail.com>
Subject Re: Problems while moving from 0.8.0 to 0.8.1
Date Mon, 27 Jan 2014 19:34:59 GMT
ERROR executor.Executor: Exception in task ID 20
java.lang.NullPointerException
        at
com.xyz.spark.common.collection.MyCustomKeyType.equals(MyCustomKeyType.java:200)
        at
org.apache.spark.util.AppendOnlyMap.changeValue(AppendOnlyMap.scala:122)
        at
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:42)
        at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:103)
        at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:102)
        at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:465)
        at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:465)
        at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
        at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown
Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
        at java.lang.Thread.run(Unknown Source)



On Mon, Jan 27, 2014 at 10:54 PM, Reynold Xin <rxin@databricks.com> wrote:

> Do you mind pasting the whole stack trace for the NPE?
>
>
>
> On Mon, Jan 27, 2014 at 6:44 AM, Archit Thakur <archit279thakur@gmail.com
> >wrote:
>
> > Hi,
> >
> > The implementation of the aggregation logic has been changed in 0.8.1
> > (Aggregator.scala).
> >
> > It is now using AppendOnlyMap as compared to java.util.HashMap in 0.8.0
> > release.
> >
> > Aggregator.scala
> > def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) :
> Iterator[(K,
> > C)] = {
> >     val combiners = new AppendOnlyMap[K, C]
> >     var kv: Product2[K, V] = null
> >     val update = (hadValue: Boolean, oldValue: C) => {
> >       if (hadValue) mergeValue(oldValue, kv._2) else
> createCombiner(kv._2)
> >     }
> >     while (iter.hasNext) {
> >       kv = iter.next()
> >       combiners.changeValue(kv._1, update)
> >     }
> >     combiners.iterator
> >   }
> >
> > I am facing a problem: in the changeValue function of AppendOnlyMap, it
> > computes
> > val curKey = data(2 * pos)
> > which comes back as null; that null curKey is then passed to my key
> > type's equals method (see the stack trace), eventually causing the NPE.
> >
> > AppendOnlyMap.scala
> > def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
> >     val k = key.asInstanceOf[AnyRef]
> >     if (k.eq(null)) {
> >       if (!haveNullValue) {
> >         incrementSize()
> >       }
> >       nullValue = updateFunc(haveNullValue, nullValue)
> >       haveNullValue = true
> >       return nullValue
> >     }
> >     var pos = rehash(k.hashCode) & mask
> >     var i = 1
> >     while (true) {
> >       val curKey = data(2 * pos)
> >       if (k.eq(curKey) || k.equals(curKey)) {
> >         val newValue = updateFunc(true, data(2 * pos +
> 1).asInstanceOf[V])
> >         data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
> >         return newValue
> >       } else if (curKey.eq(null)) {
> >         val newValue = updateFunc(false, null.asInstanceOf[V])
> >         data(2 * pos) = k
> >         data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
> >         incrementSize()
> >         return newValue
> >       } else {
> >         val delta = i
> >         pos = (pos + delta) & mask
> >         i += 1
> >       }
> >     }
> >     null.asInstanceOf[V] // Never reached but needed to keep compiler
> happy
> >   }
> >
> >
> > Other info:
> > 1. My code works fine with 0.8.0.
> > 2. I used the groupByKey transformation.
> > 3. I replaced Aggregator.scala with the older version (0.8.0),
> compiled
> > it, restarted the Master and Worker, and it ran successfully.
> >
> > Thanks and Regards,
> > Archit Thakur.
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message