spark-user mailing list archives

From Vinti Maheshwari <vinti.u...@gmail.com>
Subject Re: Spark streaming not remembering previous state
Date Sat, 27 Feb 2016 21:38:03 GMT
Thanks much Amit, Sebastian. It worked.
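
For anyone who finds this thread later, here is a minimal sketch of the getOrCreate pattern suggested in the reply quoted below. It is not the exact code from my program. The checkpoint path, the "localhost" socket host, and the names checkpointDir, createContext and updateTotals are placeholders chosen for illustration, and the update function is a plain Scala element-wise sum that pads shorter arrays with zeros, swapped in for the breeze-based version in the original post.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HBaseStream {
  // Placeholder checkpoint location (assumption); use your own HDFS path.
  val checkpointDir = "hdfs:///user/spark/checkpoints_dir"

  // Adds the arrays from the new batch onto the previous state, element by element.
  // Shorter arrays are padded with zeros (a simplification of the original logic).
  def updateTotals(current: Seq[Array[Long]],
                   prev: Option[Array[Long]]): Option[Array[Long]] = {
    val all = prev.toSeq ++ current
    if (all.isEmpty) None
    else Some(all.reduce((a, b) => a.zipAll(b, 0L, 0L).map { case (x, y) => x + y }))
  }

  // Builds a brand-new context. All DStream setup has to live in here,
  // because this function is skipped when a checkpoint is found.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)

    // Placeholder host and port (assumption).
    val parsed = ssc.socketTextStream("localhost", 9999).map { line =>
      val fields = line.split(",")
      (fields(1), fields.drop(2).map(_.trim.toLong))
    }
    val state = parsed.updateStateByKey(updateTotals _)
    state.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover the context from the checkpoint if one exists,
    // otherwise call createContext to build a fresh one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The point of the pattern is that the first run calls createContext and writes the checkpoint, while a restart rebuilds the context and its state from the checkpoint directory instead of starting over.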

Regards,
~Vinti

On Sat, Feb 27, 2016 at 12:44 PM, Amit Assudani <aassudani@impetus.com>
wrote:

> Your context is not being recreated from the checkpoint. Use
> StreamingContext.getOrCreate.
>
> From: Vinti Maheshwari <vinti.uiet@gmail.com>
> Date: Saturday, February 27, 2016 at 3:28 PM
> To: user <user@spark.apache.org>
> Subject: Spark streaming not remembering previous state
>
> Hi All,
>
> I wrote a Spark Streaming program with a stateful transformation.
> The application seems to compute correctly with checkpointing enabled.
> But when I terminate the program and start it again, it does not read the
> previous checkpoint data and starts from the beginning. Is this the
> expected behaviour?
>
> Do I need to change anything in my program so that it remembers the
> previous data and resumes computation from there?
>
> Thanks in advance.
>
> For reference my program:
>
>
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("HBaseStream")
>     val sc = new SparkContext(conf)
>     val ssc = new StreamingContext(sc, Seconds(5))
>     val inputStream = ssc.socketTextStream("ttsv-vccp-01.juniper.net", 9999)
>     ssc.checkpoint("hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir")
>     inputStream.print(1)
>     val parsedStream = inputStream
>       .map(line => {
>         val splitLines = line.split(",")
>         (splitLines(1), splitLines.slice(2, splitLines.length).map((_.trim.toLong)))
>       })
>     import breeze.linalg.{DenseVector => BDV}
>     import scala.util.Try
>
>     val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>       (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
>         prev.map(_ +: current).orElse(Some(current))
>           .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>       })
>     state.checkpoint(Duration(10000))
>     state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>
>     // Start the computation
>     ssc.start()
>     // Wait for the computation to terminate
>     ssc.awaitTermination()
>
>   }
> }
>
>
> Regards,
>
> ~Vinti
