spark-user mailing list archives

From Ted Yu <yuzhih...@gmail.com>
Subject Re: Error while using ConcurrentHashMap in Spark Streaming
Date Thu, 06 Aug 2015 18:15:24 GMT
> aggregationMap.put(countryCode, requestCountPerCountry+1);

If the NPE came from the above line, requestCountPerCountry was likely null: get() returns null for an absent key, and auto-unboxing a null Integer throws a NullPointerException.
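
For reference, a minimal sketch of a null-safe update (assuming Java 8, with
aggregationMap and countryCode as in your snippet):

    // get() returns null for an absent key, and unboxing that null throws the NPE.
    // Read with a default instead:
    Integer requestCountPerCountry = aggregationMap.getOrDefault(countryCode, 0);
    aggregationMap.put(countryCode, requestCountPerCountry + 1);

    // Or do the read-modify-write atomically on the ConcurrentHashMap,
    // which also avoids the get-then-put race between threads:
    aggregationMap.merge(countryCode, 1, Integer::sum);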

Cheers

On Thu, Aug 6, 2015 at 8:54 AM, UMESH CHAUDHARY <umesh9794@gmail.com> wrote:

> Scenario is:
>
>    - I have a map of country-code as key and count as value (initially
>    count is 0)
>    - In DStream.foreachRDD I need to update the count for each country in
>    the map with the new aggregated value
>
> I am doing :
>
> transient Map<String, Integer> aggregationMap = new ConcurrentHashMap<String, Integer>();
>
>
> Integer requestCountPerCountry = aggregationMap.get(countryCode);
>
> aggregationMap.put(countryCode, requestCountPerCountry + 1);   // Getting the following error on this line
>
>
> java.lang.NullPointerException
> 	at JavaKafkaStream$2.call(JavaKafkaStream.java:107)
> 	at JavaKafkaStream$2.call(JavaKafkaStream.java:92)
> 	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:323)
> 	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:323)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> 	at scala.util.Try$.apply(Try.scala:161)
> 	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:193)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:192)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
>
>
> Is this issue related to:
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> If so, how can I resolve this?
>
>
