spark-user mailing list archives

From Sebastian Piu <sebastian....@gmail.com>
Subject Re: Spark streaming not remembering previous state
Date Sat, 27 Feb 2016 20:42:28 GMT
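For the state to survive a restart, you need to create the streaming
context from the existing checkpoint (StreamingContext.getOrCreate)
instead of constructing a new one on every run.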

See sample here
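
A minimal sketch of that pattern, in case it helps. It is not the linked sample and not a drop-in fix for the program below: the object name, the checkpoint path, the localhost socket source, and the running count per line are all placeholders chosen for illustration. The point is only that every DStream is defined inside the factory function, and that the context is obtained through StreamingContext.getOrCreate.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoverySketch {
  // Assumption: placeholder path. It must be the same directory on every run.
  val checkpointDir = "hdfs:///user/spark/checkpoints_dir"

  // Invoked by getOrCreate only when no usable checkpoint exists in checkpointDir.
  // All stream definitions have to live in here so they can be rebuilt on recovery.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)

    // Assumption: placeholder source and a simple running count per input line.
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines
      .map(line => (line, 1L))
      .updateStateByKey[Long]((current: Seq[Long], prev: Option[Long]) =>
        Some(prev.getOrElse(0L) + current.sum))
    counts.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Restores the context and its state from the checkpoint if one exists,
    // otherwise calls createContext() to build a fresh one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

On the first run the factory function is called and the checkpoint directory is populated. On later runs the context, including the updateStateByKey state, is rebuilt from that directory and the factory function is skipped.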

On Sat, 27 Feb 2016, 20:28 Vinti Maheshwari, <vinti.uiet@gmail.com> wrote:

> Hi All,
>
> I wrote a Spark Streaming program with a stateful transformation.
> It seems like my Spark Streaming application is doing the computation
> correctly with checkpointing.
> But when I terminate my program and start it again, it does not read the
> previous checkpoint data and starts from the beginning. Is this the
> expected behaviour?
>
> Do I need to change anything in my program so that it will remember the
> previous data and resume the computation from there?
>
> Thanks in advance.
>
> For reference, my program:
>
>
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("HBaseStream")
>     val sc = new SparkContext(conf)
>     val ssc = new StreamingContext(sc, Seconds(5))
>     val inputStream = ssc.socketTextStream("ttsv-vccp-01.juniper.net", 9999)
>     ssc.checkpoint("hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir")
>     inputStream.print(1)
>     val parsedStream = inputStream
>       .map(line => {
>         val splitLines = line.split(",")
>         (splitLines(1), splitLines.slice(2, splitLines.length).map((_.trim.toLong)))
>       })
>     import breeze.linalg.{DenseVector => BDV}
>     import scala.util.Try
>
>     val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>       (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
>         prev.map(_ +: current).orElse(Some(current))
>           .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>       })
>     state.checkpoint(Duration(10000))
>     state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>
>     // Start the computation
>     ssc.start()
>     // Wait for the computation to terminate
>     ssc.awaitTermination()
>
>   }
> }
>
>
> Regards,
>
> ~Vinti
>
>
