Aah, I might have misinterpreted. The groupBy + window solution would give the max time for each train over non-overlapping 24-hour windows of event data (windowed by activity_timestamp). So the output would look like this:

Train    Dest    Window(activity_timestamp)      max(Time)
1        HK      Aug28 00:00 to Aug29 00:00      10:00    <- still updating as events arrive through Aug29
1        HK      Aug27 00:00 to Aug28 00:00      09:00    <- no longer updating, since no new events have an activity_timestamp in this range

The drawback of this approach is that as soon as Aug28 starts, you have to wait for a new event for a train before you get a new max(time). You may rather want a rolling 24-hour period, that is, the max time over the events seen in the last 24 hours.
In that case, maintaining your own custom state using mapGroupsWithState/flatMapGroupsWithState is the best and most flexible option.
It is available in Spark 2.2 in Scala and Java.

Here is an example that tracks sessions based on events. 
Scala - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

You will have to create a custom per-train state which keeps track of the last 24 hours of each train's history, and use that state to calculate the max time for each train.


def updateHistoryAndGetMax(train: Int, events: Iterator[TrainEvents], state: GroupState[TrainHistory]): Long = {
  // for every batch of events: merge them into the history kept in state
  // (i.e. the last 24 hours of events), drop anything older than 24 hours,
  // and return the max time from the remaining history
}

trainTimesDataset     // Dataset[TrainEvents]
  .groupByKey(_.train)
  .mapGroupsWithState(updateHistoryAndGetMax)
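
Fleshed out, the state function could look something like the sketch below. Note this is just a sketch: TrainEvents and TrainHistory are hypothetical case classes (here carrying the activity_timestamp column discussed above), and the 24-hour pruning is done against processing time purely to keep the example simple.

import java.sql.Timestamp
import org.apache.spark.sql.streaming.GroupState

// hypothetical types; adjust to your real schema
case class TrainEvents(train: Int, dest: String, time: Timestamp, activity_timestamp: Timestamp)
case class TrainHistory(events: Seq[TrainEvents])

def updateHistoryAndGetMax(
    train: Int,
    events: Iterator[TrainEvents],
    state: GroupState[TrainHistory]): Long = {
  val cutoff = System.currentTimeMillis() - 24 * 60 * 60 * 1000L
  // merge the new events into the stored history, keeping only the last 24 hours
  val previous = state.getOption.map(_.events).getOrElse(Seq.empty)
  val history = (previous ++ events).filter(_.activity_timestamp.getTime >= cutoff)
  state.update(TrainHistory(history))
  // max event time (as epoch millis) over the retained history
  history.map(_.time.getTime).reduceOption(_ max _).getOrElse(Long.MinValue)
}

// assuming a SparkSession `spark` with spark.implicits._ in scope for the encoders
val maxTimes = trainTimesDataset     // Dataset[TrainEvents]
  .groupByKey(_.train)
  .mapGroupsWithState(updateHistoryAndGetMax)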

Hope this helps.


On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <brkyvz@gmail.com> wrote:
Hey TD,

If I understood the question correctly, your solution wouldn't return the exact result, since it also groups by destination. I would say the easiest solution would be to use flatMapGroupsWithState, where you:
.groupByKey(_.train)

and keep in state the row with the maximum time.
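
A rough sketch of what that could look like (TrainEvents is a hypothetical case class matching the schema [train: Int, dest: String, time: Timestamp]):

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class TrainEvents(train: Int, dest: String, time: Timestamp)

def keepMaxRow(
    train: Int,
    events: Iterator[TrainEvents],
    state: GroupState[TrainEvents]): Iterator[TrainEvents] = {
  // compare the incoming rows (and the previously stored row, if any) by time,
  // keep only the max row in state, and emit it
  val maxRow = (events ++ state.getOption.iterator).maxBy(_.time.getTime)
  state.update(maxRow)
  Iterator(maxRow)
}

// assuming spark.implicits._ is in scope for the encoders
trainTimesDataset     // Dataset[TrainEvents]
  .groupByKey(_.train)
  .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(keepMaxRow)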

On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <tathagata.das1565@gmail.com> wrote:
Yes. And in that case, if you care about only the last few days of max, then you should set a watermark on the timestamp column.

 trainTimesDataset
  .withWatermark("activity_timestamp", "5 days")
  .groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train", $"dest")
  .max("time")

Any windowed aggregates that are more than 5 days old will be dropped from the streaming state.

On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth909@gmail.com> wrote:
Hi,

Thanks for the response. Since this is a streaming query, and in my case I need to hold state for 24 hours (which I forgot to mention in my previous email), can I do the following?

 trainTimesDataset.groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train", $"dest").max("time")


On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <tathagata.das1565@gmail.com> wrote:
Say, trainTimesDataset is the streaming Dataset of schema [train: Int, dest: String, time: Timestamp] 


Scala: trainTimesDataset.groupBy("train", "dest").max("time")


SQL: "select train, dest, max(time) from trainTimesView group by train, dest"    // after calling trainTimesDataset.createOrReplaceTempView("trainTimesView")


On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth909@gmail.com> wrote:
Hi All,

I am wondering what is the easiest and most concise way to express the computation below in Spark Structured Streaming, given that it supports both imperative and declarative styles?
I am just trying to select, for each train, the row with the max timestamp. Instead of the sort of nested queries we normally write in a relational database, I am trying to see if I can leverage both imperative and declarative styles at the same time. If nested queries or joins are not required, I would like to see how this is possible (the join-based version I am trying to avoid is sketched below the tables). I am using Spark 2.1.1.

Dataset
Train    Dest      Time
1        HK        10:00
1        SH        12:00
1        SZ        14:00
2        HK        13:00
2        SH        09:00
2        SZ        07:00

The desired result should be:

Train    Dest      Time
1        SZ        14:00
2        HK        13:00
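
For reference, the join-based version I am trying to avoid would look something like this (sketch):

import org.apache.spark.sql.functions.max

// compute the max time per train, then self-join to recover the full row
val maxTimes = trainTimesDataset
  .groupBy("train")
  .agg(max("time").as("maxTime"))

val result = trainTimesDataset
  .join(maxTimes, trainTimesDataset("train") === maxTimes("train") &&
    trainTimesDataset("time") === maxTimes("maxTime"))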