spark-user mailing list archives

From Burak Yavuz <brk...@gmail.com>
Subject Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?
Date Wed, 30 Aug 2017 00:25:53 GMT
Hey TD,

If I understood the question correctly, your solution wouldn't return the
exact result, since it also groups by destination. I would say the
easiest solution would be to use flatMapGroupsWithState, where you group
by train only:

  .groupByKey(_.train)

and keep in state the row with the maximum time.
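
A minimal sketch of what that could look like, assuming a case class
TrainTime that mirrors the [train, dest, time] schema quoted below. The
names TrainTime, LatestPerTrain, latest, update and latestPerTrain are made
up for illustration. As far as I know, flatMapGroupsWithState is only
available from Spark 2.2 onwards, so this would not compile against 2.1.1.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical row type matching the schema quoted in the thread.
case class TrainTime(train: Int, dest: String, time: Timestamp)

object LatestPerTrain {
  // Pure helper: the row with the greatest time among the previously stored
  // row (if any) and the newly arrived rows. On ties the earlier-seen row wins.
  def latest(previous: Option[TrainTime], rows: Iterator[TrainTime]): Option[TrainTime] =
    (previous.iterator ++ rows).reduceOption((a, b) => if (b.time.after(a.time)) b else a)

  // State update function: called once per train for each trigger that has
  // new rows for that train. Stores the current best row and emits it.
  def update(
      train: Int,
      rows: Iterator[TrainTime],
      state: GroupState[TrainTime]): Iterator[TrainTime] = {
    val best = latest(state.getOption, rows)
    best.foreach(row => state.update(row))
    best.iterator
  }

  // Wiring: group by train only, so the whole row (including dest) survives.
  def latestPerTrain(trainTimes: Dataset[TrainTime]): Dataset[TrainTime] = {
    import trainTimes.sparkSession.implicits._
    trainTimes
      .groupByKey(_.train)
      .flatMapGroupsWithState[TrainTime, TrainTime](
        OutputMode.Update(), GroupStateTimeout.NoTimeout())(update _)
  }
}
```

The resulting query would need to be started with outputMode("update").
With NoTimeout the state for each train is kept indefinitely, so holding it
for only 24 hours would need a timeout on top of this sketch.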

On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <tathagata.das1565@gmail.com>
wrote:

> Yes. And in that case, if you just care about only the last few days of
> max, then you should set watermark on the timestamp column.
>
>   trainTimesDataset
>     .withWatermark("activity_timestamp", "5 days")
>     .groupBy(window($"activity_timestamp", "24 hours", "24 hours"),
>       $"train", $"dest")
>     .agg(max($"time"))
>
> Any aggregates which are more than 5 days old will be dropped from the
> streaming state.
>
> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth909@gmail.com> wrote:
>
>> Hi,
>>
>> Thanks for the response. Since this is a streaming query, I need to hold
>> state for 24 hours, which I forgot to mention in my previous email. Can I
>> do the following?
>>
>>   trainTimesDataset
>>     .groupBy(window($"activity_timestamp", "24 hours", "24 hours"),
>>       $"train", $"dest")
>>     .agg(max($"time"))
>>
>>
>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>>> Say trainTimesDataset is the streaming Dataset with schema [train:
>>> Int, dest: String, time: Timestamp].
>>>
>>>
>>> Scala: trainTimesDataset.groupBy("train", "dest").agg(max("time"))
>>>
>>>
>>> SQL: "select train, dest, max(time) from trainTimesView group by
>>> train, dest"    // after calling
>>> trainTimesDataset.createOrReplaceTempView("trainTimesView")
>>>
>>>
>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth909@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am wondering what the easiest and most concise way is to express the
>>>> computation below in Spark Structured Streaming, given that it supports
>>>> both imperative and declarative styles.
>>>> I am just trying to select the rows that have the max timestamp for
>>>> each train. Instead of using nested queries, as we normally would in a
>>>> relational database, I am trying to see if I can leverage both
>>>> imperative and declarative styles at the same time. If nested queries
>>>> or joins are not required, I would like to see how this can be done.
>>>> I am using Spark 2.1.1.
>>>>
>>>> Dataset
>>>>
>>>> Train    Dest      Time
>>>> 1        HK        10:00
>>>> 1        SH        12:00
>>>> 1        SZ        14:00
>>>> 2        HK        13:00
>>>> 2        SH        09:00
>>>> 2        SZ        07:00
>>>>
>>>> The desired result should be:
>>>>
>>>> Train    Dest      Time
>>>> 1        SZ        14:00
>>>> 2        HK        13:00
>>>>
>>>>
>>>
>>
>
