spark-user mailing list archives

From Adrian Tanase <atan...@adobe.com>
Subject Re: NullPointerException when adding to accumulator
Date Wed, 14 Oct 2015 17:19:28 GMT
Can you try to use foreachRDD instead of print and access the accumulator value in there?

Could it be that you are accessing the accumulator before it's initialized or after it was
cleaned up?
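
For example, a minimal sketch of that pattern, reusing the ssc, kafkaStream and
"message_count" accumulator from the snippet quoted below (only an illustration of the
suggested approach, not the original code):

val messageCount = ssc.sparkContext.accumulator(0, "message_count")

kafkaStream.foreachRDD { rdd =>
  // Runs on the executors: every record in the batch increments the accumulator.
  rdd.foreach(_ => messageCount += 1)
  // The rest of the foreachRDD body runs on the driver after the action above,
  // so reading the accumulator value here is safe.
  println(s"${messageCount.name.get}=${messageCount.value}")
}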

Sent from my iPhone

On 14 Oct 2015, at 17:46, Sela, Amit <ANSELA@paypal.com.INVALID> wrote:

This is not the entire code (obtaining the Kafka stream is in EventsReader), but the relevant
part is:

val kafkaStream = EventsReader.getRawStreamByEvent(ssc, classOf[LoginEvent]).cache()
val messageCount = ssc.sparkContext.accumulator(0, "message_count")
kafkaStream.map(stream => {
  messageCount += 1 // line number: (31)
  stream
}).print(1)
ssc.awaitTerminationOrTimeout(60000)
println(s"${messageCount.name.get}=${messageCount.value}")


This is the stack trace:

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1298)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1272)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$5$1.apply(DStream.scala:722)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$5$1.apply(DStream.scala:721)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at com.paypal.risk.platform.nextgen.LoginCountMessages$.increment(LoginCountMessages.scala:31)
        at com.paypal.risk.platform.nextgen.LoginCountMessages$$anonfun$1.apply(LoginCountMessages.scala:35)
        at com.paypal.risk.platform.nextgen.LoginCountMessages$$anonfun$1.apply(LoginCountMessages.scala:34)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)





From: Ted Yu
Date: Wednesday, October 14, 2015 at 5:38 PM
To: "Sela, Amit"
Cc: "user@spark.apache.org"
Subject: Re: NullPointerException when adding to accumulator

Can you show your code snippet with the complete stack trace?

Thanks

On Wed, Oct 14, 2015 at 7:30 AM, Sela, Amit <ANSELA@paypal.com.invalid> wrote:
I'm running a simple streaming application that reads from Kafka, maps the events, and prints
them, and I'm trying to use accumulators to count the number of mapped records.

While this works in standalone mode (IDE), when submitting to YARN I get a NullPointerException
on accumulator.add(1) or accumulator += 1.

Anyone using accumulators in .map() with Spark 1.5 and YARN?

Thanks,
Amit




