spark-user mailing list archives

From Sebastian Piu <sebastian....@gmail.com>
Subject Re: Spark streaming not remembering previous state
Date Sat, 27 Feb 2016 20:42:39 GMT
Here:
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
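
Below is a minimal sketch of how the pattern from that example might be applied to the program quoted below. It assumes the same HDFS checkpoint directory and socket source. The object name `RecoverableHBaseStream` is hypothetical, and `state.print()` stands in for the original `Blaher.blah` output action. The key change is that every DStream is defined inside `createContext()`, and `main` obtains the context through `StreamingContext.getOrCreate`. That call rebuilds the context, including the `updateStateByKey` state, from the checkpoint when one exists.

```scala
import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import scala.util.Try

// Hypothetical object name; a sketch, not the poster's actual code.
object RecoverableHBaseStream {
  // Same HDFS directory the original program already checkpoints to.
  val checkpointDir =
    "hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir"

  // Builds the whole DStream graph. getOrCreate only calls this when
  // checkpointDir holds no checkpoint yet (e.g. on the very first run).
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)

    // Parse each line into (key, Array(values...)), using field 1 as the key.
    val parsed: DStream[(String, Array[Long])] = ssc
      .socketTextStream("ttsv-vccp-01.juniper.net", 9999)
      .map { line =>
        val fields = line.split(",")
        (fields(1), fields.drop(2).map(_.trim.toLong))
      }

    // Unchanged update function: element-wise sum of the previous state
    // and all arrays that arrived for the key in this batch.
    val state: DStream[(String, Array[Long])] = parsed.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>
        prev.map(_ +: current).orElse(Some(current))
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption))
    state.checkpoint(Seconds(10))

    // Placeholder output action; the original calls its own Blaher.blah here.
    state.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Restores the context from checkpointDir if a checkpoint exists;
    // otherwise calls createContext() to build a fresh one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

One caveat is worth checking against the Spark Streaming programming guide. On recovery, the DStream graph comes from the checkpoint rather than from `createContext()`. As a result, edits to that function are not picked up, and a checkpoint written by an older build of the job may fail to load.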

On Sat, 27 Feb 2016, 20:42 Sebastian Piu, <sebastian.piu@gmail.com> wrote:

> You need to create the streaming context from the existing checkpoint
> (StreamingContext.getOrCreate) for it to recover the previous state.
>
> See sample here
>
> On Sat, 27 Feb 2016, 20:28 Vinti Maheshwari, <vinti.uiet@gmail.com> wrote:
>
>> Hi All,
>>
>> I wrote a Spark Streaming program with a stateful transformation.
>> My Spark Streaming application seems to compute correctly with
>> checkpointing enabled.
>> But when I terminate my program and start it again, it does not read the
>> previous checkpoint data and starts from the beginning. Is this the
>> expected behaviour?
>>
>> Do I need to change anything in my program so that it remembers the
>> previous data and resumes computation from there?
>>
>> Thanks in advance.
>>
>> For reference, my program:
>>
>>
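>> import breeze.linalg.{DenseVector => BDV}
>> import org.apache.spark.{SparkConf, SparkContext}
>> import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
>> import org.apache.spark.streaming.dstream.DStream
>> import scala.util.Try
>>
>> object HBaseStream { // object name assumed
>>   def main(args: Array[String]): Unit = {
>>     val conf = new SparkConf().setAppName("HBaseStream")
>>     val sc = new SparkContext(conf)
>>     // A new context is built on every run, so the checkpoint directory
>>     // below is written to but never read back after a restart.
>>     val ssc = new StreamingContext(sc, Seconds(5))
>>     val inputStream = ssc.socketTextStream("ttsv-vccp-01.juniper.net", 9999)
>>     ssc.checkpoint("hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir")
>>     inputStream.print(1)
>>     // Parse each line into (key, Array(values...)), using field 1 as the key
>>     val parsedStream = inputStream
>>       .map(line => {
>>         val splitLines = line.split(",")
>>         (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
>>       })
>>
>>     // Running element-wise sum per key; a failed sum yields None,
>>     // which drops the key from the state
>>     val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>>       (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
>>         prev.map(_ +: current).orElse(Some(current))
>>           .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>>       })
>>     state.checkpoint(Duration(10000))
>>     state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) // Blaher: the poster's own helper, not shown
>>
>>     // Start the computation
>>     ssc.start()
>>     // Wait for the computation to terminate
>>     ssc.awaitTermination()
>>
>>   }
>> }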
>>
>>
>> Regards,
>>
>> ~Vinti
>>
>>
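
The `updateStateByKey` function in the quoted program relies on Breeze only to add arrays element-wise. The Breeze-free version below can be exercised without a cluster. It is a swapped-in plain-Scala technique, not the poster's code, and the names `StateUpdate` and `sumState` are hypothetical. It assumes that arrays of different lengths should count as a failure and return `None`, mirroring the `Try(...).toOption` in the original. As with any `updateStateByKey` function, returning `None` removes the key from the state.

```scala
object StateUpdate {
  // Element-wise sum of the previous state (if any) and this batch's arrays.
  // Returns None when there is nothing to sum or the array lengths differ.
  def sumState(current: Seq[Array[Long]], prev: Option[Array[Long]]): Option[Array[Long]] = {
    val all = prev.toSeq ++ current
    if (all.isEmpty || all.exists(_.length != all.head.length)) None
    else Some(all.reduce((a, b) => a.zip(b).map { case (x, y) => x + y }))
  }

  def main(args: Array[String]): Unit = {
    // Previous state (100, 200) plus two new arrays in this batch
    val out = sumState(Seq(Array(1L, 2L), Array(10L, 20L)), Some(Array(100L, 200L)))
    println(out.map(_.mkString(",")).getOrElse("none")) // prints "111,222"
  }
}
```

In a job it could be passed as `parsedStream.updateStateByKey[Array[Long]](StateUpdate.sumState _)`.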
