kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Aggregated windowed counts
Date Thu, 05 Jan 2017 00:35:06 GMT
There is no such thing as a final window aggregate, and you might see
intermediate results -- thus the counts do not add up.

Please have a look here:

http://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable/38945277#38945277

and here:

http://docs.confluent.io/current/streams/developer-guide.html#memory-management


On each commit, the current intermediate result will be flushed from the
de-duplication cache -- thus, for a smaller commit interval you see more
intermediate results, and the totals seem to be further off.
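To see why summing the intermediate results over-counts: each flush emits the current running count for a window, so the output records are updates (upserts), not deltas -- only the latest value per window is the final count. Here is a minimal plain-Java sketch of that idea (no Kafka dependency; the window IDs and flush points are made up for illustration):

```java
import java.util.*;

public class WindowedUpserts {
    // sum every emitted update (the naive, incorrect total)
    static int naiveSum(Map<String, List<Integer>> updates) {
        int sum = 0;
        for (List<Integer> u : updates.values())
            for (int v : u) sum += v;
        return sum;
    }

    // sum only the latest update per window (the correct total)
    static int finalSum(Map<String, List<Integer>> updates) {
        int sum = 0;
        for (List<Integer> u : updates.values())
            sum += u.get(u.size() - 1);
        return sum;
    }

    public static void main(String[] args) {
        // hypothetical cumulative counts emitted on each cache flush,
        // keyed by (record key, window)
        Map<String, List<Integer>> updates = new LinkedHashMap<>();
        updates.put("kafka@window0", Arrays.asList(4, 9, 12)); // 12 records fell into window 0
        updates.put("kafka@window1", Arrays.asList(2, 7, 8));  //  8 records fell into window 1

        System.out.println("naive sum = " + naiveSum(updates)); // 42 -- over-counts
        System.out.println("final sum = " + finalSum(updates)); // 20 -- matches the input
    }
}
```

Adding up all four emitted values double-counts; taking the last value per window recovers the true count.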

In .toStream((k, v) -> k.key()) you get rid of the window ID -- if you
keep it, you can see which result records belong to the same window. The
simplest way to test this would be to use .print() instead of
.toStream(), so you see the key as window ID plus record key.
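For reference, a tumbling window like TimeWindows.of(5000L) assigns each record to the bucket [start, start + 5000) where start = timestamp - (timestamp % 5000). A minimal sketch of that assignment (plain Java, no Kafka dependency -- this mirrors the tumbling-window semantics, it is not Kafka's own code):

```java
public class WindowAssignment {
    static final long WINDOW_SIZE_MS = 5000L;

    // start of the tumbling window a record with this timestamp falls into
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 4999L, 5000L, 12345L};
        for (long ts : timestamps) {
            long start = windowStart(ts);
            System.out.println("ts=" + ts + " -> window [" + start
                + "," + (start + WINDOW_SIZE_MS) + ")");
        }
    }
}
```

So 4999 and 1000 land in window [0,5000), while 5000 starts a new window -- records for "kafka" typed around a window boundary get counted in different buckets.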


-Matthias


On 1/4/17 2:09 PM, Benjamin Black wrote:
> I'm hoping the DSL will do what I want :) Currently the example is
> continuously adding instead of bucketing, so if I modify it by adding a
> window to the count function:
> 
> .groupBy((key, word) -> word)
> .count(TimeWindows.of(5000L), "Counts")
> .toStream((k, v) -> k.key());
> 
> Then I do see bucketing happening. However, it isn't accurate. For example,
> I type "kafka" into the console in 20 sentences, but the output I get is:
> 
> kafka 4
> kafka 9
> kafka 2
> kafka 7
> 
> Which equals 22. What am I doing wrong? What is the relationship between
> the commit interval and the time window? The smaller I make the commit
> interval, the less accurate it becomes.
> 
> 
> On Wed, Jan 4, 2017 at 3:53 PM Matthias J. Sax <matthias@confluent.io>
> wrote:
> 
>> Do you know about Kafka Streams? Its DSL gives you exactly what you
>> want to do.
>>
>> Check out the documentation and WordCount example:
>>
>> http://docs.confluent.io/current/streams/index.html
>>
>> https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java
>>
>>
>> Let us know if you have further questions.
>>
>>
>> -Matthias
>>
>> On 1/4/17 12:48 PM, Benjamin Black wrote:
>>> Hello,
>>>
>>> I'm looking for guidance on how to approach a counting problem. We want
>>> to consume a stream of data that consists of IDs and generate an output
>>> of the aggregated count with a window size of X seconds, using
>>> processing time and a hopping time window. For example, using a window
>>> size of 1 second, if we get IDs 1, 2, 2, 2 in the 1st second, then the
>>> output would be 1=1, 2=3. If we get IDs 1, 3, 3 in the 2nd second, then
>>> the output would be 1=1, 3=2. The aggregated count will then be turned
>>> into increment commands to a cache and a database.
>>>
>>> Obviously we will need some state to be stored during the count of a
>>> window, but we only need to keep it for the time period of the window
>>> (i.e. a second). I was thinking this could be achieved by using a
>>> persistent store, where the counts are reset during the punctuate and
>>> the store topic uses log compaction. Alternatively, we could simply
>>> have an in-memory store that is reset during the punctuate. My concern
>>> with the in-memory store is that I don't know when the input topic
>>> offset is committed or when the output data is written, and therefore
>>> we could lose data. Ultimately, at the end of the second, the input
>>> offset and output data should be written at the same time, reducing the
>>> likelihood of lost data. We would rather lose data than have duplicate
>>> counts. What is the correct approach? Is there a better way of tackling
>>> the problem?
>>>
>>> I have put together some code, but it doesn't do exactly what I expect.
>>> I'm happy to share if it helps.
>>>
>>> Thanks,
>>> Ben
>>>
>>
>>
> 

