spark-issues mailing list archives

From "Amit Sela (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-19067) mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)
Date Thu, 09 Feb 2017 09:00:50 GMT

    [ https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15859228#comment-15859228 ]

Amit Sela commented on SPARK-19067:
-----------------------------------

[~tdas] I would like to add another feature to consider, one that could give great flexibility to stateful operators in Spark.

We should be able to act on the state in response to more than just an update (new input) or a timeout - that way you could fire based on things like Watermarks, Timers, etc.

I'm currently working on this using {{updateStateByKey}} for the Beam Spark runner, but it could be made much more efficient if Spark implemented it at a lower level, and it could also give Spark Watermark-based triggers and Timers (the user sets a Timer; once the Timer fires, the applied function runs and does whatever it does, and the Timer can then be reset or dropped).
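
For illustration, a rough sketch of the {{updateStateByKey}} approach (the type names and the 30s Timer are made up). The point is that the update function is invoked for every tracked key on every batch, even when there's no new input for it, which is what makes the Timer check possible:
{code}
// Sketch only: SessionState and the 30s deadline are made-up names.
case class SessionState(events: Seq[String], timerDeadlineMs: Long)

def update(newEvents: Seq[String], state: Option[SessionState]): Option[SessionState] = {
  val now = System.currentTimeMillis()
  state match {
    // Timer expired and nothing new arrived: "fire" by dropping the state;
    // the result would be picked off the state stream downstream.
    case Some(s) if newEvents.isEmpty && now >= s.timerDeadlineMs => None
    // New input: accumulate, keep the existing Timer.
    case Some(s) => Some(s.copy(events = s.events ++ newEvents))
    // First input for this key: set a Timer 30 seconds out.
    case None if newEvents.nonEmpty => Some(SessionState(newEvents, now + 30000L))
    case None => None
  }
}

// assuming pairs: DStream[(String, String)]
// val sessions = pairs.updateStateByKey(update _)
{code}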

The simplest thing would be to make this feature able to scan through the entire state as well (not just when there's new input), and since that could be expensive, it could happen every X batches (configurable) - doesn't this happen anyway today, on clean-up or checkpointing?

On top of this, future development of Watermark-based triggers and Timers would be easier (see the sketch after this list):
# Scanning through the state to fire if reached Watermark.
# Scanning through the state to check if user-defined Timers are ready for action.
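
To make this concrete, a Timer-aware variant of the proposed function could look something like this (purely illustrative - none of these names exist in Spark, and {{KeyedState}} is the trait from the proposal quoted below):
{code}
// Purely illustrative, not an existing Spark API: a Timer handle passed
// to the user function next to the state, so the same function can be
// invoked on new input, on Watermark advance, or on Timer expiry.
trait Timers {
  def set(timestampMs: Long): Unit   // (re)arm a Timer for this key
  def delete(): Unit                 // drop a pending Timer
}

// `values` would be empty when the call is triggered by a Timer or the
// Watermark rather than by new input.
def statefulFunc[K, V, S, U](key: K,
                             values: Iterator[V],
                             state: KeyedState[S],
                             timers: Timers): Iterator[U] = Iterator.empty
{code}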

WDYT?

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19067
>                 URL: https://issues.apache.org/jira/browse/SPARK-19067
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>            Reporter: Michael Armbrust
>            Assignee: Tathagata Das
>            Priority: Critical
>
> Right now the only way to do stateful operations is with Aggregator or UDAF. However, this does not give users control of emission or expiration of state, making it hard to implement things like sessionization. We should add a more general construct (probably similar to {{DStream.mapWithState}}) to structured streaming. Here is the design.
> *Requirements*
> - Users should be able to specify a function that can do the following:
> - Access the input row corresponding to a key
> - Access the previous state corresponding to a key
> - Optionally, update or remove the state
> - Output any number of new rows (or none at all)
> *Proposed API*
> {code}
> // ------------ New methods on KeyValueGroupedDataset ------------
> class KeyValueGroupedDataset[K, V] {
>   // Scala friendly
>   def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => U)
>   def flatMapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => Iterator[U])
>   // Java friendly
>   def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
>   def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
> }
> // ------------------- New Java-friendly function classes ------------------- 
> public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   R call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
> }
> public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   Iterator<R> call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
> }
> // ---------------------- Wrapper class for state data ---------------------- 
> trait KeyedState[S] {
>   def exists(): Boolean
>   def get(): S                  // throws Exception if state does not exist
>   def getOption(): Option[S]
>   def update(newState: S): Unit
>   def remove(): Unit            // exists() will be false after this
> }
> {code}
> *Key Semantics of the KeyedState class*
> - The state can be null.
> - If state.remove() is called, then state.exists() will return false, and getOption will return None.
> - After state.update(newState) is called, state.exists() will return true, and getOption will return Some(...).
> - None of the operations are thread-safe. This is to avoid memory barriers.
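> As a minimal illustration of this contract (hypothetical sequence, not part of the design itself):
> {code}
> // Illustrative only: the call/observe sequence the semantics above imply.
> val state: KeyedState[Long] = ...  // handed to the user function per key
> state.exists()                     // false initially
> state.getOption()                  // None
> state.update(42L)
> state.exists()                     // true
> state.get()                        // 42L
> state.remove()
> state.exists()                     // false again; get() would now throw
> {code}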
> *Usage*
> {code}
> val stateFunc = (word: String, words: Iterator[String], runningCount: KeyedState[Long]) => {
>   val newCount = words.size + runningCount.getOption.getOrElse(0L)
>   runningCount.update(newCount)
>   (word, newCount)
> }
> dataset                                                  // type is Dataset[String]
>   .groupByKey[String](w => w)                            // generates KeyValueGroupedDataset[String, String]
>   .mapGroupsWithState[Long, (String, Long)](stateFunc)   // returns Dataset[(String, Long)]
> {code}
> *Future Directions*
> - Timeout-based state expiration (for keys that have not received data for a while)
> - General expression-based expiration


