spark-user mailing list archives

From kant kodali <kanth...@gmail.com>
Subject Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?
Date Wed, 30 Aug 2017 04:22:36 GMT
Can I do subqueries using the Dataset or DataFrame APIs rather than raw SQL,
keeping raw SQL only as a last resort?

On Tue, Aug 29, 2017 at 8:47 PM, ayan guha <guha.ayan@gmail.com> wrote:

> This is not correct. In SQL Land, your query should be like below:
>
> select * from (
> select Train,DEST,TIME, row_number() over (partition by train order by
> time desc) r
> from TrainTable
> ) x where r=1
>
> All of these constructs are supported in the DataFrame functions.
>
> On Wed, Aug 30, 2017 at 1:08 PM, kant kodali <kanth909@gmail.com> wrote:
>
>> yes in a relational db one could just do this
>>
>> SELECT t.Train, t.Dest, r.MaxTime
>> FROM (
>>       SELECT Train, MAX(Time) as MaxTime
>>       FROM TrainTable
>>       GROUP BY Train) r
>> INNER JOIN TrainTable t
>> ON t.Train = r.Train AND t.Time = r.MaxTime
>>
>> (Answer copied from a Stack Overflow question someone asked.)
>>
>> But I am still thinking about how to do this in a streaming setting.
>> mapGroupsWithState looks interesting, but it is just not available in 2.1.1,
>> if I am not wrong.
>>
>> Thanks!
>>
>> On Tue, Aug 29, 2017 at 7:50 PM, Burak Yavuz <brkyvz@gmail.com> wrote:
>>
>>> That just gives you the max time for each train. If I understood the
>>> question correctly, OP wants the whole row with the max time. That's
>>> generally solved through joins or subqueries, which would be hard to do in
>>> a streaming setting
>>>
>>> On Aug 29, 2017 7:29 PM, "ayan guha" <guha.ayan@gmail.com> wrote:
>>>
>>>> Why wouldn't removing the destination from the groupBy work? Like this:
>>>>
>>>>  trainTimesDataset
>>>>    .withWatermark("activity_timestamp", "5 days")
>>>>    .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train")
>>>>    .max("time")
>>>>
>>>> On Wed, Aug 30, 2017 at 10:38 AM, kant kodali <kanth909@gmail.com>
>>>> wrote:
>>>>
>>>>> @Burak so what would the transformation or query look like for
>>>>> the above example? I don't see flatMapGroupsWithState in the Dataset
>>>>> API in Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life
>>>>> easier.
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <brkyvz@gmail.com> wrote:
>>>>>
>>>>>> Hey TD,
>>>>>>
>>>>>> If I understood the question correctly, your solution wouldn't return
>>>>>> the desired result, since it also groups by destination. I would say
>>>>>> the easiest solution would be to use flatMapGroupsWithState, where you:
>>>>>> .groupByKey(_.train)
>>>>>>
>>>>>> and keep in state the row with the maximum time.
>>>>>>
>>>>>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>
>>>>>>> Yes. And in that case, if you only care about the max over the last
>>>>>>> few days, then you should set a watermark on the timestamp column.
>>>>>>>
>>>>>>>  trainTimesDataset
>>>>>>>    .withWatermark("activity_timestamp", "5 days")
>>>>>>>    .groupBy(window(activity_timestamp, "24 hours", "24 hours"),
>>>>>>>      "train", "dest")
>>>>>>>    .max("time")
>>>>>>>
>>>>>>> Any counts which are more than 5 days old will be dropped from the
>>>>>>> streaming state.
>>>>>>>
>>>>>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth909@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks for the response. This is a streaming query, and in my case
>>>>>>>> I need to hold state for 24 hours, which I forgot to mention in my
>>>>>>>> previous email. Can I do the following?
>>>>>>>>
>>>>>>>>  trainTimesDataset.groupBy(window(activity_timestamp, "24 hours",
>>>>>>>>    "24 hours"), "train", "dest").max("time")
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>>>>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Say trainTimesDataset is the streaming Dataset with schema [train:
>>>>>>>>> Int, dest: String, time: Timestamp].
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Scala: trainTimesDataset.groupBy("train", "dest").max("time")
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> SQL: "select train, dest, max(time) from trainTimesView group
>>>>>>>>> by train, dest"    // after calling
>>>>>>>>> trainTimesDataset.createOrReplaceTempView("trainTimesView")
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth909@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I am wondering what is the easiest and most concise way to express
>>>>>>>>>> the computation below in Spark Structured Streaming, given that it
>>>>>>>>>> supports both imperative and declarative styles.
>>>>>>>>>> I am just trying to select the rows that have the max timestamp for
>>>>>>>>>> each train. Instead of doing some sort of nested query like we
>>>>>>>>>> normally do in any relational database, I am trying to see if I can
>>>>>>>>>> leverage both imperative and declarative styles at the same time. If
>>>>>>>>>> nested queries or joins are not required, then I would like to see
>>>>>>>>>> how this is possible. I am using Spark 2.1.1.
>>>>>>>>>>
>>>>>>>>>> Dataset
>>>>>>>>>>
>>>>>>>>>> Train    Dest      Time
>>>>>>>>>> 1        HK        10:00
>>>>>>>>>> 1        SH        12:00
>>>>>>>>>> 1        SZ        14:00
>>>>>>>>>> 2        HK        13:00
>>>>>>>>>> 2        SH        09:00
>>>>>>>>>> 2        SZ        07:00
>>>>>>>>>>
>>>>>>>>>> The desired result should be:
>>>>>>>>>>
>>>>>>>>>> Train    Dest      Time
>>>>>>>>>> 1        SZ        14:00
>>>>>>>>>> 2        HK        13:00
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
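
Note on the row_number() query quoted above: it can likely be written with DataFrame functions alone, with no raw SQL. The following is a minimal sketch, assuming a batch (non-streaming) DataFrame with the columns train, dest and time from the thread. The object name, method name and helper column "r" are made up for illustration. As far as I know, Structured Streaming rejects non-time-based window functions such as row_number() on streaming Datasets, so this probably does not solve the streaming case by itself.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Hypothetical helper; the name is not from the thread.
object LatestRowPerTrain {
  // Assumes `trainTimes` is a batch DataFrame with columns train, dest, time.
  def latestPerTrain(trainTimes: DataFrame): DataFrame = {
    // Same as: row_number() over (partition by train order by time desc)
    val byTrainNewestFirst =
      Window.partitionBy(col("train")).orderBy(col("time").desc)

    trainTimes
      .withColumn("r", row_number().over(byTrainNewestFirst))
      .where(col("r") === 1) // keep only the newest row per train
      .drop("r")             // remove the helper column again
  }
}
```

If two rows of the same train share the same maximum time, row_number() keeps only one of them, and which one is not defined by this sketch.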

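Note on the flatMapGroupsWithState suggestion quoted above ("keep in state the row with the maximum time"): the per-key update logic can be sketched in plain Scala without Spark. Everything named here (TrainTime, LatestPerTrain, update, run) is hypothetical, time is simplified to an "HH:mm" string, and the fold over batches only simulates micro-batches. In Spark 2.2.0 or later the same update function would presumably be called from inside flatMapGroupsWithState after groupByKey(_.train), with the Option held in a GroupState.

```scala
// Hypothetical row type following the schema in the thread; time is an
// "HH:mm" string here, which compares correctly as plain text.
final case class TrainTime(train: Int, dest: String, time: String)

object LatestPerTrain {
  // State update for one key: keep whichever row has the later time.
  def update(state: Option[TrainTime],
             incoming: Iterator[TrainTime]): Option[TrainTime] =
    incoming.foldLeft(state) {
      case (None, row)       => Some(row)
      case (Some(best), row) =>
        Some(if (row.time.compareTo(best.time) > 0) row else best)
    }

  // Simulates micro-batches: group each batch by train and update that
  // train's stored row. The Map stands in for the streaming state store.
  def run(batches: Seq[Seq[TrainTime]]): Map[Int, TrainTime] =
    batches.foldLeft(Map.empty[Int, TrainTime]) { (state, batch) =>
      batch.groupBy(_.train).foldLeft(state) { case (acc, (train, rows)) =>
        update(acc.get(train), rows.iterator) match {
          case Some(best) => acc.updated(train, best)
          case None       => acc
        }
      }
    }
}
```

Unlike the windowed max("time") aggregation, this keeps the whole row, so dest survives. The sketch never expires state; a real streaming job would need a timeout or watermark for that.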