spark-dev mailing list archives

From Reynold Xin <r...@databricks.com>
Subject Re: Problems while moving from 0.8.0 to 0.8.1
Date Mon, 27 Jan 2014 17:24:16 GMT
Do you mind pasting the whole stack trace for the NPE?



On Mon, Jan 27, 2014 at 6:44 AM, Archit Thakur <archit279thakur@gmail.com> wrote:

> Hi,
>
> The implementation of the aggregation logic (Aggregator.scala) changed in
> 0.8.1.
>
> It now uses AppendOnlyMap instead of the java.util.HashMap used in the
> 0.8.0 release.
>
> Aggregator.scala
> def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = {
>     val combiners = new AppendOnlyMap[K, C]
>     var kv: Product2[K, V] = null
>     val update = (hadValue: Boolean, oldValue: C) => {
>       if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
>     }
>     while (iter.hasNext) {
>       kv = iter.next()
>       combiners.changeValue(kv._1, update)
>     }
>     combiners.iterator
>   }
>
> The problem I am facing is in the changeValue function of AppendOnlyMap. It
> computes
> val curKey = data(2 * pos)
> which comes back as null and eventually causes an NPE.
>
> AppendOnlyMap.scala
> def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
>     val k = key.asInstanceOf[AnyRef]
>     if (k.eq(null)) {
>       if (!haveNullValue) {
>         incrementSize()
>       }
>       nullValue = updateFunc(haveNullValue, nullValue)
>       haveNullValue = true
>       return nullValue
>     }
>     var pos = rehash(k.hashCode) & mask
>     var i = 1
>     while (true) {
>       val curKey = data(2 * pos)
>       if (k.eq(curKey) || k.equals(curKey)) {
>         val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
>         data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
>         return newValue
>       } else if (curKey.eq(null)) {
>         val newValue = updateFunc(false, null.asInstanceOf[V])
>         data(2 * pos) = k
>         data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
>         incrementSize()
>         return newValue
>       } else {
>         val delta = i
>         pos = (pos + delta) & mask
>         i += 1
>       }
>     }
>     null.asInstanceOf[V] // Never reached but needed to keep compiler happy
>   }
>
>
> Other info:
> 1. My code works fine with 0.8.0.
> 2. I used the groupByKey transformation.
> 3. I replaced Aggregator.scala with the older (0.8.0) version, compiled
> it, and restarted the Master and Worker; the job then ran successfully.
>
> Thanks and Regards,
> Archit Thakur.
>
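For comparison with the AppendOnlyMap loop quoted above, a map-backed combine-by-key pass can be sketched as follows. This is a minimal sketch, assuming a plain scala.collection.mutable.HashMap and groupByKey-style combiners (assumed here to append each value to a buffer). `combineByKeySketch` is a hypothetical helper, not Spark's Aggregator and not the 0.8.0 code; it only illustrates the createCombiner/mergeValue contract that combineValuesByKey relies on.

```scala
import scala.collection.mutable

object CombineSketch {
  // Hypothetical helper (not Spark's API): fold (key, value) pairs into one
  // combiner per key, using a mutable HashMap as the backing store.
  def combineByKeySketch[K, V, C](
      iter: Iterator[(K, V)],
      createCombiner: V => C,
      mergeValue: (C, V) => C): Iterator[(K, C)] = {
    val combiners = mutable.HashMap.empty[K, C]
    for ((k, v) <- iter) {
      // Same rule as the `update` closure in combineValuesByKey:
      // merge into an existing combiner, or create one for a new key.
      val c = combiners.get(k) match {
        case Some(old) => mergeValue(old, v)
        case None      => createCombiner(v)
      }
      combiners(k) = c
    }
    combiners.iterator
  }

  def main(args: Array[String]): Unit = {
    // groupByKey-style combiners (an assumption): collect values into a buffer.
    val grouped = combineByKeySketch[String, Int, mutable.ArrayBuffer[Int]](
      Iterator("a" -> 1, "b" -> 2, "a" -> 3),
      v => mutable.ArrayBuffer(v),
      (buf, v) => buf += v
    ).toMap
    println(grouped("a")) // ArrayBuffer(1, 3)
  }
}
```

Because the hash map only looks up existing entries, keys are compared solely against keys already stored, which is one behavioral difference from the open-addressing probe in changeValue.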
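One possible explanation, offered as an assumption to check against the full stack trace rather than as a diagnosis: `curKey` being null is expected, because an empty slot in `data` holds null. The loop in changeValue, however, evaluates `k.equals(curKey)` before it reaches the `curKey.eq(null)` branch, so a key class whose `equals` dereferences its argument without a null check would throw at that point. java.util.HashMap, by contrast, only calls `equals` on entries it finds in a bucket, so an empty slot never reaches a key's `equals`. The sketch below uses hypothetical `BadKey`/`SafeKey` classes and a hypothetical `probeOnce` helper that reproduces the order of checks in changeValue for a single slot.

```scala
// Hypothetical key whose equals assumes a non-null argument.
class BadKey(val id: Int) {
  override def hashCode: Int = id
  override def equals(other: Any): Boolean =
    other.asInstanceOf[BadKey].id == id // throws NullPointerException when other is null
}

// Hypothetical null-safe key: a type pattern never matches null.
class SafeKey(val id: Int) {
  override def hashCode: Int = id
  override def equals(other: Any): Boolean = other match {
    case that: SafeKey => that.id == id
    case _             => false
  }
}

object ProbeSketch {
  // Mirrors the order of checks in changeValue for one slot:
  // identity, then equals, and only then the empty-slot test.
  def probeOnce(k: AnyRef, curKey: AnyRef): String =
    if (k.eq(curKey) || k.equals(curKey)) "match"
    else if (curKey.eq(null)) "empty slot"
    else "collision"

  def main(args: Array[String]): Unit = {
    println(probeOnce(new SafeKey(1), null)) // empty slot
    val outcome =
      try probeOnce(new BadKey(1), null)
      catch { case _: NullPointerException => "NPE from BadKey.equals(null)" }
    println(outcome) // NPE from BadKey.equals(null)
  }
}
```

If this is the cause, making the key's `equals` return false for a null argument (as `SafeKey` does) should avoid the NPE; the full stack trace would show whether the exception originates inside the key's `equals`.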
