kafka-users mailing list archives

From Jon Yeargers <jon.yearg...@cedexis.com>
Subject Re: How does 'TimeWindows.of().until()' work?
Date Tue, 13 Dec 2016 18:01:45 GMT
So a given window (with a '.until()' setting) is triggered for closing by
the presence of a record outside the .until() setting?

If the timestamps for records jump around by more than the .until()
value, you could have windows being created / deleted quite a bit, then?

On Tue, Dec 13, 2016 at 9:57 AM, Matthias J. Sax <matthias@confluent.io>
wrote:

> First, windows are only created if there is actual data for a window. So
> you get windows [0, 50), [25, 75), [50, 100) only if there are records
> falling into each window (btw: window start time is inclusive while
> window end time is exclusive). If you have only 2 records with, let's say,
> ts=20 and ts=90, you will not have an open window [25,75). Each window is
> physically created when the first record for it is processed.
>
> If you have the above 4 windows and a record with ts=101 arrives, a new
> window [101,151) will be created. Window [0,50) will not be deleted yet,
> because retention is 100 and thus Streams guarantees that all records
> with ts >= 1 (= 101 - 100) are still processed correctly, and those
> records would fall into window [0,50).
>
> Thus, window [0,50) can be dropped once time has advanced to ts = 150, but
> not before that.
>
> -Matthias
>
>
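To make the window layout above concrete, here is a minimal Java sketch (the class and method names are hypothetical, not Kafka Streams API) that lists the hopping windows a timestamp falls into. It assumes windows start at multiples of the advance interval, which is how the Kafka Streams docs describe hopping TimeWindows (epoch-aligned, start inclusive, end exclusive). Under that assumption, records at ts=20 and ts=90 create only [0,50), [50,100) and [75,125), and a record at ts=101 lands in [75,125) and [100,150).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of epoch-aligned hopping-window assignment (not Kafka's code).
final class HoppingWindowSketch {

    // Returns the start of every window [start, start + sizeMs) containing ts,
    // assuming windows start at multiples of advanceMs and never before 0.
    static List<Long> windowStartsFor(long ts, long sizeMs, long advanceMs) {
        if (sizeMs <= 0 || advanceMs <= 0 || advanceMs > sizeMs) {
            throw new IllegalArgumentException("need 0 < advanceMs <= sizeMs");
        }
        // Earliest aligned start whose window still covers ts (clamped at 0).
        long start = (Math.max(0, ts - sizeMs + advanceMs) / advanceMs) * advanceMs;
        List<Long> starts = new ArrayList<>();
        for (; start <= ts; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Window size 50, advance 25, as in the example in this thread.
        for (long ts : new long[] {20, 90, 101}) {
            System.out.println("ts=" + ts + " -> window starts " + windowStartsFor(ts, 50, 25));
        }
    }
}
```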
> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> > Hi,
> > So is until for future or past?
> > Say I get the first record at t = 0, until is 100, and my window size is 50
> > with advance 25.
> > I understand it will create windows (0, 50), (25, 75), (50, 100)
> > Now at t = 101 it will drop
> > (0, 50), (25, 75), (50, 100) and create
> > (101, 150), (125, 175), (150, 200)
> >
> > Please confirm if this understanding is correct. It is not clear how it
> > will handle overlapping windows (75, 125) and (175, 225) and so on?
> >
> > What is also not clear is this case: say at t = 102 I get a message with
> > timestamp 99. What happens then?
> > Will the result be added to the previous aggregation of (50, 100) or
> > (75, 125), like it should?
> >
> > Or will it recreate the old window (50, 100), aggregate the value there,
> > and then drop it? This would result in a wrong aggregated value, as it
> > does not consider the previous aggregated values.
> >
> > So this is the pressing case I am not able to understand. Maybe I am
> > wrong about something basic.
> >
> >
> > Next, regarding the parameter
> >> windowstore.changelog.additional.retention.ms
> >
> > How does this relate to the retention.ms param of the topic config?
> > I create the internal topic manually using, say, retention.ms=3600000.
> > In the next release (post kafka_2.10-0.10.0.1), since deletion is supported
> > for internal changelog topics as well, I want it to be retained for, say,
> > just 1 hour.
> > So how does the above parameter interfere with this topic-level setting?
> > Or do I now just need to set the above config to 3600000 and not add
> > retention.ms=3600000
> > while creating the internal topic?
> > This is just one more open question.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <matthias@confluent.io>
> > wrote:
> >
> >> Sachin,
> >>
> >> There is no reason to have an .until() AND a .retain() -- just increase
> >> the value of .until()
> >>
> >> If you have a window of, let's say, 1h size and you set .until() also to
> >> 1h -- you can obviously not process any late-arriving data. If you set
> >> until() to 2h in this example, you can process data that is up to 1h
> >> delayed.
> >>
> >> So basically, the retention should always be larger than your window
> >> size.
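The arithmetic in this example can be written down as a small Java helper. It is a sketch of the rule of thumb stated here (late-data allowance equals the .until() value minus the window size), not code from Kafka Streams; the class and method names are made up for illustration.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper encoding the rule of thumb: how late a record may be,
// relative to its window's end, and still be aggregated.
final class LatenessSketch {

    // untilMs is the value passed to .until(); sizeMs is the window size.
    static long maxLatenessMs(long sizeMs, long untilMs) {
        if (untilMs < sizeMs) {
            throw new IllegalArgumentException("retention should not be smaller than the window size");
        }
        return untilMs - sizeMs;
    }

    public static void main(String[] args) {
        long oneHour = TimeUnit.HOURS.toMillis(1);
        long twoHours = TimeUnit.HOURS.toMillis(2);
        System.out.println(maxLatenessMs(oneHour, oneHour));  // 0: no late data
        System.out.println(maxLatenessMs(oneHour, twoHours)); // 3600000: up to 1h late
    }
}
```

By this rule, a window configured as TimeWindows.of(60 * 1000L).until(10 * 1000L), as at the start of the thread, has a retention shorter than its size, which the helper rejects.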
> >>
> >> The parameter
> >>> windowstore.changelog.additional.retention.ms
> >>
> >> applies to changelog topics that back up window state stores. Those
> >> changelog topics are compacted. However, the key used does encode a
> >> window ID, and thus older data can never be cleaned up by compaction.
> >> Therefore, an additional retention time is applied to those topics, too.
> >> Thus, if an old window is not updated for this amount of time, it will
> >> eventually get deleted, preventing the topic from growing infinitely.
> >>
> >> The value will be determined by until(), i.e., whatever you specify in
> >> .until() will be used to set this parameter.
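Here is a sketch of how the two settings might combine on a windowed-store changelog topic. It assumes (please verify against your Kafka version) that Streams sets the changelog's retention.ms to the .until() value plus windowstore.changelog.additional.retention.ms, whose documented default is one day, and that it uses the combined compact,delete cleanup policy, matching the compaction-plus-retention behavior described in this message. The class is hypothetical; the map only mirrors topic-level config names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: topic-level settings expected on a windowed-store changelog,
// ASSUMING retention.ms = until() + windowstore.changelog.additional.retention.ms.
final class ChangelogConfigSketch {

    // Assumed default of windowstore.changelog.additional.retention.ms (1 day).
    static final long DEFAULT_ADDITIONAL_RETENTION_MS = TimeUnit.DAYS.toMillis(1);

    static Map<String, String> changelogTopicConfig(long untilMs, long additionalRetentionMs) {
        Map<String, String> config = new HashMap<>();
        // Compaction keeps the latest value per (key, window); deletion removes old
        // windows whose keys compaction alone would never clean up.
        config.put("cleanup.policy", "compact,delete");
        config.put("retention.ms", Long.toString(untilMs + additionalRetentionMs));
        return config;
    }

    public static void main(String[] args) {
        long untilMs = TimeUnit.HOURS.toMillis(1);
        Map<String, String> config = changelogTopicConfig(untilMs, DEFAULT_ADDITIONAL_RETENTION_MS);
        System.out.println("retention.ms=" + config.get("retention.ms"));
    }
}
```

For an until() of one hour and the default additional retention, the sketch gives retention.ms=90000000, i.e., 25 hours.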
> >>
> >>
> >> -Matthias
> >>
> >> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> >>> Hi,
> >>> We are facing the exact problem described by Matthias above.
> >>> We are keeping the default until, which is 1 day.
> >>>
> >>> Our timestamp extractor uses a record field which increases with time.
> >>> However, over short periods we cannot guarantee that the timestamp
> >>> always increases. So at the boundary, i.e., after 24 hrs, we can get
> >>> records which are beyond that window's retention period.
> >>>
> >>> Then it happens as mentioned above, and our aggregation fails.
> >>>
> >>> So just to sum up: when we get a record at
> >>> 24h + 1 sec, it deletes the older window, and since the new record
> >>> belongs to the new window, that window gets created.
> >>> Now when we get the next record at 24h - 1 sec, since the older window
> >>> was dropped, it does not get aggregated in that bucket.
> >>>
> >>> I suggest we have another setting next to until, called retain, which
> >>> retains the older windows into the next window.
> >>>
> >>> I think at the stream window boundary level it should use a concept of
> >>> sliding window. So we can define a window like
> >>>
> >>> TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 * 1000L)
> >>>     .until(7 * 24 * 3600 * 1000L).retain(900 * 1000L)
> >>>
> >>> So after 7 days it retains the data covered by windows in the last 15
> >>> minutes, which rolls over the data in them to the next window. This way
> >>> streams work continuously.
> >>>
> >>> Please let us know your thoughts on this.
> >>>
> >>> As a side question on this, there is a setting:
> >>>
> >>> windowstore.changelog.additional.retention.ms
> >>> It is not clear what it does. Is this the default for until?
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <matthias@confluent.io>
> >>> wrote:
> >>>
> >>>> Windows are created on demand, i.e., each time a new record arrives and
> >>>> there is no window yet for it, a new window will get created.
> >>>>
> >>>> Windows accept data until their retention time (which you can
> >>>> configure via .until()) has passed. Thus, you will have many windows
> >>>> open in parallel.
> >>>>
> >>>> If you read older data, it will just be put into the corresponding
> >>>> windows (as long as the window retention time has not passed). If a
> >>>> window was discarded already, a new window with this single
> >>>> (late-arriving) record will get created, the computation will be
> >>>> triggered, you get a result, and afterwards the window is deleted again
> >>>> (as its retention time has passed already).
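The late-record case can be illustrated with a toy windowed count in Java. It is a deliberately simplified, hypothetical model (tumbling windows, non-negative timestamps, retention applied through an explicit discard call, and the re-deletion of the recreated window left out), not the actual window-store implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy windowed count (not Kafka Streams internals) showing why a
// record arriving after its window was discarded yields a partial result.
final class WindowedCountSketch {
    private final long sizeMs;                              // tumbling window size
    private final Map<Long, Long> counts = new HashMap<>(); // window start -> count

    WindowedCountSketch(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // Adds a record, creating its window on demand; returns the window's count.
    long add(long ts) {
        long start = (ts / sizeMs) * sizeMs;
        return counts.merge(start, 1L, Long::sum);
    }

    // Simplified retention: drops every window that starts before minStartMs.
    void discardWindowsBefore(long minStartMs) {
        counts.keySet().removeIf(start -> start < minStartMs);
    }

    public static void main(String[] args) {
        WindowedCountSketch store = new WindowedCountSketch(50);
        store.add(10);
        store.add(20);                     // window [0,50) now counts 2
        store.discardWindowsBefore(50);    // retention passed: [0,50) is dropped
        System.out.println(store.add(30)); // window [0,50) recreated with only this record
    }
}
```

The final add prints 1 rather than 3: the recreated window only sees the late record, which is the partial aggregate discussed earlier in the thread.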
> >>>>
> >>>> The retention time is driven by "stream-time", an internally tracked
> >>>> time that only progresses in the forward direction. It gets its value
> >>>> from the timestamps provided by the TimestampExtractor -- thus, by
> >>>> default it will be event-time.
> >>>>
> >>>> -Matthias
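A minimal sketch of stream-time as described here, simplified to a single input: it is the largest timestamp observed so far, so an out-of-order record (like a ts=99 record arriving after ts=101) does not move it backwards. The class name is hypothetical, and the timestamps stand in for TimestampExtractor output.

```java
// Hypothetical sketch of "stream-time" for a single input: the maximum
// timestamp seen so far, so it never moves backwards.
final class StreamTimeSketch {
    private long streamTime = Long.MIN_VALUE; // no records seen yet

    // ts stands in for the value a TimestampExtractor would return.
    long observe(long ts) {
        streamTime = Math.max(streamTime, ts);
        return streamTime;
    }

    public static void main(String[] args) {
        StreamTimeSketch clock = new StreamTimeSketch();
        for (long ts : new long[] {0, 101, 99, 102}) {
            System.out.println("record ts=" + ts + " -> stream-time " + clock.observe(ts));
        }
    }
}
```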
> >>>>
> >>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> >>>>> I've read this and still have more questions than answers. If my data
> >>>>> skips about (timewise), what determines when a given window will start /
> >>>>> stop accepting new data? What if I'm reading data from some time ago?
> >>>>>
> >>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <matthias@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> Please have a look here:
> >>>>>>
> >>>>>> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
> >>>>>>
> >>>>>> If you have further questions, just follow up :)
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> >>>>>>> I've added the 'until()' clause to some aggregation steps and it's
> >>>>>>> working wonders for keeping the size of the state store within useful
> >>>>>>> boundaries... But I'm not 100% clear on how it works.
> >>>>>>>
> >>>>>>> What is implied by the '.until()' clause? What determines when to stop
> >>>>>>> receiving further data - is it clock time (since the window was
> >>>>>>> created)? It seems problematic for it to refer to EventTime, as this
> >>>>>>> may bounce all over the place. For non-overlapping windows a given
> >>>>>>> record can only fall into a single aggregation period - so when would
> >>>>>>> a value get discarded?
> >>>>>>>
> >>>>>>> I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 * 1000L).until(10 * 1000L))'
> >>>>>>> - but what is this accomplishing?
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>
