spark-user mailing list archives

From 李剑 <hustlij...@gmail.com>
Subject mapWithState handle timeout
Date Sat, 06 Aug 2016 08:00:24 GMT
I got an error:
Cannot update the state that is timing out

This happens because I set a timeout:
 val newStateDstream = newActionDstream.mapWithState(
   StateSpec.function(mappingFunc).timeout(Seconds(3600)).initialState(initialRDD))

In the Spark source code,
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/State.scala
the scaladoc shows this mapping function:

/**
 * :: Experimental ::
 * Abstract class for getting and updating the state in mapping function used in the `mapWithState`
 * operation of a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair DStream]] (Scala)
 * or a [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] (Java).
 *
 * Scala example of using `State`:
 * {{{
 *    // A mapping function that maintains an integer state and returns a String
 *    def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
 *      // Check if state exists
 *      if (state.exists) {
 *        val existingState = state.get  // Get the existing state
 *        val shouldRemove = ...         // Decide whether to remove the state
 *        if (shouldRemove) {
 *          state.remove()     // Remove the state
 *        } else {
 *          val newState = ...
 *          state.update(newState)    // Set the new state
 *        }
 *      } else {
 *        val initialState = ...
 *        state.update(initialState)  // Set the initial state
 *      }
 *      ... // return something
 *    }
 *
 * }}}


But `update` throws an exception in the batch in which the state times out:

 /**
   * Update the state with a new value.
   *
   * State cannot be updated if it has been already removed (that is, `remove()` has already been
   * called) or it is going to be removed due to timeout (that is, `isTimingOut()` is `true`).
   *
   * @throws java.lang.IllegalArgumentException If the state has already been removed, or is
   *                                            going to be removed
   */
  def update(newState: S): Unit
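
One way to respect this contract is to check `state.isTimingOut()` before calling `update()`. As I understand the `StateSpec.timeout` scaladoc, the mapping function is called one final time for a key that has received no data for the timeout duration, with `isTimingOut()` set to `true`, so that call should carry no new value for the key. The sketch below assumes those semantics. `FakeState` is a hypothetical stand-in (not Spark's class) that mimics only the documented rule that `update()`/`remove()` throw `IllegalArgumentException` while timing out; in a real job the parameter type would be `org.apache.spark.streaming.State[Int]`.

```scala
// Hypothetical stand-in for org.apache.spark.streaming.State[S] (NOT the Spark class),
// mimicking only the documented contract: update()/remove() throw while timing out.
final class FakeState[S](initial: Option[S], timingOut: Boolean = false) {
  private var value: Option[S] = initial
  private var removed = false
  def exists(): Boolean = value.isDefined
  def get(): S = value.getOrElse(throw new NoSuchElementException("State is not set"))
  def isTimingOut(): Boolean = timingOut
  def update(newState: S): Unit = {
    require(!removed, "Cannot update the state after it has been removed")
    require(!timingOut, "Cannot update the state that is timing out")
    value = Some(newState)
  }
  def remove(): Unit = {
    require(!timingOut, "Cannot remove the state that is timing out")
    removed = true
    value = None
  }
}

object TimeoutSafeMapping {
  // Same shape as a Spark mapping function: (key, new value, state) => output.
  // State is a running sum per key; output is (key, sum, wasTimingOut).
  def mappingFunc(key: String, value: Option[Int], state: FakeState[Int]): (String, Int, Boolean) = {
    if (state.isTimingOut()) {
      // Final call for an idle key: the state is dropped after this call, so do not
      // call update() or remove(); just emit the last known sum (e.g. to flush it downstream).
      (key, state.get(), true)
    } else {
      val sum = value.getOrElse(0) + (if (state.exists()) state.get() else 0)
      state.update(sum)
      (key, sum, false)
    }
  }
}
```

With Spark's real `State[Int]` in place of `FakeState[Int]`, the function would be wired up the same way as above, via `StateSpec.function(mappingFunc _).timeout(Seconds(3600))`.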


How can I handle the timeout in mappingFunc without losing the current batch's data?

-- 
http://www.cnblogs.com/hustlijian/
https://github.com/hustlijian
