kafka-users mailing list archives

From Sachin Mittal <sjmit...@gmail.com>
Subject Re: How does 'TimeWindows.of().until()' work?
Date Thu, 15 Dec 2016 04:22:38 GMT
Hi,
I suggest including topic configs as part of the Streams config properties,
as we already do for producer and consumer configs.
The supplied topic configs would be used when creating internal changelog
topics, together with certain additional configs that are applied by default.

This way we would never have to create internal topics manually.

I have one doubt regarding until().
Say I specify one value and run my Streams app.
Then I stop the app, specify a different value, and restart the app.

Which retention value would the old (pre-existing) windows use: the old
value or the new one?

Thanks
Sachin



On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <matthias@confluent.io> wrote:

> Understood. Makes sense.
>
> In this case, you should apply the Streams configs manually when creating
> those topics. For the retention parameter, use the value you specify in the
> corresponding .until() method.
>
>
> -Matthias
>
>
> On 12/14/16 10:08 AM, Sachin Mittal wrote:
> > I was referring to the internal changelog topics. I had to create them
> > manually because in some cases the message size for these topics was
> > greater than the default used by Kafka Streams.
> >
> > I think someone in this group recommended creating these topics manually.
> > I understand that it is better to have internal topics created by the
> > Streams app, and I will take a second look and see if that can be done.
> >
> > I just wanted to know which configs are applied to internal topics, in
> > order to decide whether I can avoid creating them manually.
> >
> > Thanks
> > Sachin
> >
> >
> > On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >
> >> I am wondering about "I create internal topic manually" -- which topics
> >> do you refer to, in detail?
> >>
> >> Kafka Streams creates all kinds of internal topics with auto-generated
> >> names. So it would be quite tricky to create all of them manually
> >> (especially because you need to know those names in advance).
> >>
> >> IIRC, if a topic already exists, Kafka Streams does not change its
> >> configuration. Only if Kafka Streams creates a topic itself does it
> >> specify certain config parameters in the topic-creation step.
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 12/13/16 8:16 PM, Sachin Mittal wrote:
> >>> Hi,
> >>> Thanks for the explanation. This illustration makes it super easy to
> >>> understand how until works. Perhaps we can update the wiki with this
> >>> illustration.
> >>> It is basically the retention time for a past window.
> >>> I used to think until created all the future windows for that period,
> >>> and that when that time passed it deleted all the past windows. However,
> >>> until actually retains a window for the specified time. This makes so
> >>> much more sense.
> >>>
> >>> I just have one pending question regarding:
> >>>
> >>>> windowstore.changelog.additional.retention.ms
> >>>
> >>> How does this relate to the retention.ms param of the topic config?
> >>> I create the internal topic manually using, say, retention.ms=3600000.
> >>> In the next release (post kafka_2.10-0.10.0.1) we support deletion for
> >>> internal changelog topics as well, and I want them to be retained for,
> >>> say, just 1 hour.
> >>> So how does the above parameter interact with this topic-level setting?
> >>> Or do I now just need to set the above config to 3600000 and not add
> >>> retention.ms=3600000 while creating the internal topic?
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >>>
> >>>> First, windows are only created if there is actual data for a window. So
> >>>> you get windows [0, 50), [25, 75), [50, 100) only if there are records
> >>>> falling into each window (btw: window start time is inclusive while
> >>>> window end time is exclusive). If you have only 2 records with, let's
> >>>> say, ts=20 and ts=90, you will not have an open window [25, 75). Each
> >>>> window is physically created when the first record for it is processed.
> >>>>
> >>>> If you have the above 4 windows and a record with ts=101 arrives, a new
> >>>> window [101, 151) will be created. Window [0, 50) will not be deleted
> >>>> yet, because retention is 100, and thus Streams guarantees that all
> >>>> records with ts >= 1 (= 101 - 100) are still processed correctly, and
> >>>> those records would fall into window [0, 50).
> >>>>
> >>>> Thus, window [0, 50) can be dropped once time has advanced to ts = 150,
> >>>> but not before that.
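A minimal Java sketch of the window-assignment rule described above, assuming hopping windows of `sizeMs` that advance by `advanceMs`, with an inclusive start and an exclusive end. The class and method names are hypothetical. The alignment of window starts to multiples of the advance (clamped at 0) is my understanding of how `TimeWindows` assigns windows, not something stated in this thread; under that assumption a record with ts=101 lands in [75, 125) and [100, 150) rather than in a window starting at 101.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of hopping-window assignment (not Kafka Streams code).
// Assumption: window starts are aligned to multiples of advanceMs,
// start is inclusive, end is exclusive, and no window starts before 0.
public class HoppingWindowAssignment {

    /** Returns the start times of all windows [start, start + sizeMs) that contain ts. */
    public static List<Long> windowStartsFor(long ts, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Smallest aligned start s with s + sizeMs > ts (end is exclusive), clamped at 0.
        long start = (Math.max(0, ts - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= ts; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Window size 50, advance 25, as in the example above.
        System.out.println(windowStartsFor(20, 50, 25));  // [0]
        System.out.println(windowStartsFor(90, 50, 25));  // [50, 75]
        System.out.println(windowStartsFor(101, 50, 25)); // [75, 100]
    }
}
```

Only windows returned for some processed record ever get created: with records at ts=20 and ts=90 alone, nothing ever asks for [25, 75).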
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> >>>>> Hi,
> >>>>> So is until for the future or for the past?
> >>>>> Say I get the first record at t = 0, until is 100, and my window size is
> >>>>> 50 with an advance of 25.
> >>>>> I understand it will create windows (0, 50), (25, 75), (50, 100).
> >>>>> Now at t = 101 it will drop
> >>>>> (0, 50), (25, 75), (50, 100) and create
> >>>>> (101, 150), (125, 175), (150, 200)
> >>>>>
> >>>>> Please confirm whether this understanding is correct. It is not clear
> >>>>> how it will handle the overlapping windows (75, 125), (175, 225), and so
> >>>>> on.
> >>>>>
> >>>>> Another case that is not clear: say at t = 102 I get a message with
> >>>>> timestamp 99. What happens then?
> >>>>> Will the result be added to the previous aggregation of (50, 100) or
> >>>>> (75, 125), as it should be?
> >>>>>
> >>>>> Or will it recreate the old window (50, 100), aggregate the value there,
> >>>>> and then drop it? This would result in a wrong aggregated value, as it
> >>>>> does not consider the previously aggregated values.
> >>>>>
> >>>>> So this is the pressing case I am not able to understand. Maybe I am
> >>>>> wrong in some basic understanding.
> >>>>>
> >>>>>
> >>>>> Next, regarding the parameter
> >>>>>> windowstore.changelog.additional.retention.ms
> >>>>>
> >>>>> How does this relate to the retention.ms param of the topic config?
> >>>>> I create the internal topic manually using, say, retention.ms=3600000.
> >>>>> In the next release (post kafka_2.10-0.10.0.1) we support deletion for
> >>>>> internal changelog topics as well, and I want them to be retained for,
> >>>>> say, just 1 hour.
> >>>>> So how does the above parameter interact with this topic-level setting?
> >>>>> Or do I now just need to set the above config to 3600000 and not add
> >>>>> retention.ms=3600000 while creating the internal topic?
> >>>>> This is just another doubt remaining here.
> >>>>>
> >>>>> Thanks
> >>>>> Sachin
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <matthias@confluent.io> wrote:
> >>>>>
> >>>>>> Sachin,
> >>>>>>
> >>>>>> There is no reason to have an .until() AND a .retain() -- just increase
> >>>>>> the value of .until().
> >>>>>>
> >>>>>> If you have a window of, let's say, 1h size and you set .until() also
> >>>>>> to 1h, you obviously cannot process any late-arriving data. If you set
> >>>>>> until() to 2h in this example, you can process data that is up to 1h
> >>>>>> delayed.
> >>>>>>
> >>>>>> So basically, the retention should always be larger than your window
> >>>>>> size.
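The 1h/2h example can be written out as arithmetic. This sketch assumes, as a simplification, that a window [start, start + size) stays available while stream-time is below start + until; under that assumption the lateness a window tolerates after it ends is until - size. `WindowRetention` and its methods are hypothetical helpers, not Kafka Streams API.

```java
// Simplified retention arithmetic (assumption, not Kafka Streams code):
// a window [start, start + sizeMs) is kept while streamTime < start + untilMs.
public class WindowRetention {

    /** How long after a window ends it can still accept late records. */
    public static long allowedLatenessMs(long sizeMs, long untilMs) {
        if (untilMs < sizeMs) {
            // Retention shorter than the window makes no sense, per the advice above.
            throw new IllegalArgumentException("until() must not be smaller than the window size");
        }
        return untilMs - sizeMs;
    }

    /** True if the window starting at windowStartMs is still retained at streamTimeMs. */
    public static boolean isRetained(long windowStartMs, long untilMs, long streamTimeMs) {
        return streamTimeMs < windowStartMs + untilMs;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        System.out.println(allowedLatenessMs(hour, hour));            // 0: no late data
        System.out.println(allowedLatenessMs(hour, 2 * hour));        // 3600000: up to 1h late
        // Window [0, 1h) once stream-time has reached 1.5h:
        System.out.println(isRetained(0, hour, hour + hour / 2));     // false
        System.out.println(isRetained(0, 2 * hour, hour + hour / 2)); // true
    }
}
```

Under this model, `TimeWindows.of(60 * 1000L).until(10 * 1000L)` from the question at the bottom of this thread would be rejected by `allowedLatenessMs`, since the retention is shorter than the window.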
> >>>>>>
> >>>>>> The parameter
> >>>>>>> windowstore.changelog.additional.retention.ms
> >>>>>>
> >>>>>> applies to changelog topics that back up window state stores. Those
> >>>>>> changelog topics are compacted. However, the key used encodes a window
> >>>>>> ID, and thus older data can never be cleaned up by compaction.
> >>>>>> Therefore, an additional retention time is applied to those topics, too.
> >>>>>> Thus, if an old window is not updated for this amount of time, it will
> >>>>>> eventually get deleted, preventing the topic from growing infinitely.
> >>>>>>
> >>>>>> The value will be determined by until(), i.e., whatever you specify in
> >>>>>> .until() will be used to set this parameter.
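A sketch of the topic configs one might apply when creating a windowed-store changelog manually. It assumes, based on my understanding of Streams 0.10.1 and later rather than anything stated in this thread, that Streams creates such topics with `cleanup.policy=compact,delete` and sets `retention.ms` to the `.until()` value plus `windowstore.changelog.additional.retention.ms` (default one day). The combined cleanup policy needs brokers on 0.10.1 or later. The class and method names are hypothetical; verify the rule against the version you run.

```java
import java.util.Properties;

// Sketch (assumption): topic-level configs for the changelog of a windowed store.
// "cleanup.policy" and "retention.ms" are standard Kafka topic configs; the
// "until + additional retention" rule is an assumption about Streams' behavior.
public class WindowedChangelogConfig {

    // Assumed default of windowstore.changelog.additional.retention.ms (1 day).
    public static final long DEFAULT_ADDITIONAL_RETENTION_MS = 24L * 60 * 60 * 1000;

    public static Properties changelogTopicConfig(long untilMs, long additionalRetentionMs) {
        Properties props = new Properties();
        // Compaction keeps the latest value per key; deletion removes old windows,
        // whose keys (which embed the window) would otherwise never be compacted away.
        props.setProperty("cleanup.policy", "compact,delete");
        props.setProperty("retention.ms", Long.toString(untilMs + additionalRetentionMs));
        return props;
    }

    public static void main(String[] args) {
        long oneHour = 3_600_000L;
        Properties p = changelogTopicConfig(oneHour, DEFAULT_ADDITIONAL_RETENTION_MS);
        System.out.println(p.getProperty("retention.ms")); // 90000000
    }
}
```

With `.until()` set to one hour and the default additional retention, this yields `retention.ms=90000000` (25 hours).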
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> >>>>>>> Hi,
> >>>>>>> We are facing exactly the problem described by Matthias above.
> >>>>>>> We are keeping the default until, which is 1 day.
> >>>>>>>
> >>>>>>> Our record's timestamp extractor uses a field which increases with
> >>>>>>> time. However, over short periods we cannot guarantee that the
> >>>>>>> timestamp always increases. So at the boundary, i.e. after 24 hrs, we
> >>>>>>> can get records which are beyond that window's retention period.
> >>>>>>>
> >>>>>>> Then it happens as described above, and our aggregation fails.
> >>>>>>>
> >>>>>>> So, to sum up: when we get a record at
> >>>>>>> 24h + 1 sec, it deletes the older window, and since the new record
> >>>>>>> belongs to the new window, that window gets created.
> >>>>>>> Now when we get the next record at 24h - 1 sec, since the older window
> >>>>>>> was dropped, it does not get aggregated into that bucket.
> >>>>>>>
> >>>>>>> I suggest we have another setting next to until, called retain, which
> >>>>>>> retains the older windows into the next window.
> >>>>>>>
> >>>>>>> I think at the stream window boundary level it should use a concept of
> >>>>>>> sliding window. So we could define a window like
> >>>>>>>
> >>>>>>> TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 * 1000L).until(7 * 24 * 3600 * 1000L).retain(900 * 1000L)
> >>>>>>>
> >>>>>>> So after 7 days it retains the data covered by windows in the last 15
> >>>>>>> minutes, which rolls the data in them over to the next window. This way
> >>>>>>> streams work continuously.
> >>>>>>>
> >>>>>>> Please let us know your thoughts on this.
> >>>>>>>
> >>>>>>> On another side question, there is a setting:
> >>>>>>>
> >>>>>>> windowstore.changelog.additional.retention.ms
> >>>>>>> It is not clear what it does. Is this the default for until?
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>> Sachin
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <matthias@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> Windows are created on demand, i.e., each time a new record arrives
> >>>>>>>> and there is no window for it yet, a new window gets created.
> >>>>>>>>
> >>>>>>>> Windows accept data until their retention time (which you can
> >>>>>>>> configure via .until()) has passed. Thus, you will have many windows
> >>>>>>>> open in parallel.
> >>>>>>>>
> >>>>>>>> If you read older data, it will just be put into the corresponding
> >>>>>>>> windows (as long as the window retention time has not passed). If a
> >>>>>>>> window was discarded already, a new window with this single
> >>>>>>>> (late-arriving) record will get created, the computation will be
> >>>>>>>> triggered, you get a result, and afterwards the window is deleted
> >>>>>>>> again (as its retention time has passed already).
> >>>>>>>>
> >>>>>>>> The retention time is driven by "stream-time", an internally tracked
> >>>>>>>> time that only progresses in the forward direction. It gets its value
> >>>>>>>> from the timestamps provided by the TimestampExtractor -- thus, by
> >>>>>>>> default it will be event-time.
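A toy model of the behavior described above, to make on-demand window creation and the late-record case concrete. It is a simplification, not Kafka Streams internals: tumbling windows only, stream-time taken as the maximum timestamp seen so far, and a window [start, start + size) kept while stream-time is below start + until. `WindowedCountModel` is a hypothetical name.

```java
import java.util.TreeMap;

// Toy model of a windowed count (not Kafka Streams internals).
// Assumptions: tumbling windows; stream-time = max timestamp seen so far;
// a window [start, start + sizeMs) is kept while streamTime < start + untilMs.
public class WindowedCountModel {

    private final long sizeMs;
    private final long untilMs;
    private final TreeMap<Long, Long> counts = new TreeMap<>(); // window start -> count
    private long streamTime = Long.MIN_VALUE;

    public WindowedCountModel(long sizeMs, long untilMs) {
        this.sizeMs = sizeMs;
        this.untilMs = untilMs;
    }

    /** Processes one record and returns the count emitted for its window. */
    public long process(long ts) {
        streamTime = Math.max(streamTime, ts);              // stream-time only moves forward
        counts.headMap(streamTime - untilMs, true).clear(); // drop windows whose retention passed
        long start = ts - (ts % sizeMs);                    // tumbling window containing ts
        long newCount = counts.getOrDefault(start, 0L) + 1;
        if (streamTime < start + untilMs) {
            counts.put(start, newCount);                    // create on demand, or update
        }
        // Otherwise the window had already expired: the result starts from scratch
        // and is not stored, so earlier records of that window are not included.
        return newCount;
    }

    public static void main(String[] args) {
        WindowedCountModel model = new WindowedCountModel(10, 10);
        for (long ts : new long[] {5, 8, 12, 9}) {
            System.out.println("ts=" + ts + " -> count " + model.process(ts));
        }
    }
}
```

With `new WindowedCountModel(10, 10)` and records at ts 5, 8, 12, 9, the emitted counts are 1, 2, 1, 1: the record at ts=12 moves stream-time past the retention of window [0, 10), so the late record at ts=9 gets a fresh window whose count ignores the two earlier records, which is the failure mode described in the 12/12 message. With `until` set to 20, the same late record yields 3.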
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> >>>>>>>>> I've read this and still have more questions than answers. If my
> >>>>>>>>> data skips about (time-wise), what determines when a given window
> >>>>>>>>> will start / stop accepting new data? What if I'm reading data from
> >>>>>>>>> some time ago?
> >>>>>>>>>
> >>>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >>>>>>>>>
> >>>>>>>>>> Please have a look here:
> >>>>>>>>>>
> >>>>>>>>>> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
> >>>>>>>>>>
> >>>>>>>>>> If you have further questions, just follow up :)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> >>>>>>>>>>> I've added the 'until()' clause to some aggregation steps and it's
> >>>>>>>>>>> working wonders for keeping the size of the state store within
> >>>>>>>>>>> useful bounds... But I'm not 100% clear on how it works.
> >>>>>>>>>>>
> >>>>>>>>>>> What is implied by the '.until()' clause? What determines when to
> >>>>>>>>>>> stop receiving further data - is it clock time (since the window was
> >>>>>>>>>>> created)? It seems problematic for it to refer to EventTime, as this
> >>>>>>>>>>> may bounce all over the place. For non-overlapping windows a given
> >>>>>>>>>>> record can only fall into a single aggregation period - so when
> >>>>>>>>>>> would a value get discarded?
> >>>>>>>>>>>
> >>>>>>>>>>> I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 * 1000L).until(10 * 1000L))'
> >>>>>>>>>>> - but what is this accomplishing?
> >>>>>>>>>>>
