spark-user mailing list archives

From Amit Assudani <aassud...@impetus.com>
Subject Re: Spark streaming not remembering previous state
Date Sat, 27 Feb 2016 20:44:19 GMT
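Your context is not being created from the checkpoint. Use StreamingContext.getOrCreate, which
rebuilds the context from the checkpoint data when it exists and creates a new one otherwise.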
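
A minimal sketch of what that could look like, assuming the stream setup is moved into a
factory function that StreamingContext.getOrCreate calls only when no checkpoint is found.
The names HBaseStreamRecoverable, createContext, updateTotals and checkpointDir, the
checkpoint path, and the localhost:9999 source are placeholders for illustration, not taken
from your program. The update function is a plain-Scala stand-in for your Breeze-based one
and is not an exact equivalent.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HBaseStreamRecoverable {
  // Placeholder path (assumption); substitute the real checkpoint directory.
  val checkpointDir = "hdfs:///user/spark/checkpoints_dir"

  // Element-wise running sum per key, written without Breeze.
  // Note: on a length mismatch, zip truncates to the shorter array,
  // whereas the Breeze + Try version would drop the key's state.
  def updateTotals(current: Seq[Array[Long]],
                   prev: Option[Array[Long]]): Option[Array[Long]] = {
    val all = prev.toSeq ++ current
    if (all.isEmpty) None
    else Some(all.reduce((a, b) => a.zip(b).map { case (x, y) => x + y }))
  }

  // Builds a brand-new context. All DStream setup has to happen in here,
  // because this function is skipped when a checkpoint is recovered.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)

    val parsed = ssc.socketTextStream("localhost", 9999) // placeholder source
      .map { line =>
        val fields = line.split(",")
        (fields(1), fields.drop(2).map(_.trim.toLong))
      }

    val state = parsed.updateStateByKey[Array[Long]](updateTotals _)
    state.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover from the checkpoint if one exists, otherwise call createContext().
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this shape, the first run creates the context and writes checkpoints, and a restart
pointed at the same directory should pick up the saved state instead of starting over.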

From: Vinti Maheshwari <vinti.uiet@gmail.com>
Date: Saturday, February 27, 2016 at 3:28 PM
To: user <user@spark.apache.org>
Subject: Spark streaming not remembering previous state

Hi All,

I wrote a Spark Streaming program with a stateful transformation.
With checkpointing enabled, the application appears to compute its results correctly.
But when I terminate the program and start it again, it does not read the previous checkpoint
data and starts from the beginning. Is this the expected behaviour?

Do I need to change anything in my program so that it remembers the previous data and
resumes the computation from there?

Thanks in advance.

For reference my program:

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val inputStream = ssc.socketTextStream("ttsv-vccp-01.juniper.net", 9999)
    ssc.checkpoint("hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir")
    inputStream.print(1)
    val parsedStream = inputStream
      .map(line => {
        val splitLines = line.split(",")
        (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
      })
    import breeze.linalg.{DenseVector => BDV}
    import scala.util.Try

    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
        prev.map(_ +: current).orElse(Some(current))
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
      })
    state.checkpoint(Duration(10000))
    state.foreachRDD(rdd => rdd.foreach(Blaher.blah))

    // Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()

  }
}


Regards,

~Vinti
