The per-key state S is kept in the memory. It has to be of a type that can be encoded by Datasets. All you have to do is update S every time the function is called, and the engine takes care of serializing/checkpointing the state value, and retrieving the correct version of the value when restarting from failures. So you explicitly don't have to "store" the state anywhere, the engine takes care of it under the hood. Internally, there is an interface called StateStore, which defines a component who is actually responsible for checkpointing the values, etc. And there is a single implementation of the store that keeps the values in a hashmap and writes all changes to the values to a HDFS-API-compatible fault-tolerant filesystem for checkpointing. With this, by default, you really don't have to worry about externalizing it and you don't have overload any thing in GroupState. You just use it as the example shows.

It's important to note that all the state of all the keys is distributed over the executors. So each executor will have in its memory, a fraction of the all the train state. Depending on the number of trains, and the amount of data in the state, you will have to size the cluster and the workers accordingly. If you keep a lot of state for each train, then your overall memory requirements are going to increase. So you have to be judicious about how much data to keep as state data for each key.

Regarding aggregation vs mapGroupsWithState, it's a trade-off between efficiency and flexibility. With aggregation, you can do sliding window of "24 hours" sliding every "1 hour", which will give max in "last 24 hours" updated every "1 hour". If you are okay with this approximation, then this is easiest to implement (don't forget setting watermarks) and most efficient. If you really want something more precise than that, then mapGroupsWithState is the ultimate flexible tool. However, you have to do bookkeeping of "last 24 hours" and calculate the max yourself. :)

Hope this helps.

On Wed, Aug 30, 2017 at 10:58 AM, kant kodali <> wrote:
I think I understand groupByKey/mapGroupsWithState and I am still trying to wrap my head around GroupState<S>. so, I believe I have a naive questions to ask on GroupState<S>.

If I were to represent a state that has history of events (say 24 hours) and say the number of events can be big for a given 24 hour period. where do I store the state S? An external store like Kafka or a Database or a Distributed File system ? I wonder if I can represent the state S using a DataSet that represents the history of events? GroupState also has .exists() and  .get() and if I am not wrong I should override these methods right so comparisons and retrieval from external store can work?


On Wed, Aug 30, 2017 at 1:39 AM, kant kodali <> wrote:
Hi TD,

Thanks for the explanation and for the clear pseudo code and an example!

mapGroupsWithState is cool and looks very flexible however I have few concerns and questions. For example

Say I store TrainHistory as max heap from the Java Collections library and I keep adding to to this heap for 24 hours and at some point I will run out of Java heap space right? Do I need to store TrainHistory as a DataSet or DataFrame instead of in memory max heap object from Java Collections library?

I wonder between Nested query vs  groupByKey/mapGroupsWithState which approach is more efficient to solve this particular problem ? 


On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das <> wrote:
Aah, I might have misinterpreted. The groupBy + window solution would give the max time for each train over 24 hours (non-overlapping window) of event data (timestamped by activity_timestamp). So the output would be like. 

Train     Dest   Window(activity_timestamp)    max(Time)
1         HK     Aug28-00:00 to Aug29-00:00    10:00    <- updating currently through aug29
1         HK    Aug27-00:00 to Aug28-00:00     09:00    <- not updating as no new updates coming in with activity_timestamp in this range. 

The drawback of this approach is that as soon as Aug28 starts, you have wait for new event about a train to get a new max(time). You may rather want a rolling 24 hour period, that is, the max time known over events in the last 24 hours.
Then maintaining our own custom state using mapGroupsWithState/flatMapGroupsWithState() is the best and most flexible option.
It is available in Spark 2.2 in Scala, Java.

Here is an example that tracks sessions based on events. 

You will have to create a custom per-train state which keeps track of last 24 hours of trains history, and use that state to calculate the max time for each train.

def updateHistoryAndGetMax(train: String, events: Iterator[TrainEvents], state: GroupState[TrainHistory]): Long = {
    // for every event, update history (i.e. last 24 hours of events) and return the max time from the history

trainTimesDataset     // Dataset[TrainEvents]

Hope this helps.

On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <> wrote:
Hey TD,

If I understood the question correctly, your solution wouldn't return the exact solution, since it also groups by on destination. I would say the easiest solution would be to use flatMapGroupsWithState, where you:

and keep in state the row with the maximum time.

On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <> wrote:
Yes. And in that case, if you just care about only the last few days of max, then you should set watermark on the timestamp column. 

  .withWatermark("activity_timestamp", "5 days")
  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train", "dest")

Any counts which are more than 5 days old will be dropped from the streaming state. 

On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <> wrote:

Thanks for the response. Since this is a streaming based query and in my case I need to hold state for 24 hours which I forgot to mention in my previous email. can I do ? 

 trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train", "dest").max("time")

On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <> wrote:
Say, trainTimesDataset is the streaming Dataset of schema [train: Int, dest: String, time: Timestamp] 

Scala: trainTimesDataset.groupBy("train", "dest").max("time")

SQL: "select train, dest, max(time) from trainTimesView group by train, dest"    // after calling trainTimesData.createOrReplaceTempView(trainTimesView)

On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <> wrote:
Hi All,

I am wondering what is the easiest and concise way to express the computation below in Spark Structured streaming given that it supports both imperative and declarative styles? 
I am just trying to select rows that has max timestamp for each train? Instead of doing some sort of nested queries like we normally do in any relational database I am trying to see if I can leverage both imperative and declarative at the same time. If nested queries or join are not required then I would like to see how this can be possible? I am using spark 2.1.1.

Train    Dest      Time
1        HK        10:00
1        SH        12:00
1        SZ        14:00
2        HK        13:00
2        SH        09:00
2        SZ        07:00

The desired result should be:

Train    Dest      Time
1        SZ        14:00
2        HK        13:00