spark-user mailing list archives

From kant kodali <kanth...@gmail.com>
Subject Re: question on collect_list or say aggregations in general in structured streaming 2.3.0
Date Fri, 04 May 2018 08:39:15 GMT
1) I get an error when I set the watermark to 0.
2) I set the window and slide intervals to 1 second with no watermark. It still
aggregates messages from the previous batch that fall within the same 1-second window.

So is it fair to say there is no declarative way to do stateless
aggregations?
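
A toy, pure-Python model (not Spark's API) may help show why point 2 happens. With a tumbling window and no watermark, the state for a window is never closed, so a row from a later trigger whose event time falls in the same window is added to it. The names `simulate`, `window_start` and `WINDOW_MS`, the millisecond timestamps, and the eviction rule are simplifying assumptions. The rule is that after each trigger the watermark becomes the max event time seen minus the delay, windows ending at or before it are evicted, and rows older than it are dropped. The model also accepts a delay of 0, even though point 1 reports an error for that setting in Spark.

```python
from collections import defaultdict

WINDOW_MS = 1000  # hypothetical 1-second tumbling window, in milliseconds


def window_start(ts_ms):
    """Start of the tumbling window that event time `ts_ms` falls into."""
    return ts_ms - ts_ms % WINDOW_MS


def simulate(batches, watermark_delay_ms=None):
    """Simplified model of groupBy(window(...)) + collect_list over triggers.

    `batches` is a list of micro-batches of (event_time_ms, value) pairs.
    Returns a snapshot of the retained window state after each trigger.
    With watermark_delay_ms=None no window state is ever evicted.
    """
    state = defaultdict(list)
    watermark = float("-inf")
    max_event_time = float("-inf")
    snapshots = []
    for batch in batches:
        for ts, value in batch:
            if ts < watermark:
                continue  # row older than the watermark: dropped in this model
            state[window_start(ts)].append(value)
            max_event_time = max(max_event_time, ts)
        snapshots.append({w: list(rows) for w, rows in state.items()})
        if watermark_delay_ms is not None:
            # Advance the watermark after the trigger and evict closed windows.
            watermark = max_event_time - watermark_delay_ms
            for w in [w for w in state if w + WINDOW_MS <= watermark]:
                del state[w]
    return snapshots


# Four triggers with one event each (event times in ms).
batches = [[(10_100, "a")], [(10_600, "b")], [(11_200, "c")], [(11_500, "d")]]
no_watermark = simulate(batches)
with_watermark = simulate(batches, watermark_delay_ms=0)
```

In both runs "b" joins "a" in the window starting at 10000 ms even though the two rows arrive in different triggers. The watermark only bounds how long that window's state is kept: in the fourth snapshot only the 11000 ms window remains. This matches the point in the quoted reply that the window duration, not the trigger, becomes the logical batch.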


On Thu, May 3, 2018 at 9:55 AM, Arun Mahadevan <arunm@apache.org> wrote:

> I think you need to group by a window (tumbling) and define watermarks
> (put a very low watermark or even 0) to discard the state. Here the window
> duration becomes your logical batch.
>
> - Arun
>
> From: kant kodali <kanth909@gmail.com>
> Date: Thursday, May 3, 2018 at 1:52 AM
> To: "user @spark" <user@spark.apache.org>
> Subject: Re: question on collect_list or say aggregations in general in
> structured streaming 2.3.0
>
> After doing some more research using Google, it's clear that aggregations
> are stateful by default in Structured Streaming. So the question now is how
> to do stateless aggregations (not storing the results from previous batches)
> using Structured Streaming 2.3.0. I am trying to do it using raw Spark SQL,
> so not using flatMapGroupsWithState. And if that is not available, is
> it fair to say there is no declarative way to do stateless aggregations?
>
> On Thu, May 3, 2018 at 1:24 AM, kant kodali <kanth909@gmail.com> wrote:
>
>> Hi All,
>>
>> I was under the assumption that one needs to run groupBy(window(...)) to
>> run any stateful operation, but it looks like that is not the case, since any
>> aggregation query like
>>
>> "select count(*) from some_view" is also stateful, since it stores the
>> result of the count from the previous batch. Likewise, if I run
>>
>> "select collect_list(*) from some_view" with, say, maxOffsetsPerTrigger set to
>> 1, I can see the rows from the previous batch at every trigger.
>>
>> So is it fair to say aggregations are stateful by default?
>>
>> I am looking for more of a DStream-like (stateless) approach, where I want
>> to collect a bunch of records in each batch, do some aggregation like
>> count, and throw the result out; on the next batch it should count only the
>> records from that batch, not from the previous batch.
>>
>> So if I run "select collect_list(*) from some_view", I want to collect
>> whatever rows are available at each batch/trigger, but not those from the
>> previous batch. How do I do that?
>>
>> Thanks!
>>
>
>
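
The difference between what the original question observed and what it asks for can be sketched in the same toy style. Below, `stateful_collect` models a streaming aggregation without a window, where state carries over between triggers so `collect_list` and `count` keep growing. `per_batch_collect` models the DStream-like behaviour being asked for, where each trigger is aggregated on its own and then discarded. Both functions and the one-row-per-trigger input (standing in for maxOffsetsPerTrigger set to 1) are hypothetical plain-Python illustrations, not Spark APIs.

```python
def stateful_collect(batches):
    """Model of a streaming `select collect_list(*) from some_view`:
    rows are kept in state across triggers, so every result also
    contains the rows of all earlier batches."""
    state = []
    results = []
    for batch in batches:
        state.extend(batch)
        results.append(list(state))
    return results


def per_batch_collect(batches):
    """DStream-like behaviour: each trigger's result is built from that
    trigger's rows only, and nothing is carried into the next trigger."""
    return [list(batch) for batch in batches]


# One row per trigger, as with maxOffsetsPerTrigger set to 1.
batches = [["r1"], ["r2"], ["r3"]]

running = stateful_collect(batches)
isolated = per_batch_collect(batches)

# The matching count(*) results are just the list lengths.
running_counts = [len(rows) for rows in running]    # [1, 2, 3]
isolated_counts = [len(rows) for rows in isolated]  # [1, 1, 1]
```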
