kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Kafka Streams window retention period question
Date Fri, 06 Jan 2017 06:09:14 GMT
Hi Alex,

If a window was purged because its retention time passed, it will not
accept any more records -- thus, if a very late record arrives, it
will be dropped without any further notice.
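
A minimal sketch of that drop behavior, as a toy model rather than actual Kafka Streams code. The class `WindowStoreSketch` and its methods are hypothetical names, and the expiry rule (window end plus retention, compared against stream time) is an assumption taken from the reading in the original question; the real store may keep windows longer than that.

```scala
import scala.collection.mutable

// Hypothetical toy model, not the Kafka Streams implementation.
// Assumption: a tumbling window expires once stream time reaches
// windowStart + sizeMs + retentionMs.
final class WindowStoreSketch(sizeMs: Long, retentionMs: Long) {
  // (key, windowStart) -> running sum for that window
  private val sums = mutable.Map.empty[(String, Long), Int]
  private var streamTime = Long.MinValue

  private def isExpired(windowStart: Long): Boolean =
    windowStart + sizeMs + retentionMs <= streamTime

  /** Moves stream time forward (never backward) and purges expired windows. */
  def advanceTo(ts: Long): Unit = {
    streamTime = math.max(streamTime, ts)
    val expired = sums.keys.filter { case (_, start) => isExpired(start) }.toList
    expired.foreach(k => sums.remove(k))
  }

  /** Returns the updated sum, or None if the record was silently dropped. */
  def add(key: String, ts: Long, value: Int): Option[Int] = {
    val start = ts - (ts % sizeMs) // aligned tumbling window start
    if (isExpired(start)) None
    else {
      val updated = sums.getOrElse((key, start), 0) + value
      sums.update((key, start), updated)
      Some(updated)
    }
  }
}
```

In this model, adding `<b,00:01 2>`, advancing stream time to 10:00, and then adding `<b,00:15 10>` yields `None` for the last call: the window is not recreated.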

About stream time and partitions: yes. And how time is advanced/tracked
is independent of the window type.
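
To make the time tracking concrete, here is a small sketch of the rule described in the earlier reply quoted below: "partition time" is the minimum timestamp of the buffered records of a partition, and "stream time" is the minimum partition time over all input partitions. The object and function names are hypothetical, and skipping partitions with an empty buffer is an assumption of the sketch, not something stated in this thread.

```scala
// Hypothetical helper names; this only models the rule, it is not
// code from Kafka Streams.
object StreamTimeSketch {
  /** Partition time: minimum timestamp among the currently buffered records. */
  def partitionTime(buffered: Seq[Long]): Option[Long] =
    if (buffered.isEmpty) None else Some(buffered.min)

  /** Stream time: minimum partition time over all input partitions.
    * Assumption: partitions with nothing buffered are ignored. */
  def streamTime(buffers: Map[Int, Seq[Long]]): Option[Long] = {
    val times = buffers.values.flatMap(b => partitionTime(b).toList)
    if (times.isEmpty) None else Some(times.min)
  }
}
```

Note that keys do not appear anywhere: with a single partition whose buffer holds timestamps 05:01 and 00:15, the sketch reports 00:15 as stream time, whatever keys those records carry.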


-Matthias

On 1/5/17 9:14 PM, Alexander Demidko wrote:
> Hi Matthias,
> 
> Thanks for such a thorough response!
> 
> I guess there are cases when determinism might be preferred over
> computing "more correct results" (e.g. in unit tests, where one manually
> lays out an order of incoming events and wants to get an exact output), but
> from now on I can simply assume that windows might be stored longer than
> the specified time.
> 
> A few more questions if you don't mind.
> 
> - What should happen when the window 00:00..01:00 finally gets purged
> (and the internal stream time gets bumped to, say, 10:00) but then I
> receive an event <b,00:15 10>? Will it create the 00:00..01:00 window again,
> or will the event be dropped because it's way older than the internal
> stream time?
> 
> - I got a bit confused when you mentioned a key in the window name "open
> (first) >>>b<<<-window". To make it clear – I assume that because in Kafka
> Streams hopping/tumbling windows are aligned, an internal stream time is
> not related to the aggregation keys but just to the input partitions,
> right? I.e. if I have only one partition there will be only one internal
> stream time watermark regardless of how many keys I have? Will this
> behavior be the same for sliding windows? Feel free to just point me to the
> code :)
> 
> Alex
> 
> 
>> Hi Alexander,
>>
>> first, both mailing lists should be fine :)
>>
>> About internal time tracking: Kafka Streams tracks an internal "stream
>> time" that is determined as the minimum "partition time" over all its
>> input partitions.
>>
>> The "partition time" is tracked for each input partition individually
>> and is the minimum timestamp of all currently buffered records of the
>> partition.
>>
>> So depending on how many records from which partitions are fetched on
>> poll(), "stream time" gets advanced accordingly -- this is kinda
>> non-deterministic because we cannot predict what poll() will return.
>>
>> Because all buffered records are considered, "stream time" is advanced
>> conservatively. The main idea is to keep windows open longer
>> if we know in advance that there will be a late record (as we observe
>> the late record in the buffer already). Thus, we can compute "more
>> correct" results with regard to late arriving records.
>> (Just a heads up, we might change this behavior in future releases --
>> thus, it is not documented anywhere but in the code ;) )
>>
>>
>>
>> About retention time and purging windows: old windows should be dropped
>> after "stream time" advances beyond the point up to which the window is
>> guaranteed to be maintained. It should happen "as soon as possible" but
>> there is no strict guarantee (thus "kept at least").
>>
>> Furthermore, Streams applies a minimum retention time of 1 minute --
>> thus, for your specific use case, the 30 seconds you specify are not
>> used (this is unfortunately not documented :( ). However, this is
>> unrelated to the behavior you see -- I just mention it for completeness.
>>
>>
>> Thus for your specific use case, stream time is most likely not advanced
>> to 5:01 when record <b,05:01 10> is processed (as the record with TS=0:15
>> is most likely in the buffer) and thus, the next b-record with TS=15
>> seconds will be added to the still open (first) b-window and both
>> values 2 and 10 get summed to 12.
>>
>>
>> Also keep in mind that we do some deduplication on KTable results using
>> an internal cache. This can also influence which output records you see.
>> For further details see:
>>
>> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
> 
> 
>> -Matthias
> 
> 
> On Wed, Jan 4, 2017 at 5:16 PM, Alexander Demidko <
> alexander.demidko@stitchfix.com> wrote:
> 
>> Hi folks,
>>
>> I'm experimenting with Kafka Streams windowed aggregation and came across
>> window retention period behavior I don't fully understand.
>> I'm using a custom timestamp extractor which gets the timestamp from the
>> payload. Values are aggregated using tumbling time windows and summed by
>> key.
>> I am using version 0.10.1.1 of kafka and kafka-streams.
>>
>> Full code can be found at
>> https://gist.github.com/xdralex/845bcf8f06ab0cfcf9785d9f95450b88,
>> but in general I'm doing the following:
>>
>> val input: KStream[String, String] =
>>   builder.stream(Serdes.String(), Serdes.String(), "TimeInputTopic")
>> val window: Windows[TimeWindow] =
>>   TimeWindows.of(60000).advanceBy(60000).until(30000)
>>
>> val aggregated: KTable[Windowed[String], JInt] = input
>>   .mapValues((v: String) => parse(v)._2)
>>   .groupByKey(Serdes.String(), Serdes.Integer())
>>   .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window, "TimeStore1")
>>
>> aggregated.foreach {
>>   (w: Windowed[String], s: JInt) =>
>>     val start = new DateTime(w.window().start(), DateTimeZone.UTC)
>>     val end = new DateTime(w.window().end(), DateTimeZone.UTC)
>>     println(s"Aggregated: $start..$end - ${w.key()} - $s")
>> }
>>
>> Here is the data being sent to TimeInputTopic:
>> a,1970-01-01T00:00:00Z 1
>> b,1970-01-01T00:00:01Z 2
>> a,1970-01-01T00:00:02Z 3
>> b,1970-01-01T00:05:01Z 10
>> b,1970-01-01T00:00:15Z 10
>>
>> Here is the output:
>> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - a - 4
>> Aggregated: 1970-01-01T00:05:00.000Z..1970-01-01T00:06:00.000Z - b - 10
>> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 12
>>
>> Here is what confuses me.
>> I would expect that once an event <b,05:01 10> is received, it should
>> update some sort of "current" time value (watermark?), and, because 05:01 is
>> bigger than 00:00..01:00 + 30 seconds of retention, either:
>>
>> A) Drop the bucket 00:00:00..00:01:00, and once an event <b,00:15 10> is
>> received, recreate this bucket. This means there would be two outputs for
>> 00:00..01:00:
>> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
>> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 10
>> Not sure about this behavior because
>> http://docs.confluent.io/3.1.1/streams/developer-guide.html#windowing-a-stream
>> is saying: "Kafka Streams guarantees to keep a window for *at least this
>> specified time*". So I guess the window can be kept longer...
>>
>>
>> B) Just drop the incoming event <b,00:15 10> altogether. In this case
>> there would be only one output for 00:00..01:00:
>> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
>> I could expect this because
>> http://docs.confluent.io/3.1.1/streams/concepts.html#windowing is saying:
>> "If a record arrives after the retention period has passed, the record
>> cannot be processed and is dropped".
>>
>>
>> Hope all this makes sense. A few questions:
>>
>> – If the window can be kept in store longer, are there any thresholds when
>> it will finally be purged? For example, it would be nice to flush old
>> buckets if they are taking too much space.
>>
>> – How is the "current" time value updated / how does Kafka Streams decide
>> that the retention period has passed? Does it maintain a watermark with the
>> biggest time seen?
>>
>> – What is the right mailing list to ask questions about Kafka Streams? I
>> had a choice between this one and the Confluent Platform list, and given
>> that the open source part of CP consists of patched vanilla Kafka, I was
>> not sure where to write.
>>
>> Thanks,
>> Alex
>>
>>
>>
>>
>>
>>
> 

