spark-user mailing list archives

From Anubhav Agarwal <anubha...@gmail.com>
Subject Re: NullPointException Help while using accumulators
Date Mon, 03 Aug 2015 22:41:18 GMT
.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
	at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:64)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

The above is also part of the error.

On Mon, Aug 3, 2015 at 6:40 PM, Ted Yu <yuzhihong@gmail.com> wrote:

> Putting your code in a file, I find the following on line 17:
>                 stepAcc = new StepAccumulator();
> However I don't think that was where the NPE was thrown.
>
> Another thing I don't understand is that there are two addAccumulator()
> calls at the top of the stack trace, while in your code I don't
> see addAccumulator() calling itself.
>
> FYI
>
> On Mon, Aug 3, 2015 at 3:22 PM, Anubhav Agarwal <anubhav33@gmail.com>
> wrote:
>
>> The code was written in 1.4 but I am compiling it and running it with 1.3.
>>
>> import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
>> import org.apache.spark.AccumulableParam;
>> import scala.Tuple4;
>> import thomsonreuters.trailblazer.operation.DriverCalc;
>> import thomsonreuters.trailblazer.operation.StepAccumulator;
>>
>> // Tuple4<Allocation StepIndex.IF_Position, DenKey, NumKey, Value> -
>> // Allocation Step Add
>>
>> class DriverAccumulator implements
>> AccumulableParam<Object2ObjectOpenHashMap<String, StepAccumulator>,
>> Tuple4<String, String, String, Double>> {
>>     private static final Object _lockObj = new Object();
>>
>>     public Object2ObjectOpenHashMap<String, StepAccumulator>
>> addAccumulator(Object2ObjectOpenHashMap<String, StepAccumulator>
>> stepAccumulatorMap, Tuple4<String, String, String, Double> value) {
>>         if (value == null) return stepAccumulatorMap;
>>         synchronized (_lockObj) {
>>             StepAccumulator stepAcc = stepAccumulatorMap.get(value._1());
>>             if (stepAcc == null) {
>>                 stepAcc = new StepAccumulator();
>>                 stepAccumulatorMap.put(value._1(), stepAcc);
>>             }
>>             DriverCalc dc = stepAcc.stepRows.get(value._2());
>>             if (dc == null) {
>>                 dc = new DriverCalc();
>>                 dc._denominator = value._4();
>>                 if (value._3() != null) dc._numerator.put(value._3(),
>> value._4());
>>                 stepAcc.stepRows.put(value._2(), dc);
>>             } else {
>>                 dc._denominator = dc._denominator + value._4();
>>                 if (value._3() != null) {
>>                     Double val = dc._numerator.get(value._3());
>>                     dc._numerator.put(value._3(), new Double(val != null
>> ? val + value._4() : value._4()));
>>                 }
>>             }
>>         }
>>         return stepAccumulatorMap;
>>     }
>>
>>     public Object2ObjectOpenHashMap<String, StepAccumulator>
>> addInPlace(Object2ObjectOpenHashMap<String, StepAccumulator> r1,
>> Object2ObjectOpenHashMap<String, StepAccumulator> r2) {
>>         r2.forEach((k,v) -> r1.merge(k, v, this::mergeAcc));
>>         return r1;
>>     }
>>
>>     private StepAccumulator mergeAcc(StepAccumulator source1,
>> StepAccumulator source2) {
>>         source2.stepRows.forEach((k,v) -> source1.stepRows.merge(k, v,
>> this::denominatorMerge));
>>         return source1;
>>     }
>>
>>     private DriverCalc denominatorMerge(DriverCalc driverCalc1,
>> DriverCalc driverCalc2) {
>>         driverCalc1._denominator = driverCalc1._denominator +
>> driverCalc2._denominator;
>>         driverCalc2._numerator.forEach((k,v) ->
>> driverCalc1._numerator.merge(k, v, this::numeratorMerge));
>>         return driverCalc1;
>>     }
>>
>>     private Double numeratorMerge(Double d1, Double d2) {
>>         return d1 + d2;
>>     }
>>
>>     public Object2ObjectOpenHashMap<String, StepAccumulator>
>> zero(Object2ObjectOpenHashMap<String, StepAccumulator> initialValue) {
>>         return null;
>>     }
>>
>> }
>>
>> On Mon, Aug 3, 2015 at 6:20 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>>
>>> Can you show related code in DriverAccumulator.java ?
>>>
>>> Which Spark release do you use ?
>>>
>>> Cheers
>>>
>>> On Mon, Aug 3, 2015 at 3:13 PM, Anubhav Agarwal <anubhav33@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> I am trying to modify my code to use HDFS and multiple nodes. The code
>>>> works fine when I run it locally on a single machine with a single worker.
>>>> Since I started modifying it, I get the following error. Any hint
>>>> would be helpful.
>>>>
>>>> java.lang.NullPointerException
>>>> 	at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:17)
>>>> 	at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:11)
>>>> 	at org.apache.spark.Accumulable.add(Accumulators.scala:73)
>>>> 	at thomsonreuters.trailblazer.main.AllocationBolt.queueDriverRow(AllocationBolt.java:112)
>>>> 	at thomsonreuters.trailblazer.main.AllocationBolt.executeRow(AllocationBolt.java:303)
>>>> 	at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:49)
>>>> 	at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:8)
>>>> 	at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:996)
>>>> 	at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
>>>> 	at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
>>>> 	at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>>>> 	at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>>>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> failed in write bolt execute null
>>>> failed in write bolt execute null
>>>> java.lang.NullPointerException
>>>>
>>>>
>>>
>>
>
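
One thing that stands out in the quoted DriverAccumulator is that zero() returns null while addAccumulator() immediately calls stepAccumulatorMap.get(...). If Spark seeds the task-side copy of the accumulator from zero(), which is my understanding of Accumulable in 1.x but is not confirmed anywhere in this thread, that dereference would throw a NullPointerException. Below is a minimal sketch of that failure mode and one possible fix. It deliberately uses plain java.util types instead of Spark and fastutil so it runs on its own; ZeroSketch, zeroReturningNull, zeroReturningEmpty and addThrowsNpe are hypothetical names invented for illustration, and the map of doubles is a simplification of the real StepAccumulator map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the DriverAccumulator quoted above. Plain java.util
// types replace Spark's AccumulableParam and fastutil (an assumption made only
// so that the sketch is self-contained).
class ZeroSketch {

    // Mirrors the posted zero(): the starting value is null.
    static Map<String, Double> zeroReturningNull(Map<String, Double> initialValue) {
        return null;
    }

    // Possible fix (an assumption, not the author's code): start from an empty map.
    static Map<String, Double> zeroReturningEmpty(Map<String, Double> initialValue) {
        return new HashMap<>();
    }

    // Simplified addAccumulator: sums values per key and, like the posted code,
    // dereferences the map without a null check.
    static Map<String, Double> addAccumulator(Map<String, Double> acc, String key, double value) {
        acc.merge(key, value, Double::sum); // NullPointerException if acc is null
        return acc;
    }

    // Reports whether adding one value to the given starting map throws an NPE.
    static boolean addThrowsNpe(Map<String, Double> start) {
        try {
            addAccumulator(start, "step1", 1.0);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("null zero throws NPE:  " + addThrowsNpe(zeroReturningNull(null)));
        System.out.println("empty zero throws NPE: " + addThrowsNpe(zeroReturningEmpty(null)));
    }
}
```

In the real class, the analogous change would be for zero() to return a new, empty Object2ObjectOpenHashMap<String, StepAccumulator> rather than null. Whether that is the whole story for the trace above is untested here.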
