kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sachin Mittal <sjmit...@gmail.com>
Subject Re: Kafka Streams window retention period question
Date Fri, 06 Jan 2017 05:59:42 GMT
What should happen when the window 00:00..01:00 will finally get purged
(and the internal stream time will get bumped to say time 10:00) but then I
receive an event <b,00:15 10>? Will it create the 00:00..01:00 window again
or the event will be dropped because it's way older than the internal
stream time?

Once window is finally purged and you get a late message for that window
again, it will create that window, aggregate single value to it and again
drop that window.
So this will be of not much use. Suggestion here is to keep the window
until sufficient time so that you don't get any late message.
Another thing I have found is that if until is long enough compaction (and
deletion in later release), will trigger only after until, and your
changelog topics will be huge. This may result in high local state restore
time if streams are resumed. Also in general I see more lag in case until
is set way high.

Thanks
Sachin



On Fri, Jan 6, 2017 at 10:44 AM, Alexander Demidko <
alexander.demidko@stitchfix.com> wrote:

> Hi Matthias,
>
> Thanks for such a thorough response!
>
> I guess there are cases when a determinism might be preferred over
> computing "more correct results" (e.g. in unit tests, where one manually
> lays out an order of incoming events and wants to get an exact output), but
> from now on I can simply assume that windows might be stored longer than
> the specified time.
>
> Few more questions if you don't mind.
>
> - What should happen when the window 00:00..01:00 will finally get purged
> (and the internal stream time will get bumped to say time 10:00) but then I
> receive an event <b,00:15 10>? Will it create the 00:00..01:00 window again
> or the event will be dropped because it's way older than the internal
> stream time?
>
> - I got a bit confused when you mentioned a key in the window name "open
> (first) >>>b<<<-window". To make it clear – I assume that because
in Kafka
> Streams hopping/tumbling windows are aligned, an internal stream time is
> not related to the aggregation keys but just to the input partitions,
> right? I.e. if I have only one partition there will be only one internal
> stream time watermark regardless of how many keys do I have? Will this
> behavior be the same for sliding windows? Feel free to just point me to the
> code :)
>
> Alex
>
>
> > Hi Alexander,
> >
> > first, both mailing list should be fine :)
> >
> > About internal time tracking: Kafka Streams tracks an internal "stream
> > time" that is determined as the minimum "partition time" over all its
> > input partitions.
> >
> > The "partition time" is tracked for each input partition individually
> > and is the minimum timestamp of all currently buffered record of the
> > partition.
> >
> > So depending on how many records from which partitions are fetch on
> > poll() "stream time" gets advanced accordingly -- this is kinda
> > non-deterministic because we cannot predict what poll() will return.
> >
> > Because all buffered records are considered, "stream time" is advance
> > conservatively. The main idea about this is to keep windows open longer
> > if we know in advance that there will be a late record (as we observe
> > the late record in the buffer already). Thus, we can compute "more
> > correct" results with regard to late arriving records.
> > (Just a heads up, we might change this behavior in future releases --
> > thus, it is not documented anywhere but in the code ;) )
> >
> >
> >
> > About retention time and purging windows: old windows should be dropped
> > after "stream time" advances beyond the point on which it is guaranteed
> > to maintain the window. It should happen "as soon as possible" but there
> > is no strict guarantee (thus "kept at least").
> >
> > Furthermore, Streams applies a minimum retention time of 1 minute --
> > thus, for your specific use case, the 30 seconds you do specify are not
> > used (this is unfortunately not documented :( ). However, this in
> > unrelated to the behavior you see -- I just mention it for completeness.
> >
> >
> > Thus for you specific use case, streams time is most likely not advance
> > to 5:01 when record <b,05:01 10> is processed (as record with TS=0:15 is
> > most likely in the buffer) and thus, the next b-record with TS=15
> > seconds will be added the the still open (first) b-window and both
> > values 2 and 10 get added to 12.
> >
> >
> > Also keep in mind that we do some deduplication on KTable result using
> > an internal cache. This can also influence what output record you see.
> > For further details see:
> >
> http://docs.confluent.io/current/streams/developer-
> guide.html#memory-management
>
>
> > -Matthias
>
>
> On Wed, Jan 4, 2017 at 5:16 PM, Alexander Demidko <
> alexander.demidko@stitchfix.com> wrote:
>
> > Hi folks,
> >
> > I'm experimenting with Kafka Streams windowed aggregation and came across
> > window retention period behavior I don't fully understand.
> > I'm using custom timestamp extractor which gets the timestamp from the
> > payload. Values are aggregated using tumbling time windows and summed by
> > the key.
> > I am using kafka and kafka-streams with 0.10.1.1 version.
> >
> > Full code can be found at https://gist.github.com/xdralex/
> > 845bcf8f06ab0cfcf9785d9f95450b88, but in general I'm doing the
> following:
> >
> > val input: KStream[String, String] = builder.stream(Serdes.String(),
> > Serdes.String(), "TimeInputTopic")
> > val window: Windows[TimeWindow] = TimeWindows.of(60000).
> > advanceBy(60000).until(30000)
> >
> > val aggregated: KTable[Windowed[String], JInt] = input
> >   .mapValues((v: String) => parse(v)._2)
> >   .groupByKey(Serdes.String(), Serdes.Integer())
> >   .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window,
> > "TimeStore1")
> >
> > aggregated.foreach {
> >   (w: Windowed[String], s: JInt) =>
> >     val start = new DateTime(w.window().start(), DateTimeZone.UTC)
> >     val end = new DateTime(w.window().end(), DateTimeZone.UTC)
> >     println(s"Aggregated: $start..$end - ${w.key()} - $s")
> > }
> >
> > Here is the data being sent to TimeInputTopic:
> > a,1970-01-01T00:00:00Z 1
> > b,1970-01-01T00:00:01Z 2
> > a,1970-01-01T00:00:02Z 3
> > b,1970-01-01T00:05:01Z 10
> > b,1970-01-01T00:00:15Z 10
> >
> > Here is the output:
> > Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - a - 4
> > Aggregated: 1970-01-01T00:05:00.000Z..1970-01-01T00:06:00.000Z - b - 10
> > Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 12
> >
> > Here is what confuses me.
> > I would expect that once an event <b,05:01 10> is received, it should
> > update some sort of "current" time value (watermark?), and, because
> 05:01 is
> > bigger than 00:00..01:00 + 30 seconds of retention, either:
> >
> > A) Drop the bucket 00:00:00..00:01:00, and once an event <b,00:15 10> is
> > received, recreate this bucket. This means there would be two outputs for
> > 00:00..01:00:
> > Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> > Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 10
> > Not sure about this behavior because http://docs.confluent.
> > io/3.1.1/streams/developer-guide.html#windowing-a-stream is saying:
> > "Kafka Streams guarantees to keep a window for *at least this specified
> > time*". So I guess the window can be kept longer...
> >
> >
> > B) Just drop the incoming event <b,00:15 10> altogether. In this case
> > there would be only one output for 00:00..01:00:
> > Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> > I could expect this because http://docs.confluent.
> > io/3.1.1/streams/concepts.html#windowing is saying: "If a record arrives
> > after the retention period has passed, the record cannot be processed and
> > is dropped".
> >
> >
> > Hope all this makes sense. Few questions:
> >
> > – If the window can be kept in store longer, are there any thresholds
> when
> > it will finally be purged? For example, it would be nice to flush old
> > buckets if they are taking too much space.
> >
> > – How is "current" time value updated / how do Kafka Streams decide that
> > the retention period has passed? Does it maintain a watermark with the
> > biggest time seen?
> >
> > – What is the right mailing list to ask questions about Kafka Streams? I
> > had a choice between this one and Confluent Platform list, and given that
> > open source part of CP consists from patched vanilla Kafka, was not sure
> > where to write.
> >
> > Thanks,
> > Alex
> >
> >
> >
> >
> >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message