kafka-users mailing list archives

From Damian Guy <damian....@gmail.com>
Subject Re: How does 'TimeWindows.of().until()' work?
Date Mon, 19 Dec 2016 13:12:01 GMT
Yes, that is one of the methods. It will be available in the 0.10.2 release,
which is due at the beginning of February.

On Mon, 19 Dec 2016 at 12:17 Sachin Mittal <sjmittal@gmail.com> wrote:

> I believe you are talking about this method:
>
> public <W extends Window, T> KTable<Windowed<K>, T> aggregate(
>         final Initializer<T> initializer,
>         final Aggregator<K, V, T> aggregator,
>         final Windows<W> windows,
>         final StateStoreSupplier<WindowStore> storeSupplier)
>
> Will this API be part of the next release?
>
> I can go ahead and use this; however, if we add some API to StateStoreSupplier
> to update the logConfig, then we can pass all the topic-level props directly as
> part of the streams config.
>
> Thanks
> Sachin
>
>
>
> On Mon, Dec 19, 2016 at 5:32 PM, Damian Guy <damian.guy@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > I think we have a way of doing what you want already. If you create a
> > custom state store you can call the enableLogging method and pass in any
> > configuration parameters you want. For example:
> >
> > final StateStoreSupplier supplier = Stores.create("store")
> >         .withKeys(Serdes.String())
> >         .withValues(Serdes.String())
> >         .persistent()
> >         .enableLogging(Collections.singletonMap("retention.ms", "1000"))
> >         .build();
> >
> > You can then use the overloaded methods in the DSL to pass in the
> > StateStoreSupplier to your aggregates (trunk only).
> >
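> > For illustration, a rough end-to-end sketch combining the supplier above with
> > the overloaded aggregate quoted earlier in this thread (the topic name, serdes,
> > window size, and count-style aggregation are placeholders; a windowed aggregate
> > may additionally require the windowed variant of the persistent store factory):
> >
> > final StateStoreSupplier supplier = Stores.create("store")
> >         .withKeys(Serdes.String())
> >         .withValues(Serdes.Long())
> >         .persistent()
> >         .enableLogging(Collections.singletonMap("retention.ms", "3600000"))
> >         .build();
> >
> > final KStreamBuilder builder = new KStreamBuilder();
> > builder.stream(Serdes.String(), Serdes.String(), "input-topic")
> >        .groupByKey()
> >        .aggregate(() -> 0L,                    // Initializer<Long>
> >                (key, value, agg) -> agg + 1L,  // Aggregator: count per key
> >                TimeWindows.of(60 * 1000L),     // 1-minute windows
> >                supplier);                      // the trunk-only overload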
> >
> > On Mon, 19 Dec 2016 at 10:58 Sachin Mittal <sjmittal@gmail.com> wrote:
> >
> > > Hi,
> > > I am working towards adding topic configs as part of the streams config.
> > > However, I have run into an issue. The code flow is like this:
> > >
> > > KStreamBuilder builder = new KStreamBuilder();
> > > builder.stream(...)
> > > ...
> > > KafkaStreams streams = new KafkaStreams(builder, streamsProps);
> > > streams.start();
> > >
> > > So we can see that we build the topology before building the streams.
> > > While building the topology it assigns the state stores, and at that time
> > > no topic config props are available.
> > >
> > > So it creates the supplier with an empty topic config.
> > >
> > > Further, StateStoreSupplier has a method just to get the config, not to
> > > update it:
> > > Map<String, Object> logConfig()
> > >
> > > One way to implement this is to change this interface so that the log
> > > config props can be updated too. Then, when the props are available to the
> > > streams instance, we update the topology builder's state stores with the
> > > updated config.
> > >
> > > The other way is to change KStreamBuilder and make it pass the topic
> > > config. However, in the second approach we would be splitting the streams
> > > config into two parts.
> > >
> > > Let me know how one should proceed with this.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > > On Thu, Dec 15, 2016 at 2:27 PM, Matthias J. Sax <matthias@confluent.io>
> > > wrote:
> > >
> > > > I agree. We have already gotten multiple requests to add an API for
> > > > specifying topic parameters for internal topics... I am pretty sure we
> > > > will add it if time permits -- feel free to contribute this new feature!
> > > >
> > > > About changing the value of until(): that does not work, as the
> > > > changelog topic configuration would not be updated.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 12/14/16 8:22 PM, Sachin Mittal wrote:
> > > > > Hi,
> > > > > I suggest including the topic config as part of the streams config
> > > > > properties, like we do for the producer and consumer configs.
> > > > > The supplied topic config would be used for creating the internal
> > > > > changelog topics, along with certain additional configs which are
> > > > > applied by default.
> > > > >
> > > > > This way we don't have to ever create internal topics manually.
> > > > >
> > > > > I had one doubt regarding until.
> > > > > Say I specify one value and run my streams app.
> > > > > Now I stop the app, specify a different value, and restart the app.
> > > > >
> > > > > Which retention value would the old (pre-existing) windows use? Would
> > > > > it be the older value or the new value?
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <matthias@confluent.io>
> > > > > wrote:
> > > > >
> > > > >> Understood. Makes sense.
> > > > >>
> > > > >> For this, you should apply the Streams configs manually when creating
> > > > >> those topics. For the retention parameter, use the value you specify
> > > > >> in the corresponding .until() method.
> > > > >>
> > > > >>
> > > > >> -Matthias
> > > > >>
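> > > > >> For instance, when creating such a topic manually, one might carry
> > > > >> over topic-level settings like the following (a sketch: the values are
> > > > >> placeholders, and compact,delete requires a broker version that
> > > > >> supports it):
> > > > >>
> > > > >> // topic-level configs for a manually created windowed changelog topic
> > > > >> final java.util.Properties topicConfig = new java.util.Properties();
> > > > >> topicConfig.put("cleanup.policy", "compact,delete");
> > > > >> // match whatever you pass to the corresponding .until(...)
> > > > >> topicConfig.put("retention.ms", String.valueOf(24 * 3600 * 1000L));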
> > > > >>
> > > > >> On 12/14/16 10:08 AM, Sachin Mittal wrote:
> > > > >>> I was referring to the internal changelog topics. I had to create
> > > > >>> them manually because in some cases the message size of these topics
> > > > >>> was greater than the defaults used by Kafka Streams.
> > > > >>>
> > > > >>> I think someone in this group recommended creating these topics
> > > > >>> manually. I understand that it is better to have the internal topics
> > > > >>> created by the streams app, and I will take a second look and see if
> > > > >>> that can be done.
> > > > >>>
> > > > >>> I just wanted to make sure which configs are applied to internal
> > > > >>> topics, in order to decide whether to avoid creating them manually.
> > > > >>>
> > > > >>> Thanks
> > > > >>> Sachin
> > > > >>>
> > > > >>>
> > > > >>> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <matthias@confluent.io>
> > > > >>> wrote:
> > > > >>>
> > > > >>>> I am wondering about "I create internal topic manually" -- which
> > > > >>>> topics do you refer to, in detail?
> > > > >>>>
> > > > >>>> Kafka Streams creates all kinds of internal topics with
> > > > >>>> auto-generated names. So it would be quite tricky to create all of
> > > > >>>> them manually (especially because you need to know those names in
> > > > >>>> advance).
> > > > >>>>
> > > > >>>> IIRC, if a topic already exists, Kafka Streams does not change its
> > > > >>>> configuration. Only if Kafka Streams creates a topic itself will it
> > > > >>>> specify certain config parameters at the topic-creation step.
> > > > >>>>
> > > > >>>>
> > > > >>>> -Matthias
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> On 12/13/16 8:16 PM, Sachin Mittal wrote:
> > > > >>>>> Hi,
> > > > >>>>> Thanks for the explanation. This illustration makes it super easy
> > > > >>>>> to understand how until works. Perhaps we can update the wiki with
> > > > >>>>> this illustration.
> > > > >>>>> It is basically the retention time for a past window.
> > > > >>>>> I used to think until creates all the future windows for that
> > > > >>>>> period, and when that time passed it would delete all the past
> > > > >>>>> windows. However, until actually retains a window for the specified
> > > > >>>>> time. This makes so much more sense.
> > > > >>>>>
> > > > >>>>> I just had one pending query regarding:
> > > > >>>>>
> > > > >>>>>> windowstore.changelog.additional.retention.ms
> > > > >>>>>
> > > > >>>>> How does this relate to the retention.ms param of the topic config?
> > > > >>>>> I create the internal topics manually using, say,
> > > > >>>>> retention.ms=3600000.
> > > > >>>>> In the next release (post kafka_2.10-0.10.0.1), since we support
> > > > >>>>> delete for the internal changelog topics as well, I want them to be
> > > > >>>>> retained for, say, just 1 hour.
> > > > >>>>> So how does the above parameter interfere with this topic-level
> > > > >>>>> setting? Or do I now just need to set the above config to 3600000
> > > > >>>>> and not add retention.ms=3600000 while creating the internal topic?
> > > > >>>>>
> > > > >>>>> Thanks
> > > > >>>>> Sachin
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax <matthias@confluent.io>
> > > > >>>>> wrote:
> > > > >>>>>
> > > > >>>>>> First, windows are only created if there is actual data for a
> > > > >>>>>> window. So you get windows [0,50), [25,75), [50,100) only if there
> > > > >>>>>> are records falling into each window (btw: the window start-time
> > > > >>>>>> is inclusive while the window end-time is exclusive). If you have
> > > > >>>>>> only 2 records with, let's say, ts=20 and ts=90, you will not have
> > > > >>>>>> an open window [25,75). Each window is physically created when the
> > > > >>>>>> first record for it is processed.
> > > > >>>>>>
> > > > >>>>>> If you have the above 4 windows and a record with ts=101 arrives,
> > > > >>>>>> a new window [101,151) will be created. Window [0,50) will not be
> > > > >>>>>> deleted yet, because the retention is 100 and thus Streams
> > > > >>>>>> guarantees that all records with ts >= 1 (= 101 - 100) are still
> > > > >>>>>> processed correctly, and those records would fall into window
> > > > >>>>>> [0,50).
> > > > >>>>>>
> > > > >>>>>> Thus, window [0,50) can be dropped once time advances to ts = 150,
> > > > >>>>>> but not before that.
> > > > >>>>>>
> > > > >>>>>> -Matthias
> > > > >>>>>>
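> > > > >>>>>> As a small worked sketch of the rule above (the numbers are taken
> > > > >>>>>> from the example in this thread):
> > > > >>>>>>
> > > > >>>>>> // window size 50, advance 25, until(100):
> > > > >>>>>> // a window [start, end) becomes droppable once
> > > > >>>>>> //   stream-time >= end + 100
> > > > >>>>>> //   [0,50)   -> droppable at stream-time 150, not before
> > > > >>>>>> //   [50,100) -> droppable at stream-time 200
> > > > >>>>>> // a late record with ts=99 arriving at stream-time 102 is still
> > > > >>>>>> // applied, because 102 - 100 = 2 <= 99, so its windows are kept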
> > > > >>>>>>
> > > > >>>>>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> > > > >>>>>>> Hi,
> > > > >>>>>>> So is until for the future or the past?
> > > > >>>>>>> Say I get the first record at t = 0, until is 100, and my window
> > > > >>>>>>> size is 50, advancing by 25.
> > > > >>>>>>> I understand it will create windows (0, 50), (25, 75), (50, 100).
> > > > >>>>>>> Now at t = 101 it will drop
> > > > >>>>>>> (0, 50), (25, 75), (50, 100) and create
> > > > >>>>>>> (101, 150), (125, 175), (150, 200)
> > > > >>>>>>>
> > > > >>>>>>> Please confirm if this understanding is correct. It is not clear
> > > > >>>>>>> how it will handle the overlapping windows (75, 125) and
> > > > >>>>>>> (175, 225) and so on.
> > > > >>>>>>>
> > > > >>>>>>> The case that is still not clear is: say at t = 102 I get some
> > > > >>>>>>> message with timestamp 99. What happens then?
> > > > >>>>>>> Will the result be added to the previous aggregation of (50, 100)
> > > > >>>>>>> or (75, 125), like it should be?
> > > > >>>>>>>
> > > > >>>>>>> Or will it recreate the old window (50, 100), aggregate the
> > > > >>>>>>> value there, and then drop it? That would result in a wrong
> > > > >>>>>>> aggregated value, as it does not consider the previously
> > > > >>>>>>> aggregated values.
> > > > >>>>>>>
> > > > >>>>>>> So this is the pressing case I am not able to understand. Maybe
> > > > >>>>>>> I am wrong in some basic understanding.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Next, for the parameter
> > > > >>>>>>>> windowstore.changelog.additional.retention.ms
> > > > >>>>>>>
> > > > >>>>>>> How does this relate to the retention.ms param of the topic
> > > > >>>>>>> config?
> > > > >>>>>>> I create the internal topics manually using, say,
> > > > >>>>>>> retention.ms=3600000.
> > > > >>>>>>> In the next release (post kafka_2.10-0.10.0.1), since we support
> > > > >>>>>>> delete for the internal changelog topics as well, I want them to
> > > > >>>>>>> be retained for, say, just 1 hour.
> > > > >>>>>>> So how does the above parameter interfere with this topic-level
> > > > >>>>>>> setting? Or do I now just need to set the above config to 3600000
> > > > >>>>>>> and not add retention.ms=3600000 while creating the internal
> > > > >>>>>>> topic?
> > > > >>>>>>> This is just another doubt remaining here.
> > > > >>>>>>>
> > > > >>>>>>> Thanks
> > > > >>>>>>> Sachin
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <matthias@confluent.io>
> > > > >>>>>>> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Sachin,
> > > > >>>>>>>>
> > > > >>>>>>>> There is no reason to have an .until() AND a .retain() -- just
> > > > >>>>>>>> increase the value of .until().
> > > > >>>>>>>>
> > > > >>>>>>>> If you have a window of, let's say, 1h size and you set .until()
> > > > >>>>>>>> also to 1h -- you can obviously not process any late-arriving
> > > > >>>>>>>> data. If you set .until() to 2h in this example, you can process
> > > > >>>>>>>> data that is up to 1h delayed.
> > > > >>>>>>>>
> > > > >>>>>>>> So basically, the retention should always be larger than your
> > > > >>>>>>>> window size.
> > > > >>>>>>>>
> > > > >>>>>>>> The parameter
> > > > >>>>>>>>> windowstore.changelog.additional.retention.ms
> > > > >>>>>>>>
> > > > >>>>>>>> applies to the changelog topics that back up window state
> > > > >>>>>>>> stores. Those changelog topics are compacted. However, the key
> > > > >>>>>>>> used does encode a window ID, and thus older data can never be
> > > > >>>>>>>> cleaned up by compaction. Therefore, an additional retention
> > > > >>>>>>>> time is applied to those topics, too. Thus, if an old window is
> > > > >>>>>>>> not updated for this amount of time, it will eventually get
> > > > >>>>>>>> deleted, preventing the topic from growing infinitely.
> > > > >>>>>>>>
> > > > >>>>>>>> The value will be determined by until(), i.e., whatever you
> > > > >>>>>>>> specify in .until() will be used to set this parameter.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> -Matthias
> > > > >>>>>>>>
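> > > > >>>>>>>> Reading the above together, a small sketch of the effect (the
> > > > >>>>>>>> 24h figure is a placeholder):
> > > > >>>>>>>>
> > > > >>>>>>>> // windowed changelogs are compacted, but the window IDs in the
> > > > >>>>>>>> // keys mean compaction alone never removes old windows; an
> > > > >>>>>>>> // additional retention time (taken from .until()) is applied too
> > > > >>>>>>>> // e.g. with .until(24 * 3600 * 1000L), a window that receives no
> > > > >>>>>>>> // update for ~24h can eventually be deleted from the changelog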
> > > > >>>>>>>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> > > > >>>>>>>>> Hi,
> > > > >>>>>>>>> We are facing the exact problem described by Matthias above.
> > > > >>>>>>>>> We are keeping the default until, which is 1 day.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Our record's timestamp extractor uses a field which increases
> > > > >>>>>>>>> with time. However, over short intervals we cannot guarantee
> > > > >>>>>>>>> the timestamp always increases. So at the boundary, i.e. after
> > > > >>>>>>>>> 24 hrs, we can get records which are beyond that window's
> > > > >>>>>>>>> retention period.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Then it happens as mentioned above and our aggregation fails.
> > > > >>>>>>>>>
> > > > >>>>>>>>> So, to sum up: when we get a record at
> > > > >>>>>>>>> 24h + 1 sec, it deletes the older window, and since the new
> > > > >>>>>>>>> record belongs to the new window that window gets created.
> > > > >>>>>>>>> Now when we get the next record at 24h - 1 sec, since the older
> > > > >>>>>>>>> window has been dropped, it does not get aggregated into that
> > > > >>>>>>>>> bucket.
> > > > >>>>>>>>>
> > > > >>>>>>>>> I suggest we have another setting next to until, called retain,
> > > > >>>>>>>>> which retains the older windows into the next window.
> > > > >>>>>>>>>
> > > > >>>>>>>>> I think at the stream window boundary level it should use the
> > > > >>>>>>>>> concept of a sliding window. So we could define a window like:
> > > > >>>>>>>>>
> > > > >>>>>>>>> TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 * 1000L)
> > > > >>>>>>>>>     .until(7 * 24 * 3600 * 1000L).retain(900 * 1000L)
> > > > >>>>>>>>>
> > > > >>>>>>>>> So after 7 days it retains the data covered by the windows of
> > > > >>>>>>>>> the last 15 minutes, which rolls the data in them over to the
> > > > >>>>>>>>> next window. This way the streams work continuously.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Please let us know your thoughts on this.
> > > > >>>>>>>>>
> > > > >>>>>>>>> As another side question, there is a setting:
> > > > >>>>>>>>>
> > > > >>>>>>>>> windowstore.changelog.additional.retention.ms
> > > > >>>>>>>>>
> > > > >>>>>>>>> It is not clear what it does. Is this the default for until?
> > > > >>>>>>>>>
> > > > >>>>>>>>> Thanks
> > > > >>>>>>>>> Sachin
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <matthias@confluent.io>
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Windows are created on demand, i.e., each time a new record
> > > > >>>>>>>>>> arrives and there is no window for it yet, a new window will
> > > > >>>>>>>>>> get created.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Windows accept data until their retention time (which you can
> > > > >>>>>>>>>> configure via .until()) has passed. Thus, you will have many
> > > > >>>>>>>>>> windows open in parallel.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> If you read older data, it will just be put into the
> > > > >>>>>>>>>> corresponding windows (as long as the window retention time
> > > > >>>>>>>>>> did not pass). If a window was discarded already, a new window
> > > > >>>>>>>>>> with this single (later arriving) record will get created, the
> > > > >>>>>>>>>> computation will be triggered, you get a result, and afterwards
> > > > >>>>>>>>>> the window is deleted again (as its retention time has passed
> > > > >>>>>>>>>> already).
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> The retention time is driven by "stream-time", an internally
> > > > >>>>>>>>>> tracked time that only progresses in the forward direction.
> > > > >>>>>>>>>> It gets its value from the timestamps provided by the
> > > > >>>>>>>>>> TimestampExtractor -- thus, per default it will be event-time.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> -Matthias
> > > > >>>>>>>>>>
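> > > > >>>>>>>>>> For instance, a minimal event-time extractor along the lines
> > > > >>>>>>>>>> discussed here might look like this (a sketch only: the class
> > > > >>>>>>>>>> name is a placeholder, the exact extract signature differs
> > > > >>>>>>>>>> across versions, and a real extractor would typically read a
> > > > >>>>>>>>>> field from the record value instead):
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> import org.apache.kafka.clients.consumer.ConsumerRecord;
> > > > >>>>>>>>>> import org.apache.kafka.streams.processor.TimestampExtractor;
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> public class MyEventTimeExtractor implements TimestampExtractor {
> > > > >>>>>>>>>>     @Override
> > > > >>>>>>>>>>     public long extract(final ConsumerRecord<Object, Object> record) {
> > > > >>>>>>>>>>         // fall back to the record's embedded timestamp; the
> > > > >>>>>>>>>>         // returned value is what drives stream-time
> > > > >>>>>>>>>>         return record.timestamp();
> > > > >>>>>>>>>>     }
> > > > >>>>>>>>>> }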
> > > > >>>>>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> > > > >>>>>>>>>>> I've read this and still have more questions than answers. If
> > > > >>>>>>>>>>> my data skips about (time-wise), what determines when a given
> > > > >>>>>>>>>>> window will start / stop accepting new data? What if I'm
> > > > >>>>>>>>>>> reading data from some time ago?
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <matthias@confluent.io>
> > > > >>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Please have a look here:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> If you have further questions, just follow up :)
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> -Matthias
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> > > > >>>>>>>>>>>>> I've added the 'until()' clause to some aggregation steps
> > > > >>>>>>>>>>>>> and it's working wonders for keeping the size of the state
> > > > >>>>>>>>>>>>> store within useful boundaries... But I'm not 100% clear on
> > > > >>>>>>>>>>>>> how it works.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> What is implied by the '.until()' clause? What determines
> > > > >>>>>>>>>>>>> when to stop receiving further data - is it clock time
> > > > >>>>>>>>>>>>> (since the window was created)? It seems problematic for it
> > > > >>>>>>>>>>>>> to refer to EventTime, as this may bounce all over the
> > > > >>>>>>>>>>>>> place. For non-overlapping windows a given record can only
> > > > >>>>>>>>>>>>> fall into a single aggregation period - so when would a
> > > > >>>>>>>>>>>>> value get discarded?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 *
> > > > >>>>>>>>>>>>> 1000L).until(10 * 1000L))' - but what is this accomplishing?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
>
