spark-user mailing list archives

From Tathagata Das <t...@databricks.com>
Subject Re: spark streaming job fails to restart after checkpointing due to DStream initialization errors
Date Sat, 27 Jun 2015 07:41:21 GMT
Could you also provide the code where you set up the Kafka dstream? I don't
see it in the snippet.
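
For context, with spark-streaming-kafka 1.3.x a Kafka DStream would
typically be created inside the same factory function that is passed to
getOrCreate. A minimal sketch only (the broker list, topic name, and the
choice of the direct stream are placeholder assumptions, not the poster's
actual setup):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka.KafkaUtils

    def createKafkaStream(ssc: StreamingContext) = {
      // Placeholder broker and topic; substitute real values.
      val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
      val topics = Set("events")
      // Direct (receiver-less) stream, available in spark-streaming-kafka 1.3.x.
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics)
    }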

On Fri, Jun 26, 2015 at 2:45 PM, Ashish Nigam <ashnigamtech@gmail.com>
wrote:

> Here's the code -
>
> def createStreamingContext(checkpointDirectory: String): StreamingContext = {
>   val conf = new SparkConf().setAppName("KafkaConsumer")
>   conf.set("spark.eventLog.enabled", "false")
>   logger.info("Going to init spark context")
>   conf.getOption("spark.master") match {
>     case Some(master) => println(master)
>     case None => conf.setMaster("local[*]")
>   }
>   val sc = new SparkContext(conf)
>   // Create a StreamingContext with a 5 second batch size
>   val ssc = new StreamingContext(sc, Seconds(5))
>   ssc.checkpoint(checkpointDirectory)
>   ssc
> }
>
> And here's how it is being called -
>
> val ssc = StreamingContext.getOrCreate(checkpointDir, () => {
>   createStreamingContext(checkpointDir)
> })
>
>
> On Fri, Jun 26, 2015 at 2:05 PM, Cody Koeninger <cody@koeninger.org>
> wrote:
>
>> Make sure you're following the docs regarding setting up a streaming
>> checkpoint.
>>
>> Post your code if you can't get it figured out.
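>>
>> The pattern in the docs, roughly: every DStream and every output
>> operation has to be defined inside the factory function passed to
>> getOrCreate, so the whole graph can be rebuilt from the checkpoint on
>> restart. A minimal sketch (app name, checkpoint path, and the socket
>> source are illustrative placeholders, not the actual job):
>>
>>   import org.apache.spark.SparkConf
>>   import org.apache.spark.streaming.{Seconds, StreamingContext}
>>
>>   val checkpointDirectory = "hdfs:///tmp/checkpoint"
>>
>>   def functionToCreateContext(): StreamingContext = {
>>     val conf = new SparkConf().setAppName("CheckpointExample")
>>     val ssc = new StreamingContext(conf, Seconds(5))
>>     // Define all inputs, transformations, and outputs here, inside
>>     // the factory, so they are recoverable from the checkpoint.
>>     val lines = ssc.socketTextStream("localhost", 9999)
>>     lines.count().print()
>>     ssc.checkpoint(checkpointDirectory)
>>     ssc
>>   }
>>
>>   val context = StreamingContext.getOrCreate(checkpointDirectory,
>>     functionToCreateContext _)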
>>
>> On Fri, Jun 26, 2015 at 3:45 PM, Ashish Nigam <ashnigamtech@gmail.com>
>> wrote:
>>
>>> I bring up a Spark Streaming job that uses Kafka as the input source,
>>> with no data to process, and then shut it down and bring it back up
>>> again. This time the job does not start because it complains that the
>>> DStream has not been initialized.
>>>
>>> 15/06/26 01:10:44 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.streaming.dstream.UnionDStream@6135e5d8 has not been initialized
>>>
>>> org.apache.spark.SparkException: org.apache.spark.streaming.dstream.UnionDStream@6135e5d8 has not been initialized
>>>         at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>         at scala.Option.orElse(Option.scala:257)
>>>         at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>>         at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>>         at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
>>>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>         at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
>>>         at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:90)
>>>         at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
>>>         at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
>>>         ..........
>>>
>>> I am using Spark 1.3.1 and spark-streaming-kafka 1.3.1.
>>>
>>> Any idea how to resolve this issue?
>>>
>>> Thanks
>>> Ashish
>>>
>>>
>>
>
