spark-user mailing list archives

From Tathagata Das <t...@databricks.com>
Subject Re: Exception when restoring spark streaming with batch RDD from checkpoint.
Date Wed, 09 Sep 2015 03:24:17 GMT
The problem here is probably that the recovered StreamingContext is trying
to refer to the pre-failure static RDD, which no longer exists after the
failure. The solution: when the driver process restarts from the checkpoint,
you need to explicitly recreate the static RDD and make sure that the
recreated RDD is used, since the original one obviously does not exist any
more after recovering from failure. Something like this:

object StaticRDDLazySingleton {
    @transient @volatile private var singleton: RDD[(String, String)] = null
    def getOrCreate(): RDD[(String, String)] = {
       if (singleton == null) {
          singleton = ??? // create the static RDD here
       }
       singleton
    }
}


dstream.transform { rdd =>
     // use the lazily instantiated singleton RDD
     rdd.join(StaticRDDLazySingleton.getOrCreate()) // ... further operations
}
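
For completeness, this lazy-singleton pattern is usually combined with
StreamingContext.getOrCreate so that the same code path works both on a
fresh start and on recovery from checkpoint. A rough sketch (the master
URL, app name, batch interval, and checkpoint directory below are
placeholders, not taken from this thread):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Driver {
  val checkpointDir = "hdfs:///tmp/checkpoint" // placeholder path

  def createContext(): StreamingContext = {
    val ssc = new StreamingContext("local[2]", "app", Seconds(10))
    ssc.checkpoint(checkpointDir)
    // Build the DStream graph here. Inside transform, always go through
    // the singleton so the static RDD is re-created lazily after a
    // restart instead of being restored from the checkpoint:
    //   dstream.transform(rdd => rdd.join(StaticRDDLazySingleton.getOrCreate()))
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On the first run createContext() is invoked; on restart the context
    // is rebuilt from the checkpoint and createContext() is skipped, which
    // is exactly why the static RDD must be re-created lazily.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```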


On Tue, Sep 8, 2015 at 3:21 AM, Akhil Das <akhil@sigmoidanalytics.com>
wrote:

> Try to add a filter to remove/replace the null elements within/before the
> map operation.
>
> Thanks
> Best Regards
>
> On Mon, Sep 7, 2015 at 3:34 PM, ZhengHanbin <hanbin_zheng@163.com> wrote:
>
>> Hi,
>>
>> I am using Spark Streaming to join every RDD of a DStream with a
>> standalone RDD to generate a new DStream, as follows:
>>
>> def joinWithBatchEvent(contentFeature: RDD[(String, String)],
>>                        batchEvent: DStream[((String, String), (Long, Double, Double))]) = {
>>   batchEvent.map(event => {
>>     (event._1._2, (event._1._1, event._2._1, event._2._2, event._2._3))
>>   }).transform(eventRDD => {
>>     eventRDD.leftOuterJoin(contentFeature).map(result =>
>>       (result._2._1._1, (result._1, result._2._1._2, result._2._1._3, result._2._1._4, result._2._2))
>>     )
>>   })
>> }
>>
>> It works well when it starts from a new StreamingContext.
>> But if the StreamingContext is restored from a checkpoint, the following
>> exception is thrown and the graph cannot be set up.
>> Do you know how to solve this problem? Thanks very much!
>>
>> 15/09/07 14:07:18 INFO spark.SparkContext: Starting job: saveAsTextFiles at CFBModel.scala:49
>> 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 12 (repartition at EventComponent.scala:64)
>> 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 17 (flatMap at CFBModel.scala:25)
>> 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 20 (map at ContentFeature.scala:100)
>> 15/09/07 14:07:18 WARN scheduler.DAGScheduler: Creating new stage failed due to exception - job: 1
>> java.lang.IllegalArgumentException: Flat hash tables cannot contain null elements.
>> at scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:390)
>> at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41)
>> at scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123)
>> at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
>> at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
>> at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
>> at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
>> at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
>> at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:336)
>> at org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:355)
>> at org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:317)
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:218)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:301)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:298)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298)
>> at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310)
>> at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244)
>> at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Job 1 failed: saveAsTextFiles at CFBModel.scala:49, took 0.016406 s
>> 15/09/07 14:07:18 ERROR scheduler.JobScheduler: Error running job streaming job 1441605900000 ms.0
>> java.lang.IllegalArgumentException: Flat hash tables cannot contain null elements.
>> [same stack trace as above]
>> Exception in thread "main" java.lang.IllegalArgumentException: Flat hash tables cannot contain null elements.
>> [same stack trace as above]
>>
>>
>> Thanks,
>> Hanbin Zheng
>>
>>
>
