kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Kafka windowed table not aggregating correctly
Date Thu, 08 Dec 2016 18:46:01 GMT
Hello Sachin,

I agree that, ideally, the window segmentation implementation should be
completely abstracted away from users, but today it is a bit confusing to
understand. I filed a JIRA some time ago to improve this:

https://issues.apache.org/jira/browse/KAFKA-3596

So, in your example, if a "far-future record" is received whose timestamp
is beyond the current time plus the retention period, it could potentially
cause the current window to be dropped.
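
To make the effect concrete, here is a minimal, self-contained sketch of a
segmented store. It is a toy model and not the actual Kafka Streams
implementation: the class name SegmentedStoreSketch, the segment count of 3,
and the choice to return -1 for a record that falls into an already-dropped
segment are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model only: NOT the real Kafka Streams window store.
final class SegmentedStoreSketch {
    private final long segmentIntervalMs; // assumed: time range covered by one segment
    private final int numSegments;        // assumed: number of segments kept live
    // segment id -> (window start -> number of records aggregated into that window)
    private final TreeMap<Long, Map<Long, Integer>> segments = new TreeMap<>();

    SegmentedStoreSketch(long retentionMs, int numSegments) {
        this.numSegments = numSegments;
        this.segmentIntervalMs = retentionMs / numSegments;
    }

    /**
     * Adds one record to the window starting at windowStartMs.
     * Returns the aggregate size for that window, or -1 if the record
     * belongs to a segment that has already been dropped.
     */
    int add(long windowStartMs) {
        long segmentId = windowStartMs / segmentIntervalMs;
        long newest = segments.isEmpty()
                ? segmentId
                : Math.max(segments.lastKey(), segmentId);
        long oldestLive = newest - numSegments + 1;
        // Drop every segment that now lies before the live range.
        segments.headMap(oldestLive).clear();
        if (segmentId < oldestLive) {
            return -1; // too old: its segment no longer exists
        }
        Map<Long, Integer> windows =
                segments.computeIfAbsent(segmentId, id -> new TreeMap<>());
        return windows.merge(windowStartMs, 1, Integer::sum);
    }

    public static void main(String[] args) {
        long oneDayMs = 24L * 60 * 60 * 1000;
        SegmentedStoreSketch store = new SegmentedStoreSketch(oneDayMs, 3);
        System.out.println(store.add(10_000L));       // first record in the window
        System.out.println(store.add(10_000L));       // second record, same window
        System.out.println(store.add(3 * oneDayMs));  // far-future record
        System.out.println(store.add(10_000L));       // original window again
    }
}
```

In this model the far-future record moves the live range forward by three
days, so the segment that held the first window is cleared and a later record
for that window no longer finds the earlier aggregate.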


Guozhang


On Fri, Dec 2, 2016 at 10:07 PM, Sachin Mittal <sjmittal@gmail.com> wrote:

> Hi,
> I think it all makes sense now. The field I was using in the timestamp
> extractor contains timestamps that span more than a day, whereas it worked
> with wall-clock time because, over a short run, the timestamps stayed within
> a day's range.
>
> I wanted to understand one thing:
> Say I have a timestamp extractor field and, as records get ingested, future
> records will have increasing values for the timestamp.
>
> Now let's say the default duration is one day. At some future time a record
> will have a timestamp that is beyond the initial day's range.
> What happens then? Will it create a new segment and then create windows
> in it for the next day's duration?
> What happens if it then gets a record from the previous day? Will that
> record be discarded, or will the window again hold just that single
> aggregated value (with the previous values lost)?
> So when a new segment is created, does it retain the older segments' data?
>
> This is a bit confusing, so it would be helpful if you could explain it in
> a bit more detail.
>
> Thanks
> Sachin
>
>
> On Sat, Dec 3, 2016 at 5:18 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > Sachin,
> >
> > One thing to note is that the retention of the windowed stores works by
> > keeping multiple segments of the stores, where each segment stores a time
> > range that can potentially span multiple windows. If a new window needs
> > to be created that is further than the oldest segment's time range +
> > retention period, the oldest segments are dropped. From your code it
> > seems you do not override the retention period via until(...) on
> > TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L), so the
> > default of one day is used.
> >
> > So with WallclockTimestampExtractor, since it uses system time, it won't
> > give you timestamps that span more than a day during a short period of
> > time. But if your own timestamps exceed that range, then old segments
> > will be dropped immediately and hence the aggregate will be returned
> > as a single value.
> >
> > Guozhang
> >
> >
> > On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <matthias@confluent.io>
> > wrote:
> >
> > > The extractor is used in
> > >
> > > org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
> > >
> > > Let us know if you could resolve the problem or need more help.
> > >
> > > -Matthias
> > >
> > > On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > > > https://github.com/SOHU-Co/kafka-node/ is the Node.js client I am
> > > > using. The version is 0.5.x. Can you please tell me which code in
> > > > Streams calls the timestamp extractor? I can look there to see if
> > > > there is any issue.
> > > >
> > > > Again, the issue happens only when producing the messages with a
> > > > producer that is compatible with Kafka version 0.8.x. I see that this
> > > > producer does not send a record timestamp, as this was introduced
> > > > only in version 0.10.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <matthias@confluent.io>
> > > wrote:
> > > >
> > > >> I am not sure what is happening. That's why it would be good to have
> > > >> a toy example to reproduce the issue.
> > > >>
> > > >> What do you mean by "Kafka node version 0.5"?
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> > > >>> I can provide the data, but the data does not seem to be the issue.
> > > >>> If I submit the same data and use the same timestamp extractor with
> > > >>> the Java client for Kafka version 0.10.0.1, aggregation works fine.
> > > >>> I see the issue only when submitting the data with kafka-node
> > > >>> version 0.5. It looks like the stream does not extract the time
> > > >>> correctly in that case.
> > > >>>
> > > >>> Thanks
> > > >>> Sachin
> > > >>>
> > > >>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matthias@confluent.io
> >
> > > >> wrote:
> > > >>>
> > > >>>> Can you provide example input data (including timestamps) and the
> > > >>>> result? What is the expected result (i.e., what aggregation do you
> > > >>>> apply)?
> > > >>>>
> > > >>>>
> > > >>>> -Matthias
> > > >>>>
> > > >>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> > > >>>>> Hi,
> > > >>>>> After much debugging I found an issue with timestamp extractor.
> > > >>>>>
> > > >>>>> If I use a custom timestamp extractor with the following code:
> > > >>>>>     public static class MessageTimestampExtractor implements TimestampExtractor {
> > > >>>>>         public long extract(ConsumerRecord<Object, Object> record) {
> > > >>>>>             if (record.value() instanceof Message) {
> > > >>>>>                 return ((Message) record.value()).ts;
> > > >>>>>             } else {
> > > >>>>>                 return record.timestamp();
> > > >>>>>             }
> > > >>>>>         }
> > > >>>>>     }
> > > >>>>>
> > > >>>>> Here Message has a long field ts which stores the timestamp; the
> > > >>>>> aggregation does not work.
> > > >>>>> Note I have checked and ts has valid timestamp values.
> > > >>>>>
> > > >>>>> However, if I replace it with, say, WallclockTimestampExtractor,
> > > >>>>> aggregation works fine.
> > > >>>>>
> > > >>>>> I do not understand what could be the issue here.
> > > >>>>>
> > > >>>>> Also note I am using Kafka Streams version 0.10.0.1 and I am
> > > >>>>> publishing messages via https://github.com/SOHU-Co/kafka-node/,
> > > >>>>> whose version is quite old (0.5.x).
> > > >>>>>
> > > >>>>> Let me know if there is some bug in timestamp extraction.
> > > >>>>>
> > > >>>>> Thanks
> > > >>>>> Sachin
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <
> > wangguoz@gmail.com>
> > > >>>> wrote:
> > > >>>>>
> > > >>>>>> Sachin,
> > > >>>>>>
> > > >>>>>> This is indeed a bit weird, and we'd like to try to reproduce your
> > > >>>>>> issue locally. Do you have sample input data for us to try out?
> > > >>>>>>
> > > >>>>>> Guozhang
> > > >>>>>>
> > > >>>>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <
> > sjmittal@gmail.com
> > > >
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hi,
> > > >>>>>>> I fixed that sorted set issue, but I am facing a weird problem
> > > >>>>>>> which I am not able to replicate.
> > > >>>>>>>
> > > >>>>>>> Here is the sample problem that I could isolate:
> > > >>>>>>> My class is like this:
> > > >>>>>>>     public static class Message implements Comparable<Message> {
> > > >>>>>>>         public long ts;
> > > >>>>>>>         public String message;
> > > >>>>>>>         public String key;
> > > >>>>>>>         public Message() {}
> > > >>>>>>>         public Message(long ts, String message, String key) {
> > > >>>>>>>             this.ts = ts;
> > > >>>>>>>             this.key = key;
> > > >>>>>>>             this.message = message;
> > > >>>>>>>         }
> > > >>>>>>>         public int compareTo(Message paramT) {
> > > >>>>>>>             long ts1 = paramT.ts;
> > > >>>>>>>             return ts > ts1 ? 1 : -1;
> > > >>>>>>>         }
> > > >>>>>>>     }
> > > >>>>>>>
> > > >>>>>>> pipeline is like this:
> > > >>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > > >>>>>>>  .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
> > > >>>>>>>      public KeyValue<String, Message> apply(String key, Message value) {
> > > >>>>>>>          return new KeyValue<String, Message>(value.key, value);
> > > >>>>>>>      }
> > > >>>>>>>  })
> > > >>>>>>> .through(Serdes.String(), messageSerde, "test-window-key-stream")
> > > >>>>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
> > > >>>>>>>     public SortedSet<Message> apply() {
> > > >>>>>>>         return new TreeSet<Message>();
> > > >>>>>>>     }
> > > >>>>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
> > > >>>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
> > > >>>>>>>             SortedSet<Message> aggregate) {
> > > >>>>>>>         aggregate.add(value);
> > > >>>>>>>         return aggregate;
> > > >>>>>>>     }
> > > >>>>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > > >>>>>>> Serdes.String(), messagesSerde)
> > > >>>>>>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> > > >>>>>>>     public void apply(Windowed<String> key, SortedSet<Message> messages) {
> > > >>>>>>>         ...
> > > >>>>>>>     }
> > > >>>>>>> });
> > > >>>>>>>
> > > >>>>>>> So basically I rekey the original message into another topic and
> > > >>>>>>> then aggregate it based on that key.
> > > >>>>>>> What I have observed is that when I use windowed aggregation, the
> > > >>>>>>> aggregator does not use the previously aggregated value.
> > > >>>>>>>
> > > >>>>>>> public SortedSet<Message> apply(String aggKey, Message value,
> > > >>>>>>>         SortedSet<Message> aggregate) {
> > > >>>>>>>     aggregate.add(value);
> > > >>>>>>>     return aggregate;
> > > >>>>>>> }
> > > >>>>>>>
> > > >>>>>>> So in the above function the aggregate is an empty set for every
> > > >>>>>>> value entering the pipeline. When I remove the windowed
> > > >>>>>>> aggregation, the aggregate set retains the previously aggregated
> > > >>>>>>> values.
> > > >>>>>>>
> > > >>>>>>> I am just not able to wrap my head around it. When I ran this type
> > > >>>>>>> of test locally on Windows it worked fine. However, a similar
> > > >>>>>>> pipeline setup, when run against production on Linux, behaves
> > > >>>>>>> strangely and always gets an empty aggregate set.
> > > >>>>>>> Any idea what could be the reason, and where I should look for the
> > > >>>>>>> problem? Does the length of the key string matter here? I will
> > > >>>>>>> later try to run the same simple setup on Linux and see what
> > > >>>>>>> happens. But this is very strange behavior.
> > > >>>>>>>
> > > >>>>>>> Thanks
> > > >>>>>>> Sachin
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang
<
> > > wangguoz@gmail.com>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hello Sachin,
> > > >>>>>>>>
> > > >>>>>>>> In the implementation of SortedSet, if the object implements the
> > > >>>>>>>> Comparable interface, its compareTo function is applied in
> > > >>>>>>>> "aggregate.add(value);", and hence if it returns 0, the element
> > > >>>>>>>> will not be added, since it is a Set.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Guozhang
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal
<
> > > sjmittal@gmail.com
> > > >>>
> > > >>>>>>>> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Hi,
> > > >>>>>>>>> What I find is that when I use a sorted set as the aggregate, it
> > > >>>>>>>>> fails to aggregate the values for which compareTo returns 0.
> > > >>>>>>>>>
> > > >>>>>>>>> My class is like this:
> > > >>>>>>>>>     public class Message implements Comparable<Message> {
> > > >>>>>>>>>         public long ts;
> > > >>>>>>>>>         public String message;
> > > >>>>>>>>>         public Message() {}
> > > >>>>>>>>>         public Message(long ts, String message) {
> > > >>>>>>>>>             this.ts = ts;
> > > >>>>>>>>>             this.message = message;
> > > >>>>>>>>>         }
> > > >>>>>>>>>         public int compareTo(Message paramT) {
> > > >>>>>>>>>             long ts1 = paramT.ts;
> > > >>>>>>>>>             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> > > >>>>>>>>>         }
> > > >>>>>>>>>     }
> > > >>>>>>>>>
> > > >>>>>>>>> pipeline is like this:
> > > >>>>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > > >>>>>>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
> > > >>>>>>>>>     public SortedSet<Message> apply() {
> > > >>>>>>>>>         return new TreeSet<Message>();
> > > >>>>>>>>>     }
> > > >>>>>>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
> > > >>>>>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
> > > >>>>>>>>>             SortedSet<Message> aggregate) {
> > > >>>>>>>>>         aggregate.add(value);
> > > >>>>>>>>>         return aggregate;
> > > >>>>>>>>>     }
> > > >>>>>>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > > >>>>>>>>> Serdes.String(), messagesSerde)
> > > >>>>>>>>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> > > >>>>>>>>>     public void apply(Windowed<String> key, SortedSet<Message> messages) {
> > > >>>>>>>>>         ...
> > > >>>>>>>>>     }
> > > >>>>>>>>> });
> > > >>>>>>>>>
> > > >>>>>>>>> So any message published between 10 and 20 seconds gets
> > > >>>>>>>>> aggregated in the 10 - 20 bucket, and I print the size of the set.
> > > >>>>>>>>> However, the output I get is the following:
> > > >>>>>>>>>
> > > >>>>>>>>> Published: 14
> > > >>>>>>>>> Aggregated: 10  20 -> 1
> > > >>>>>>>>>
> > > >>>>>>>>> Published: 18
> > > >>>>>>>>> Aggregated: 10  20 -> 2
> > > >>>>>>>>>
> > > >>>>>>>>> Published: 11
> > > >>>>>>>>> Aggregated: 10  20 -> 3
> > > >>>>>>>>>
> > > >>>>>>>>> Published: 17
> > > >>>>>>>>> Aggregated: 10  20 -> 4
> > > >>>>>>>>>
> > > >>>>>>>>> Published: 14
> > > >>>>>>>>> Aggregated: 10  20 -> 4
> > > >>>>>>>>>
> > > >>>>>>>>> Published: 15
> > > >>>>>>>>> Aggregated: 10  20 -> 5
> > > >>>>>>>>>
> > > >>>>>>>>> Published: 12
> > > >>>>>>>>> Aggregated: key2  10  20 -> 6
> > > >>>>>>>>>
> > > >>>>>>>>> Published: 12
> > > >>>>>>>>> Aggregated: 10  20 -> 6
> > > >>>>>>>>>
> > > >>>>>>>>> So, as you can see, any message that occurs again for the same
> > > >>>>>>>>> second, where compareTo returns 0, fails to get aggregated in the
> > > >>>>>>>>> pipeline. Notice the ones published at 14 and 12 seconds.
> > > >>>>>>>>>
> > > >>>>>>>>> Now I am not sure whether the problem is with Java, i.e. I should
> > > >>>>>>>>> use the Comparator interface and not Comparable for my Message
> > > >>>>>>>>> object, or whether the problem is with Kafka Streams or with
> > > >>>>>>>>> serializing and deserializing the set of messages. If I replace
> > > >>>>>>>>> Set with List, all works fine.
> > > >>>>>>>>>
> > > >>>>>>>>> Anyway, any ideas here would be appreciated; meanwhile let me see
> > > >>>>>>>>> what the best Java practice is here.
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks
> > > >>>>>>>>> Sachin
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael
Noll <
> > > >> michael@confluent.io>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin
Mittal <
> > > >> sjmittal@gmail.com
> > > >>>>>>>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> I am using kafka_2.10-0.10.0.1.
> > > >>>>>>>>>>> Say I have a window of 60 minutes advanced by 15 minutes.
> > > >>>>>>>>>>> If the stream app, using the timestamp extractor, puts the
> > > >>>>>>>>>>> message in one or more bucket(s), it will get aggregated in
> > > >>>>>>>>>>> those buckets.
> > > >>>>>>>>>>> I assume this statement is correct.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> Yes.
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Also, say I restart the streams application; then bucket
> > > >>>>>>>>>>> aggregation will resume from the last point of halt.
> > > >>>>>>>>>>> I hope this is also correct.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> Yes.
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>> What I noticed is that once a message was placed in one bucket,
> > > >>>>>>>>>>> that bucket was not getting new messages.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> This should not happen...
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>> However, when I ran a small test case replicating that, it
> > > >>>>>>>>>>> worked properly. There may be some issues in application reset.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> ...and apparently it works (as expected) in your small test case.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Do you have any further information that you could share with us
> > > >>>>>>>>>> so we can help you better?  What's the difference, for example,
> > > >>>>>>>>>> between your "normal" use case and the small test case you have
> > > >>>>>>>>>> been referring to?
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> -Michael
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> --
> > > >>>>>>>> -- Guozhang
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> --
> > > >>>>>> -- Guozhang
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>
> > > >>
> > > >>
> > > >
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
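
On the SortedSet question earlier in this thread: a TreeSet treats two
elements as duplicates whenever the comparison returns 0, so ordering by ts
alone drops messages that share a timestamp. Below is a minimal sketch,
assuming a simplified Message class and a hypothetical tie-breaker on the
message text (neither is taken from the original application; the class and
comparator names are made up for illustration).

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

final class SortedSetTieBreakSketch {
    // Simplified stand-in for the Message class discussed in the thread.
    static final class Message {
        final long ts;
        final String message;

        Message(long ts, String message) {
            this.ts = ts;
            this.message = message;
        }
    }

    // Orders by timestamp only: messages with equal ts compare as 0.
    static final Comparator<Message> BY_TS =
            Comparator.comparingLong((Message m) -> m.ts);

    // Hypothetical tie-breaker: equal timestamps fall back to the text,
    // so only messages equal in both fields are treated as duplicates.
    static final Comparator<Message> BY_TS_THEN_TEXT =
            BY_TS.thenComparing((Message m) -> m.message);

    // Adds all messages to a TreeSet with the given ordering and
    // returns how many of them the set actually kept.
    static int sizeAfterAdding(Comparator<Message> order, Message... messages) {
        SortedSet<Message> set = new TreeSet<>(order);
        for (Message m : messages) {
            set.add(m);
        }
        return set.size();
    }

    public static void main(String[] args) {
        Message first = new Message(14L, "a");
        Message second = new Message(14L, "b");
        System.out.println(sizeAfterAdding(BY_TS, first, second));
        System.out.println(sizeAfterAdding(BY_TS_THEN_TEXT, first, second));
    }
}
```

Note that a compareTo which never returns 0 (such as ts > ts1 ? 1 : -1) does
keep both elements, but it breaks the Comparable contract, because
x.compareTo(y) and y.compareTo(x) then return the same sign for equal
timestamps. A tie-breaker on another field avoids that.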
