kafka-users mailing list archives

From Sachin Mittal <sjmit...@gmail.com>
Subject Re: Kafka windowed table not aggregating correctly
Date Sat, 03 Dec 2016 06:07:42 GMT
Hi,
Now it all makes sense. The field I was using for the timestamp extractor
contains timestamps that span more than a day, whereas the wall-clock
extractor worked because, over a short run, all timestamps fell within a
single day's range.

I wanted to understand one thing:
Say I have a timestamp extractor field, and as records get ingested,
future records have increasing values for that timestamp.

Now let's say the default retention is one day. At some future time a
record will arrive with a timestamp beyond the initial day's range.
What happens then? Will a new segment be created, with windows in it for
the next day's duration?
And what happens if a record from the previous day arrives after that?
Will it be discarded, or will it again be aggregated as just a single
value (with the previous values lost)?
In other words, when a new segment is created, is the older segments'
data retained?

This is a bit confusing, so it would be helpful if you could explain in a
bit more detail.

Thanks
Sachin
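To make the retention question concrete, here is a toy, stdlib-only Java model of the segment-dropping behavior Guozhang describes in his reply below. Everything here (SegmentedStore, SEGMENT_MS, the count aggregate) is invented for illustration; it is not the actual Kafka Streams store implementation, just a sketch of the mechanism.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model (NOT the real Kafka Streams store): windows live in coarse time
// segments; once observed stream time advances past a segment's range plus
// the retention period, that whole segment (and its aggregates) is dropped.
// A late record for a dropped segment then starts a fresh aggregate instead
// of updating the old one.
class SegmentedStore {
    static final long SEGMENT_MS = 24 * 60 * 60 * 1000L; // one "day" per segment
    static final long RETENTION_MS = SEGMENT_MS;         // default-like one-day retention

    // segmentId -> (windowStart -> count)
    final TreeMap<Long, Map<Long, Integer>> segments = new TreeMap<>();
    long streamTime = 0L;

    // Add one record with the given extracted timestamp; returns the
    // aggregate count now stored for that record's window.
    public int add(long ts, long windowStart) {
        streamTime = Math.max(streamTime, ts);
        // Drop whole segments older than streamTime minus retention.
        segments.headMap(segmentId(streamTime - RETENTION_MS)).clear();
        Map<Long, Integer> windows =
                segments.computeIfAbsent(segmentId(ts), k -> new HashMap<>());
        return windows.merge(windowStart, 1, Integer::sum);
    }

    static long segmentId(long ts) { return Math.floorDiv(ts, SEGMENT_MS); }
}
```

In this toy model, day-0 records aggregate normally; once a day-2 record arrives, the day-0 segment is dropped, and a late day-0 record starts over at count 1, which mirrors the "single value" behavior discussed below.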


On Sat, Dec 3, 2016 at 5:18 AM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Sachin,
>
> One thing to note is that retention for the windowed stores works by
> keeping multiple segments, where each segment stores a time range that can
> potentially span multiple windows. Segments are dropped when a new window
> needs to be created that is further than the oldest segment's time range
> plus the retention period. From your code it seems you do not override the
> retention via until(...) on TimeWindows.of("stream-table",
> 10 * 1000L).advanceBy(5 * 1000L), so the default of one day is used.
>
> So with WallclockTimestampExtractor, since it uses system time, it won't
> give you timestamps that span more than a day during a short period of
> time. But if your own extracted timestamps exceed that range, old segments
> will be dropped immediately, and hence the aggregate values will be
> returned as a single value.
>
> Guozhang
>
>
> On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <matthias@confluent.io>
> wrote:
>
> > The extractor is used in
> >
> > org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
> >
> > Let us know, if you could resolve the problem or need more help.
> >
> > -Matthias
> >
> > On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > > https://github.com/SOHU-Co/kafka-node/ is the Node.js client I am
> > > using. The version is 0.5.x. Can you please tell me what code in
> > > Streams calls the timestamp extractor? I can look there to see if
> > > there is any issue.
> > >
> > > Again, the issue happens only when producing the messages using a
> > > producer that is compatible with Kafka version 0.8.x. I see that this
> > > producer does not send a record timestamp, as this was introduced only
> > > in version 0.10.
> > >
> > > Thanks
> > > Sachin
> > >
> > > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <matthias@confluent.io>
> > wrote:
> > >
> > >> I am not sure what is happening. That's why it would be good to have a
> > >> toy example to reproduce the issue.
> > >>
> > >> What do you mean by "Kafka node version 0.5"?
> > >>
> > >> -Matthias
> > >>
> > >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> > >>> I can provide the data, but the data does not seem to be the issue.
> > >>> If I submit the same data and use the same timestamp extractor with
> > >>> the Java client with Kafka version 0.10.0.1, aggregation works fine.
> > >>> I find the issue only when submitting the data with kafka-node
> > >>> version 0.5. It looks like the stream does not extract the time
> > >>> correctly in that case.
> > >>>
> > >>> Thanks
> > >>> Sachin
> > >>>
> > >>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matthias@confluent.io>
> > >> wrote:
> > >>>
> > >>>> Can you provide example input data (including timestamps) and the
> > >>>> result? What is the expected result (i.e., what aggregation do you
> > >>>> apply)?
> > >>>>
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> > >>>>> Hi,
> > >>>>> After much debugging I found an issue with timestamp extractor.
> > >>>>>
> > >>>>> If I use a custom timestamp extractor with the following code:
> > >>>>>     public static class MessageTimestampExtractor implements
> > >>>>>             TimestampExtractor {
> > >>>>>         public long extract(ConsumerRecord<Object, Object> record) {
> > >>>>>             if (record.value() instanceof Message) {
> > >>>>>                 return ((Message) record.value()).ts;
> > >>>>>             } else {
> > >>>>>                 return record.timestamp();
> > >>>>>             }
> > >>>>>         }
> > >>>>>     }
> > >>>>>
> > >>>>> Here Message has a long field ts which stores the timestamp, and
> > >>>>> the aggregation does not work.
> > >>>>> Note I have checked, and ts has valid timestamp values.
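The extractor's fallback logic is easy to sanity-check outside Kafka. Below is a stdlib-only sketch of the same branching; EmbeddedTsMessage and ExtractorSketch are made-up stand-ins (the real code uses ConsumerRecord and the thread's Message class), and the -1 fallback value is an assumption about what a record without a producer-set timestamp carries.

```java
// Stand-in for the extractor's fallback logic: use the payload's embedded
// ts when the value is a Message-like object, otherwise fall back to the
// record-level timestamp (which an old 0.8.x producer never sets).
class EmbeddedTsMessage {
    final long ts;
    EmbeddedTsMessage(long ts) { this.ts = ts; }
}

class ExtractorSketch {
    static long extract(Object value, long recordTimestamp) {
        if (value instanceof EmbeddedTsMessage) {
            return ((EmbeddedTsMessage) value).ts; // embedded application timestamp
        }
        return recordTimestamp; // broker/producer record timestamp fallback
    }
}
```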
> > >>>>>
> > >>>>> However, if I replace it with, say, WallclockTimestampExtractor,
> > >>>>> the aggregation works fine.
> > >>>>>
> > >>>>> I do not understand what could be the issue here.
> > >>>>>
> > >>>>> Also note I am using Kafka Streams version 0.10.0.1, and I am
> > >>>>> publishing messages via https://github.com/SOHU-Co/kafka-node/,
> > >>>>> whose version is quite old (0.5.x).
> > >>>>>
> > >>>>> Let me know if there is some bug in the timestamp extraction.
> > >>>>>
> > >>>>> Thanks
> > >>>>> Sachin
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <
> wangguoz@gmail.com>
> > >>>> wrote:
> > >>>>>
> > >>>>>> Sachin,
> > >>>>>>
> > >>>>>> This is indeed a bit weird, and we'd like to try to reproduce
> > >>>>>> your issue locally. Do you have sample input data for us to try
> > >>>>>> out?
> > >>>>>>
> > >>>>>> Guozhang
> > >>>>>>
> > >>>>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <
> sjmittal@gmail.com
> > >
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi,
> > >>>>>>> I fixed that sorted-set issue, but I am facing a weird problem
> > >>>>>>> which I am not able to replicate.
> > >>>>>>>
> > >>>>>>> Here is the sample problem that I could isolate:
> > >>>>>>> My class is like this:
> > >>>>>>>     public static class Message implements Comparable<Message> {
> > >>>>>>>         public long ts;
> > >>>>>>>         public String message;
> > >>>>>>>         public String key;
> > >>>>>>>         public Message() {};
> > >>>>>>>         public Message(long ts, String message, String key) {
> > >>>>>>>             this.ts = ts;
> > >>>>>>>             this.key = key;
> > >>>>>>>             this.message = message;
> > >>>>>>>         }
> > >>>>>>>         public int compareTo(Message paramT) {
> > >>>>>>>             long ts1 = paramT.ts;
> > >>>>>>>             return ts > ts1 ? 1 : -1;
> > >>>>>>>         }
> > >>>>>>>     }
> > >>>>>>>
> > >>>>>>> pipeline is like this:
> > >>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > >>>>>>>     .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
> > >>>>>>>         public KeyValue<String, Message> apply(String key, Message value) {
> > >>>>>>>             return new KeyValue<String, Message>(value.key, value);
> > >>>>>>>         }
> > >>>>>>>     })
> > >>>>>>>     .through(Serdes.String(), messageSerde, "test-window-key-stream")
> > >>>>>>>     .aggregateByKey(new Initializer<SortedSet<Message>>() {
> > >>>>>>>         public SortedSet<Message> apply() {
> > >>>>>>>             return new TreeSet<Message>();
> > >>>>>>>         }
> > >>>>>>>     }, new Aggregator<String, Message, SortedSet<Message>>() {
> > >>>>>>>         public SortedSet<Message> apply(String aggKey, Message value,
> > >>>>>>>                 SortedSet<Message> aggregate) {
> > >>>>>>>             aggregate.add(value);
> > >>>>>>>             return aggregate;
> > >>>>>>>         }
> > >>>>>>>     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > >>>>>>>     Serdes.String(), messagesSerde)
> > >>>>>>>     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> > >>>>>>>         public void apply(Windowed<String> key, SortedSet<Message> messages) {
> > >>>>>>>             ...
> > >>>>>>>         }
> > >>>>>>>     });
> > >>>>>>>
> > >>>>>>> So basically I rekey the original message into another topic and
> > >>>>>>> then aggregate it based on that key.
> > >>>>>>> What I have observed is that when I use windowed aggregation, the
> > >>>>>>> aggregator does not use the previous aggregated value:
> > >>>>>>>
> > >>>>>>> public SortedSet<Message> apply(String aggKey, Message value,
> > >>>>>>>         SortedSet<Message> aggregate) {
> > >>>>>>>     aggregate.add(value);
> > >>>>>>>     return aggregate;
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> So in the above function, the aggregate is an empty set for every
> > >>>>>>> value entering the pipeline. When I remove the windowed
> > >>>>>>> aggregation, the aggregate set retains previously aggregated
> > >>>>>>> values in the set.
> > >>>>>>>
> > >>>>>>> I am just not able to wrap my head around it. When I ran this type
> > >>>>>>> of test locally on Windows it worked fine. However, a similar
> > >>>>>>> pipeline setup, when run against production on Linux, behaves
> > >>>>>>> strangely and always gets an empty aggregate set.
> > >>>>>>> Any idea what could be the reason, and where should I look for the
> > >>>>>>> problem? Does the length of the key string matter here? I will
> > >>>>>>> later try to run the same simple setup on Linux and see what
> > >>>>>>> happens. But this is very strange behavior.
> > >>>>>>>
> > >>>>>>> Thanks
> > >>>>>>> Sachin
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <
> > wangguoz@gmail.com>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hello Sachin,
> > >>>>>>>>
> > >>>>>>>> In the implementation of SortedSet, if the object implements the
> > >>>>>>>> Comparable interface, that compareTo function is applied in
> > >>>>>>>> "aggregate.add(value);", and hence if it returns 0, the element
> > >>>>>>>> will not be added, since it is a Set.
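A quick stdlib-only Java check confirms the TreeSet behavior described here: when compareTo returns 0 for two distinct objects, the second add is silently a no-op. Msg below is a simplified stand-in for the thread's Message class, ordering by timestamp only.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Simplified stand-in for the thread's Message class: ordering is by
// timestamp only, so two different payloads with the same ts compare as
// "equal", and a TreeSet keeps only the first one.
class Msg implements Comparable<Msg> {
    final long ts;
    final String payload;
    Msg(long ts, String payload) { this.ts = ts; this.payload = payload; }
    public int compareTo(Msg other) {
        return Long.compare(ts, other.ts);
    }
}

class TreeSetDropDemo {
    static int sizeAfterAdds() {
        SortedSet<Msg> set = new TreeSet<>();
        set.add(new Msg(14, "first"));
        set.add(new Msg(14, "second")); // compareTo == 0 -> silently dropped
        set.add(new Msg(18, "third"));
        return set.size(); // 2, not 3
    }
}
```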
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Guozhang
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal
<
> > sjmittal@gmail.com
> > >>>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi,
> > >>>>>>>>> What I find is that when I use a sorted set as the aggregation,
> > >>>>>>>>> it fails to aggregate the values for which compareTo returns 0.
> > >>>>>>>>>
> > >>>>>>>>> My class is like this:
> > >>>>>>>>>     public class Message implements Comparable<Message> {
> > >>>>>>>>>         public long ts;
> > >>>>>>>>>         public String message;
> > >>>>>>>>>         public Message() {};
> > >>>>>>>>>         public Message(long ts, String message) {
> > >>>>>>>>>             this.ts = ts;
> > >>>>>>>>>             this.message = message;
> > >>>>>>>>>         }
> > >>>>>>>>>         public int compareTo(Message paramT) {
> > >>>>>>>>>             long ts1 = paramT.ts;
> > >>>>>>>>>             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> > >>>>>>>>>         }
> > >>>>>>>>>     }
> > >>>>>>>>>
> > >>>>>>>>> pipeline is like this:
> > >>>>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > >>>>>>>>>     .aggregateByKey(new Initializer<SortedSet<Message>>() {
> > >>>>>>>>>         public SortedSet<Message> apply() {
> > >>>>>>>>>             return new TreeSet<Message>();
> > >>>>>>>>>         }
> > >>>>>>>>>     }, new Aggregator<String, Message, SortedSet<Message>>() {
> > >>>>>>>>>         public SortedSet<Message> apply(String aggKey, Message value,
> > >>>>>>>>>                 SortedSet<Message> aggregate) {
> > >>>>>>>>>             aggregate.add(value);
> > >>>>>>>>>             return aggregate;
> > >>>>>>>>>         }
> > >>>>>>>>>     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > >>>>>>>>>     Serdes.String(), messagesSerde)
> > >>>>>>>>>     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> > >>>>>>>>>         public void apply(Windowed<String> key, SortedSet<Message> messages) {
> > >>>>>>>>>             ...
> > >>>>>>>>>         }
> > >>>>>>>>>     });
> > >>>>>>>>>
> > >>>>>>>>> So any message published between 10 and 20 seconds gets
> > >>>>>>>>> aggregated in the 10-20 bucket, and I print the size of the set.
> > >>>>>>>>> However, the output I get is the following:
> > >>>>>>>>>
> > >>>>>>>>> Published: 14
> > >>>>>>>>> Aggregated: 10  20 -> 1
> > >>>>>>>>>
> > >>>>>>>>> Published: 18
> > >>>>>>>>> Aggregated: 10  20 -> 2
> > >>>>>>>>>
> > >>>>>>>>> Published: 11
> > >>>>>>>>> Aggregated: 10  20 -> 3
> > >>>>>>>>>
> > >>>>>>>>> Published: 17
> > >>>>>>>>> Aggregated: 10  20 -> 4
> > >>>>>>>>>
> > >>>>>>>>> Published: 14
> > >>>>>>>>> Aggregated: 10  20 -> 4
> > >>>>>>>>>
> > >>>>>>>>> Published: 15
> > >>>>>>>>> Aggregated: 10  20 -> 5
> > >>>>>>>>>
> > >>>>>>>>> Published: 12
> > >>>>>>>>> Aggregated: key2  10  20 -> 6
> > >>>>>>>>>
> > >>>>>>>>> Published: 12
> > >>>>>>>>> Aggregated: 10  20 -> 6
> > >>>>>>>>>
> > >>>>>>>>> So if you see, any message that occurs again in the same
> > >>>>>>>>> second, where compareTo returns 0, fails to get aggregated in
> > >>>>>>>>> the pipeline.
> > >>>>>>>>> Notice the ones published at 14 and 12 seconds.
> > >>>>>>>>>
> > >>>>>>>>> Now I am not sure if the problem is with Java, i.e. whether I
> > >>>>>>>>> should use the Comparator interface rather than Comparable for
> > >>>>>>>>> my Message object, or whether the problem is with Kafka Streams,
> > >>>>>>>>> or with serializing and deserializing the set of messages.
> > >>>>>>>>> If I replace the Set with a List, everything works fine.
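One common way out, sketched below with made-up names (TieBreakMessage, TieBreakDemo): break timestamp ties with a secondary field, so that compareTo returns 0 only for messages that really are duplicates. Whether two same-second messages should count as distinct is an application decision, so this is only an illustrative option, not the thread's resolution.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Sketch of a tie-breaking compareTo: order by timestamp first, then by
// message content, so two different messages in the same second are both
// kept by a TreeSet; compareTo returns 0 only for true duplicates.
class TieBreakMessage implements Comparable<TieBreakMessage> {
    final long ts;
    final String message;
    TieBreakMessage(long ts, String message) { this.ts = ts; this.message = message; }
    public int compareTo(TieBreakMessage other) {
        int byTs = Long.compare(ts, other.ts);
        return byTs != 0 ? byTs : message.compareTo(other.message);
    }
}

class TieBreakDemo {
    static int sizeAfterAdds() {
        SortedSet<TieBreakMessage> set = new TreeSet<>();
        set.add(new TieBreakMessage(14, "a"));
        set.add(new TieBreakMessage(14, "b")); // same second, now kept
        set.add(new TieBreakMessage(14, "a")); // true duplicate, dropped
        return set.size(); // 2
    }
}
```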
> > >>>>>>>>>
> > >>>>>>>>> Anyway, any ideas here would be appreciated; meanwhile let me
> > >>>>>>>>> see what the best Java practice is here.
> > >>>>>>>>>
> > >>>>>>>>> Thanks
> > >>>>>>>>> Sachin
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll
<
> > >> michael@confluent.io>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin
Mittal <
> > >> sjmittal@gmail.com
> > >>>>>>>
> > >>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> I am using kafka_2.10-0.10.0.1.
> > >>>>>>>>>>> Say I am having a window of 60 minutes advanced by 15
> > >>>>>>>>>>> minutes.
> > >>>>>>>>>>> If the stream app, using the timestamp extractor, puts the
> > >>>>>>>>>>> message in one or more bucket(s), it will get aggregated in
> > >>>>>>>>>>> those buckets.
> > >>>>>>>>>>> I assume this statement is correct.
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Yes.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Also, say I restart the streams application; then the bucket
> > >>>>>>>>>>> aggregation will resume from the last point of halt.
> > >>>>>>>>>>> I hope this is also correct.
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Yes.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>> What I noticed is that once a message was placed in one
> > >>>>>>>>>>> bucket, that bucket was not getting new messages.
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> This should not happen...
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>> However, when I ran a small test case replicating that, it
> > >>>>>>>>>>> worked properly. There may be some issue with the application
> > >>>>>>>>>>> reset.
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> ...and apparently it works (as expected) in your small test
> > >>>>>>>>>> case.
> > >>>>>>>>>>
> > >>>>>>>>>> Do you have any further information that you could share with
> > >>>>>>>>>> us so we can help you better? What's the difference, for
> > >>>>>>>>>> example, between your "normal" use case and the small test
> > >>>>>>>>>> case you have been referring to?
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> -Michael
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> --
> > >>>>>>>> -- Guozhang
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> --
> > >>>>>> -- Guozhang
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
> >
>
>
> --
> -- Guozhang
>
