spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Akhil Das <ak...@sigmoidanalytics.com>
Subject Re: Exception when restoring spark streaming with batch RDD from checkpoint.
Date Tue, 08 Sep 2015 10:21:43 GMT
Try to add a filter to remove/replace the null elements within/before the
map operation.

Thanks
Best Regards

On Mon, Sep 7, 2015 at 3:34 PM, ZhengHanbin <hanbin_zheng@163.com> wrote:

> Hi,
>
> I am using spark streaming to join every RDD of a DStream to a stand alone
> RDD to generate a new DStream as followed:
>
> *def joinWithBatchEvent(contentFeature: RDD[(String, String)],*
> *                       batchEvent: DStream[((String, String), (Long,
> Double, Double))]) = {*
> *  batchEvent.map(event => {*
> *    (event._1._2, (event._1._1, event._2._1, event._2._2, event._2._3))*
> *  }).transform(eventRDD => {*
> *    eventRDD.leftOuterJoin(contentFeature).map(result =>*
> *      (result._2._1._1, (result._1, result._2._1._2, result._2._1._3,
> result._2._1._4, result._2._2))*
> *    )*
> *  })*
> *}*
>
> It works well when it start from a new StreamContext.
> But if the StreamContext is restored from checkpoint, there will be an
> exception as followed and the Graph can not be setup.
> Do you know how to solve this problem? Thanks very much!
>
> 5/09/07 14:07:18 INFO spark.SparkContext: Starting job: saveAsTextFiles at
> CFBModel.scala:49
> 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 12
> (repartition at EventComponent.scala:64)
> 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 17 (flatMap
> at CFBModel.scala:25)
> 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 20 (map at
> ContentFeature.scala:100)
> 15/09/07 14:07:18 WARN scheduler.DAGScheduler: Creating new stage failed
> due to exception - job: 1
> java.lang.IllegalArgumentException: Flat hash tables cannot contain null
> elements.
> at
> scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:390)
> at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41)
> at
> scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123)
> at
> scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
> at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
> at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
> at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
> at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
> at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:336)
> at
> org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:355)
> at
> org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:317)
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:218)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:301)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:298)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298)
> at
> org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310)
> at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244)
> at
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Job 1 failed:
> saveAsTextFiles at CFBModel.scala:49, took 0.016406 s
> 15/09/07 14:07:18 ERROR scheduler.JobScheduler: Error running job
> streaming job 1441605900000 ms.0
> java.lang.IllegalArgumentException: Flat hash tables cannot contain null
> elements.
> at
> scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:390)
> at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41)
> at
> scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123)
> at
> scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
> at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
> at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
> at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
> at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
> at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:336)
> at
> org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:355)
> at
> org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:317)
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:218)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:301)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:298)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298)
> at
> org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310)
> at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244)
> at
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Exception in thread "main" java.lang.IllegalArgumentException: Flat hash
> tables cannot contain null elements.
> at
> scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:390)
> at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41)
> at
> scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123)
> at
> scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
> at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
> at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
> at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
> at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
> at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:336)
> at
> org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:355)
> at
> org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:317)
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:218)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:301)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:298)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298)
> at
> org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310)
> at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244)
> at
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
>
> Thanks,
> Hanbin Zheng
>
>

Mime
View raw message