kafka-users mailing list archives

From Sachin Mittal <sjmit...@gmail.com>
Subject Re: How does 'TimeWindows.of().until()' work?
Date Wed, 14 Dec 2016 04:16:44 GMT
Hi,
Thanks for the explanation. This illustration makes it very easy to
understand how until() works. Perhaps we can update the wiki with this
illustration.
until() is basically the retention time for a past window.
I used to think that until() created all the future windows for that period
and, once that time had passed, deleted all the past windows. In fact,
until() retains a window for the specified time. This makes much more
sense.
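
To make the retention idea concrete, here is a minimal sketch in plain Java (no Kafka dependency) that follows the reasoning in the quoted reply below, using the same numbers: window size 50, advance 25, retention 100. The class and method names are hypothetical, and the sketch assumes window start times are non-negative multiples of the advance interval. It is not the actual Kafka Streams implementation, which may drop windows at a coarser granularity.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: names are hypothetical, not part of the Kafka Streams API.
class WindowRetentionSketch {

    // Start times of all hopping windows [start, start + size) that contain ts.
    // Assumption: window starts are non-negative multiples of `advance`.
    public static List<Long> windowStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        long first = (Math.max(0, ts - size + advance) / advance) * advance;
        for (long start = first; start <= ts; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    // A window may be dropped once no record that is still guaranteed to be
    // processed (ts >= streamTime - retention) can fall into it.
    public static boolean canDrop(long windowStart, long size,
                                  long streamTime, long retention) {
        long windowEnd = windowStart + size; // end time is exclusive
        return streamTime - retention >= windowEnd;
    }

    public static void main(String[] args) {
        long size = 50, advance = 25, retention = 100;
        System.out.println(windowStartsFor(20, size, advance)); // [0]
        System.out.println(windowStartsFor(90, size, advance)); // [50, 75]
        System.out.println(canDrop(0, size, 101, retention));   // false
        System.out.println(canDrop(0, size, 150, retention));   // true
    }
}
```

With records at ts=20 and ts=90 only, no window starting at 25 is ever created, and window [0,50) stays available until stream-time reaches 150.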

I just had one pending query regarding:

> windowstore.changelog.additional.retention.ms

How does this relate to the retention.ms parameter of the topic config?
I create the internal topic manually using, say, retention.ms=3600000.
In the next release (post kafka_2.10-0.10.0.1), delete is supported for
internal changelog topics as well, and I want the topic to be retained for
just 1 hour.
So how does the above parameter interact with this topic-level setting?
Or do I now just need to set the above config to 3600000 and not add
retention.ms=3600000
while creating the internal topic?
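
For reference, a minimal sketch of where the application-level setting would go, assuming it is passed as an ordinary Streams config property. The class name, helper method and application id are hypothetical. The sketch only shows setting the key; it does not settle how the value interacts with a topic-level retention.ms.

```java
import java.util.Properties;

// Illustrative only: class, helper, and application id are hypothetical.
class StreamsConfigSketch {

    // Builds a Properties object containing the setting discussed above.
    public static Properties buildConfig(long additionalRetentionMs) {
        Properties props = new Properties();
        props.setProperty("application.id", "windowed-aggregation-sketch");
        props.setProperty("windowstore.changelog.additional.retention.ms",
                Long.toString(additionalRetentionMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig(3_600_000L); // 1 hour in milliseconds
        System.out.println(
                props.getProperty("windowstore.changelog.additional.retention.ms"));
    }
}
```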

Thanks
Sachin


On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax <matthias@confluent.io>
wrote:

> First, windows are only created if there is actual data for a window. So
> you get windows [0, 50), [25, 75), [50, 100) only if there are records
> falling into each window (btw: window start time is inclusive while
> window end time is exclusive). If you have only 2 records with, let's say,
> ts=20 and ts=90, you will not have an open window [25,75). Each window is
> physically created when the first record for it is processed.
>
> If you have the above windows and a record with ts=101 arrives, a new
> window [101,151) will be created. Window [0,50) will not be deleted yet,
> because retention is 100 and thus Streams guarantees that all records
> with ts >= 1 (= 101 - 100) are still processed correctly, and those
> records would fall into window [0,50).
>
> Thus, window [0,50) can be dropped once time has advanced to TS = 150, but
> not before that.
>
> -Matthias
>
>
> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> > Hi,
> > So is until for the future or the past?
> > Say I get the first record at t = 0, until is 100, and my window size is
> > 50, advancing by 25.
> > I understand it will create windows (0, 50), (25, 75), (50, 100).
> > Now at t = 101 it will drop
> > (0, 50), (25, 75), (50, 100) and create
> > (101, 150), (125, 175), (150, 200).
> >
> > Please confirm if this understanding is correct. It is not clear how it
> > will handle overlapping windows (75, 125) and (175, 225) and so on.
> >
> > The case that is still not clear is this: at, say, t = 102 I get a
> > message with timestamp 99. What happens then?
> > Will the result be added to the previous aggregation of (50, 100) or
> > (75, 125), like it should?
> >
> > Or will it recreate the old window (50, 100), aggregate the value there,
> > and then drop it? This would result in a wrong aggregated value, as it
> > does not consider the previous aggregated values.
> >
> > So this is the pressing case I am not able to understand. Maybe I am
> > wrong in some basic understanding.
> >
> >
> > Next, for the parameter
> >> windowstore.changelog.additional.retention.ms
> >
> > How does this relate to the retention.ms parameter of the topic config?
> > I create the internal topic manually using, say, retention.ms=3600000.
> > In the next release (post kafka_2.10-0.10.0.1), delete is supported for
> > internal changelog topics as well, and I want the topic to be retained
> > for just 1 hour.
> > So how does the above parameter interact with this topic-level setting?
> > Or do I now just need to set the above config to 3600000 and not add
> > retention.ms=3600000
> > while creating the internal topic?
> > This is just another doubt remaining here.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <matthias@confluent.io>
> > wrote:
> >
> >> Sachin,
> >>
> >> There is no reason to have an .until() AND a .retain() -- just increase
> >> the value of .until()
> >>
> >> If you have a window of, let's say, 1h size and you set .until() also to
> >> 1h -- you obviously cannot process any late-arriving data. If you set
> >> until() to 2h in this example, you can process data that is up to 1h
> >> delayed.
> >>
> >> So basically, the retention should always be larger than your window
> >> size.
> >>
> >> The parameter
> >>> windowstore.changelog.additional.retention.ms
> >>
> >> applies to changelog topics that back up window state stores. Those
> >> changelog topics are compacted. However, the key used encodes a
> >> window ID, and thus older data can never be cleaned up by compaction.
> >> Therefore, an additional retention time is applied to those topics, too.
> >> Thus, if an old window is not updated for this amount of time, it will
> >> eventually get deleted, preventing this topic from growing indefinitely.
> >>
> >> The value will be determined by until(), i.e., whatever you specify in
> >> .until() will be used to set this parameter.
> >>
> >>
> >> -Matthias
> >>
> >> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> >>> Hi,
> >>> We are facing the exact problem described by Matthias above.
> >>> We are keeping the default until, which is 1 day.
> >>>
> >>> Our record's timestamp extractor has a field which increases with time.
> >>> However, for short periods we cannot guarantee that the timestamp
> >>> always increases. So at the boundary, i.e. after 24 hrs, we can get
> >>> records which are beyond that window's retention period.
> >>>
> >>> Then it happens as mentioned above, and our aggregation fails.
> >>>
> >>> So just to sum up: when we get a record at
> >>> 24h + 1 sec, it deletes the older window, and since the new record
> >>> belongs to the new window, that window gets created.
> >>> Now when we get the next record at 24h - 1 sec, since the older window
> >>> has been dropped, it does not get aggregated in that bucket.
> >>>
> >>> I suggest we have another setting next to until, called retain, which
> >>> retains the older windows into the next window.
> >>>
> >>> I think at the stream window boundary level it should use the concept
> >>> of a sliding window. So we can define a window like
> >>>
> >>> TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 * 1000L)
> >>>     .until(7 * 24 * 3600 * 1000L).retain(900 * 1000L)
> >>>
> >>> So after 7 days it retains the data covered by windows in the last 15
> >>> minutes, which rolls over the data in them to the next window. This way
> >>> streams work continuously.
> >>>
> >>> Please let us know your thoughts on this.
> >>>
> >>> As a side question, there is a setting:
> >>>
> >>> windowstore.changelog.additional.retention.ms
> >>> It is not clear what it does. Is this the default for until?
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <matthias@confluent.io>
> >>> wrote:
> >>>
> >>>> Windows are created on demand, i.e., each time a new record arrives and
> >>>> there is no window yet for it, a new window will get created.
> >>>>
> >>>> Windows accept data until their retention time (which you can
> >>>> configure via .until()) has passed. Thus, you will have many windows
> >>>> open in parallel.
> >>>>
> >>>> If you read older data, it will just be put into the corresponding
> >>>> windows (as long as the window retention time has not passed). If a
> >>>> window was discarded already, a new window with this single
> >>>> (late-arriving) record will get created, the computation will be
> >>>> triggered, you get a result, and afterwards the window is deleted again
> >>>> (as its retention time has passed already).
> >>>>
> >>>> The retention time is driven by "stream-time", an internally tracked
> >>>> time that only progresses in the forward direction. It gets its value
> >>>> from the timestamps provided by TimestampExtractor -- thus, by default
> >>>> it will be event-time.
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> >>>>> I've read this and still have more questions than answers. If my data
> >>>>> skips about (timewise), what determines when a given window will
> >>>>> start / stop accepting new data? What if I'm reading data from some
> >>>>> time ago?
> >>>>>
> >>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <matthias@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> Please have a look here:
> >>>>>>
> >>>>>> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
> >>>>>>
> >>>>>> If you have further questions, just follow up :)
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> >>>>>>> I've added the 'until()' clause to some aggregation steps and it's
> >>>>>>> working wonders for keeping the size of the state store within
> >>>>>>> useful boundaries... But I'm not 100% clear on how it works.
> >>>>>>>
> >>>>>>> What is implied by the '.until()' clause? What determines when to
> >>>>>>> stop receiving further data - is it clock time (since the window was
> >>>>>>> created)? It seems problematic for it to refer to EventTime as this
> >>>>>>> may bounce all over the place. For non-overlapping windows a given
> >>>>>>> record can only fall into a single aggregation period - so when
> >>>>>>> would a value get discarded?
> >>>>>>>
> >>>>>>> I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 *
> >>>>>>> 1000L).until(10 * 1000L))' - but what is this accomplishing?
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>
