spark-dev mailing list archives

From Michael Armbrust <>
Subject Re: StateStoreSaveExec / StateStoreRestoreExec
Date Tue, 03 Jan 2017 23:36:44 GMT
I think we should add something similar to mapWithState in 2.2.  It would
be great if you could add the description of your problem to this ticket:
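For context, the core of a mapWithState-style operator is a user function that, for each keyed event, sees the previous state for that key and returns an updated state plus an output. Below is a minimal, Spark-free sketch of that shape in plain Scala; the names (`mapWithState`, the tuple encoding) are illustrative and not any real Spark API:

```scala
// Hypothetical sketch of the shape a mapWithState-style API reduces to,
// with no Spark dependency. For each (key, value) event, the user function
// receives the previous state for that key (if any) and returns the new
// state together with one output record.
object MapWithStateSketch {
  def mapWithState[K, V, S, O](
      events: Seq[(K, V)],
      f: (K, V, Option[S]) => (S, O)): (Map[K, S], Seq[O]) =
    events.foldLeft((Map.empty[K, S], Vector.empty[O])) {
      case ((state, out), (k, v)) =>
        val (newState, o) = f(k, v, state.get(k))
        (state.updated(k, newState), out :+ o)
    }
}
```

For example, a running count per key is `f = (k, v, s) => { val n = s.getOrElse(0) + v; (n, n) }`; the real engine would additionally have to checkpoint the state map between micro-batches, which is exactly what the StateStore machinery provides.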

On Mon, Jan 2, 2017 at 2:05 PM, Jeremy Smith <> wrote:

> I have a question about state tracking in Structured Streaming.
> First let me briefly explain my use case: Given a mutable data source
> (e.g. an RDBMS) from which we assume we can retrieve a set of newly created
> row versions (being a row that was created or updated between two given
> `Offset`s, whatever those are), we can create a Structured Streaming
> `Source` which retrieves the new row versions. Further assuming that every
> logical row has some primary key, then as long as we can track the current
> offset for each primary key, we can differentiate between new and updated
> rows. Then, when a row is updated, we can record that the previous version
> of that row expired at some particular time. That's essentially what I'm
> trying to do. This would effectively give you an "event-sourcing" type of
> historical/immutable log of changes out of a mutable data source.
> I noticed that in Spark 2.0.1 there was a concept of a StateStore, which
> seemed like it would allow me to do exactly the tracking that I needed, so
> I decided to try and use that built-in functionality rather than some
> external key/value store for storing the current "version number" of each
> primary key. There were a lot of hard-coded hoops I had to jump through,
> but I eventually made it work by implementing some custom LogicalPlans and
> SparkPlans around StateStore[Save/Restore]Exec.
> Now, in Spark 2.1.0 it seems to have gotten even further away from what I
> was using it for - the keyExpressions of StateStoreSaveExec must include a
> timestamp column, which means that those expressions are not really keys
> (at least not for a logical row). So it appears I can't use it that way
> anymore (I can't blame Spark for this, as I knew what I was getting into
> when leveraging developer APIs). There are also several hard-coded checks
> which now make it clear that StateStore functionality is only to be used
> for streaming aggregates, which is not really what I'm doing.
> My question is - is there a good way to accomplish the above use case
> within Structured Streaming? Or is this the wrong use case for the state
> tracking functionality (which increasingly seems to be targeted toward
> aggregates only)? Is there a plan for any kind of generalized
> `mapWithState`-type functionality for Structured Streaming, or should I
> just give up on that and use an external key/value store for my state
> tracking?
> Thanks,
> Jeremy
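
The bookkeeping Jeremy describes can be reduced to a small pure function: keep the latest offset per primary key, classify each incoming row version as an insert or an update, and on update record that the previous version expired. A minimal, Spark-free sketch (all names illustrative; a real implementation would persist the state map in a StateStore or an external key/value store):

```scala
// Sketch of per-primary-key version tracking for a change-data-capture
// style source. State maps each primary key to the offset of its current
// row version; each processed batch returns the new state plus a log of
// insert/update changes, where an update carries the expired prior offset.
object RowVersionTracker {
  sealed trait Change
  case class Inserted(key: String, offset: Long) extends Change
  case class Updated(key: String, offset: Long, expiredOffset: Long) extends Change

  def process(state: Map[String, Long],
              rows: Seq[(String, Long)]): (Map[String, Long], Seq[Change]) =
    rows.foldLeft((state, Vector.empty[Change])) {
      case ((st, out), (key, offset)) =>
        st.get(key) match {
          case None =>
            (st.updated(key, offset), out :+ Inserted(key, offset))
          case Some(prev) =>
            (st.updated(key, offset), out :+ Updated(key, offset, prev))
        }
    }
}
```

Replaying the emitted `Change` records in order yields the historical, event-sourced log of the mutable table; the hard part in Structured Streaming is where to durably keep `state` between batches, which is the question above.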
