I go an error: 
Cannot update the state that is timing out

Because I set the timeout: 
 val newStateDstream = newActionDstream.mapWithState(StateSpec.function(mappingFunc).timeout(Seconds(3600)).initialState(initialRDD))

In the spark code : https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/State.scala
the mappingFunction show: 

/** 
 * :: Experimental :: 
 * Abstract class for getting and updating the state in mapping function used in the `mapWithState` 
 * operation of a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair DStream]] (Scala) 
 * or a [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] (Java). 
 * 
 * Scala example of using `State`: 
 * {{{ 
 *    // A mapping function that maintains an integer state and returns a String 
 *    def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = { 
 *      // Check if state exists 
 *      if (state.exists) { 
 *        val existingState = state.get  // Get the existing state 
 *        val shouldRemove = ...         // Decide whether to remove the state 
 *        if (shouldRemove) { 
 *          state.remove()     // Remove the state 
 *        } else { 
 *          val newState = ... 
 *          state.update(newState)    // Set the new state 
 *        } 
 *      } else { 
 *        val initialState = ... 
 *        state.update(initialState)  // Set the initial state 
 *      } 
 *      ... // return something 
 *    } 
 * 
 * }}} 


update will throw exception in the timeout batch: 

 /** 
   * Update the state with a new value. 
   * 
   * State cannot be updated if it has been already removed (that is, `remove()` has already been 
   * called) or it is going to be removed due to timeout (that is, `isTimingOut()` is `true`). 
   * 
   * @throws java.lang.IllegalArgumentException If the state has already been removed, or is 
   *                                            going to be removed 
   */ 
  def update(newState: S): Unit 


I wonder how to handle timeout in mappingFunc without lost current batch data?

-- 
http://www.cnblogs.com/hustlijian/
https://github.com/hustlijian