kafka-users mailing list archives

From Sachin Mittal <sjmit...@gmail.com>
Subject Re: Kafka windowed table not aggregating correctly
Date Fri, 09 Dec 2016 04:19:10 GMT
Hi,
Right now, in order to circumvent this problem, I am using timestamps whose
values increase by a few ms as new records arrive.
So let's say I have records in order:
A -> lower limit TS + 1 sec
B -> lower limit TS + 3 sec
C -> lower limit TS + 5 sec
..
Z -> upper limit TS - 1 sec

Now say I get a record ZZ with ts = upper limit TS + 1 sec. I assume it will
drop the previous windows and create new ones based on this timestamp.
Please confirm this understanding.

Now let's say I get a new record ZZA with timestamp = (old) upper limit TS - 1
sec. Will this again cause the new windows to be dropped and the older
windows to be recreated fresh, with all the aggregation done so far lost?

Thanks
Sachin
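The retention behavior being asked about can be sketched with a toy model in plain Java (an illustrative simplification, under the assumption that windows older than "observed stream time minus retention" are expired; `WindowRetentionSketch` and its methods are hypothetical names, not the Kafka Streams API or its actual store implementation):

```java
import java.util.TreeMap;

// Toy model of window retention: windows older than
// (observed stream time - retention) are dropped, and late records
// whose window fell behind that horizon are discarded.
// Illustrative only; not the real Kafka Streams segment store.
class WindowRetentionSketch {
    final long windowSize;
    final long retention;
    long streamTime = Long.MIN_VALUE;
    // window start timestamp -> count of records aggregated in it
    final TreeMap<Long, Integer> counts = new TreeMap<>();

    WindowRetentionSketch(long windowSize, long retention) {
        this.windowSize = windowSize;
        this.retention = retention;
    }

    void add(long ts) {
        // A far-future record advances stream time...
        streamTime = Math.max(streamTime, ts);
        long horizon = streamTime - retention;
        // ...which expires all windows behind the retention horizon
        counts.headMap(horizon, false).clear();
        long windowStart = ts - ts % windowSize;
        if (windowStart >= horizon) {
            counts.merge(windowStart, 1, Integer::sum);
        }
        // A late record behind the horizon is simply lost; the old
        // aggregate is NOT rebuilt.
    }

    Integer count(long windowStart) {
        return counts.get(windowStart);
    }
}
```

Under this model, a record one day ahead expires the earlier windows, and a subsequent late record does not resurrect them, which matches the "previous aggregation lost" symptom described above.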




On Fri, Dec 9, 2016 at 12:16 AM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Hello Sachin,
>
> I am with you that ideally the windowing segmentation implementation should
> be totally abstracted from users, but today it is a bit confusing to
> understand. I filed a JIRA some time ago to improve this:
>
> https://issues.apache.org/jira/browse/KAFKA-3596
>
> So to your example: if a "far future record" is received whose timestamp
> is beyond current time + the retention period, it could potentially cause
> the current window to be dropped.
>
>
> Guozhang
>
>
> On Fri, Dec 2, 2016 at 10:07 PM, Sachin Mittal <sjmittal@gmail.com> wrote:
>
> > Hi,
> > I think it all makes sense now. The field I was using for the timestamp
> > extractor contains timestamps that span more than a day's duration; it
> > worked with the wall-clock extractor because, over a short run, the
> > timestamps stayed within a day's range.
> >
> > I wanted to understand one thing:
> > Say I have a timestamp extractor field, and as records get ingested,
> > future records will have increasing values for the timestamp.
> >
> > Now let's say the default duration is one day. At a future time a record
> > will have a timestamp which is greater than the initial day's range.
> > What will happen then? Will it create a new segment and then create
> > windows in it for the next day's duration?
> > What happens if it now gets a record from the previous day: will it get
> > discarded, or will it again have just the single value aggregated in it
> > (the previous values being lost)?
> > So when a new segment is created, as I understand it, does it retain the
> > older segments' data?
> >
> > This is a bit confusing, so it would be helpful if you could explain in
> > a bit more detail.
> >
> > Thanks
> > Sachin
> >
> >
> > On Sat, Dec 3, 2016 at 5:18 AM, Guozhang Wang <wangguoz@gmail.com>
> wrote:
> >
> > > Sachin,
> > >
> > > One thing to note is that the retention of the windowed stores works by
> > > keeping multiple segments, where each segment stores a time range which
> > > can potentially span multiple windows. Segments are dropped if a new
> > > window needs to be created that is further than the oldest segment's
> > > time range + retention period. (From your code it seems you do not
> > > override the retention via until(...) on TimeWindows.of("stream-table",
> > > 10 * 1000L).advanceBy(5 * 1000L), so the default of one day is used.)
> > >
> > > So with WallclockTimestampExtractor, since it is using system time, it
> > > won't give you timestamps that span more than a day within a short
> > > period of time; but if your own extracted timestamps exceed that range,
> > > then old segments will be dropped immediately, and hence the aggregate
> > > will be returned as a single value.
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <
> matthias@confluent.io>
> > > wrote:
> > >
> > > > The extractor is used in
> > > >
> > > > org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
> > > >
> > > > Let us know, if you could resolve the problem or need more help.
> > > >
> > > > -Matthias
> > > >
> > > > On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > > > > https://github.com/SOHU-Co/kafka-node/ is the Node.js client I am
> > > > > using. The version is 0.5.x. Can you please tell me what code in
> > > > > streams calls the timestamp extractor? I can look there to see if
> > > > > there is any issue.
> > > > >
> > > > > Again, the issue happens only when producing the messages using a
> > > > > producer that is compatible with kafka version 0.8.x. I see that
> > > > > this producer does not send a record timestamp, as this was
> > > > > introduced in version 0.10 only.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <matthias@confluent.io>
> > > > wrote:
> > > > >
> > > > >> I am not sure what is happening. That's why it would be good to
> > > > >> have a toy example to reproduce the issue.
> > > > >>
> > > > >> What do you mean by "Kafka node version 0.5"?
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> > > > >>> I can provide the data, but the data does not seem to be the
> > > > >>> issue.
> > > > >>> If I submit the same data and use the same timestamp extractor
> > > > >>> with the Java client on kafka version 0.10.0.1, aggregation works
> > > > >>> fine.
> > > > >>> I see the issue only when submitting the data with kafka-node
> > > > >>> version 0.5.
> > > > >>> It looks like the stream does not extract the time correctly in
> > > > >>> that case.
> > > > >>>
> > > > >>> Thanks
> > > > >>> Sachin
> > > > >>>
> > > > >>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <
> matthias@confluent.io
> > >
> > > > >> wrote:
> > > > >>>
> > > > >>>> Can you provide example input data (including timestamps) and
> > > > >>>> the result? What is the expected result (i.e., what aggregation
> > > > >>>> do you apply)?
> > > > >>>>
> > > > >>>>
> > > > >>>> -Matthias
> > > > >>>>
> > > > >>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> > > > >>>>> Hi,
> > > > >>>>> After much debugging I found an issue with the timestamp
> > > > >>>>> extractor.
> > > > >>>>>
> > > > >>>>> If I use a custom timestamp extractor with the following code:
> > > > >>>>>
> > > > >>>>>     public static class MessageTimestampExtractor implements TimestampExtractor {
> > > > >>>>>         public long extract(ConsumerRecord<Object, Object> record) {
> > > > >>>>>             if (record.value() instanceof Message) {
> > > > >>>>>                 return ((Message) record.value()).ts;
> > > > >>>>>             } else {
> > > > >>>>>                 return record.timestamp();
> > > > >>>>>             }
> > > > >>>>>         }
> > > > >>>>>     }
> > > > >>>>>
> > > > >>>>> Here Message has a long field ts which stores the timestamp,
> > > > >>>>> and the aggregation does not work.
> > > > >>>>> Note I have checked that ts has valid timestamp values.
> > > > >>>>>
> > > > >>>>> However, if I replace it with, say, WallclockTimestampExtractor,
> > > > >>>>> aggregation works fine.
> > > > >>>>>
> > > > >>>>> I do not understand what the issue could be here.
> > > > >>>>>
> > > > >>>>> Also note I am using kafka streams version 0.10.0.1, and I am
> > > > >>>>> publishing messages via https://github.com/SOHU-Co/kafka-node/,
> > > > >>>>> whose version is quite old: 0.5.x.
> > > > >>>>>
> > > > >>>>> Let me know if there is some bug in timestamp extraction.
> > > > >>>>>
> > > > >>>>> Thanks
> > > > >>>>> Sachin
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <
> > > wangguoz@gmail.com>
> > > > >>>> wrote:
> > > > >>>>>
> > > > >>>>>> Sachin,
> > > > >>>>>>
> > > > >>>>>> This is indeed a bit weird, and we'd like to try to reproduce
> > > > >>>>>> your issue locally. Do you have sample input data for us to
> > > > >>>>>> try out?
> > > > >>>>>>
> > > > >>>>>> Guozhang
> > > > >>>>>>
> > > > >>>>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal
<
> > > sjmittal@gmail.com
> > > > >
> > > > >>>>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Hi,
> > > > >>>>>>> I fixed that sorted-set issue, but I am facing a weird
> > > > >>>>>>> problem which I am not able to replicate.
> > > > >>>>>>>
> > > > >>>>>>> Here is the sample problem that I could isolate.
> > > > >>>>>>> My class is like this:
> > > > >>>>>>>
> > > > >>>>>>>     public static class Message implements Comparable<Message> {
> > > > >>>>>>>         public long ts;
> > > > >>>>>>>         public String message;
> > > > >>>>>>>         public String key;
> > > > >>>>>>>         public Message() {};
> > > > >>>>>>>         public Message(long ts, String message, String key) {
> > > > >>>>>>>             this.ts = ts;
> > > > >>>>>>>             this.key = key;
> > > > >>>>>>>             this.message = message;
> > > > >>>>>>>         }
> > > > >>>>>>>         public int compareTo(Message paramT) {
> > > > >>>>>>>             long ts1 = paramT.ts;
> > > > >>>>>>>             return ts > ts1 ? 1 : -1;
> > > > >>>>>>>         }
> > > > >>>>>>>     }
> > > > >>>>>>>
> > > > >>>>>>> The pipeline is like this:
> > > > >>>>>>>
> > > > >>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > > > >>>>>>>     .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
> > > > >>>>>>>         public KeyValue<String, Message> apply(String key, Message value) {
> > > > >>>>>>>             return new KeyValue<String, Message>(value.key, value);
> > > > >>>>>>>         }
> > > > >>>>>>>     })
> > > > >>>>>>>     .through(Serdes.String(), messageSerde, "test-window-key-stream")
> > > > >>>>>>>     .aggregateByKey(new Initializer<SortedSet<Message>>() {
> > > > >>>>>>>         public SortedSet<Message> apply() {
> > > > >>>>>>>             return new TreeSet<Message>();
> > > > >>>>>>>         }
> > > > >>>>>>>     }, new Aggregator<String, Message, SortedSet<Message>>() {
> > > > >>>>>>>         public SortedSet<Message> apply(String aggKey, Message value,
> > > > >>>>>>>                 SortedSet<Message> aggregate) {
> > > > >>>>>>>             aggregate.add(value);
> > > > >>>>>>>             return aggregate;
> > > > >>>>>>>         }
> > > > >>>>>>>     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > > > >>>>>>>     Serdes.String(), messagesSerde)
> > > > >>>>>>>     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> > > > >>>>>>>         public void apply(Windowed<String> key, SortedSet<Message> messages) {
> > > > >>>>>>>             ...
> > > > >>>>>>>         }
> > > > >>>>>>>     });
> > > > >>>>>>>
> > > > >>>>>>> So basically I rekey the original message into another topic
> > > > >>>>>>> and then aggregate it based on that key.
> > > > >>>>>>> What I have observed is that when I use windowed aggregation,
> > > > >>>>>>> the aggregator does not use the previous aggregated value:
> > > > >>>>>>>
> > > > >>>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
> > > > >>>>>>>             SortedSet<Message> aggregate) {
> > > > >>>>>>>         aggregate.add(value);
> > > > >>>>>>>         return aggregate;
> > > > >>>>>>>     }
> > > > >>>>>>>
> > > > >>>>>>> So in the above function the aggregate is an empty set for
> > > > >>>>>>> every value entering the pipeline. When I remove the windowed
> > > > >>>>>>> aggregation, the aggregate set retains previously aggregated
> > > > >>>>>>> values in the set.
> > > > >>>>>>>
> > > > >>>>>>> I am just not able to wrap my head around it. When I ran this
> > > > >>>>>>> type of test locally on Windows it worked fine. However, a
> > > > >>>>>>> similar pipeline setup, when run against production on Linux,
> > > > >>>>>>> behaves strangely and always gets an empty aggregate set.
> > > > >>>>>>> Any idea what the reason could be, or where I should look for
> > > > >>>>>>> the problem? Does the length of the key string matter here? I
> > > > >>>>>>> will later try to run the same simple setup on Linux and see
> > > > >>>>>>> what happens. But this is very strange behavior.
> > > > >>>>>>>
> > > > >>>>>>> Thanks
> > > > >>>>>>> Sachin
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang
Wang <
> > > > wangguoz@gmail.com>
> > > > >>>>>>> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Hello Sachin,
> > > > >>>>>>>>
> > > > >>>>>>>> In the implementation of SortedSet, if the object implements
> > > > >>>>>>>> the Comparable interface, that compareTo function is applied
> > > > >>>>>>>> in "aggregate.add(value);", and hence if it returns 0, this
> > > > >>>>>>>> element will not be added, since it is a Set.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> Guozhang
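Guozhang's point about Set semantics can be reproduced with plain JDK collections, independent of Kafka. A minimal sketch (the names `TsMessage` and `aggregatedCount` are illustrative; the compareTo mirrors the Message class shown further down this thread):

```java
import java.util.TreeSet;

// Reproduces the pitfall from this thread: a TreeSet decides membership
// via compareTo() (not equals/hashCode), so two messages with the same
// ts compare as 0, are treated as duplicates, and only one is kept.
class TsMessage implements Comparable<TsMessage> {
    final long ts;
    final String message;

    TsMessage(long ts, String message) {
        this.ts = ts;
        this.message = message;
    }

    public int compareTo(TsMessage other) {
        // Returns 0 for equal timestamps, as in the thread's Message class
        return Long.compare(ts, other.ts);
    }
}

class TreeSetPitfall {
    // Aggregate the given timestamps into a TreeSet and report its size
    static int aggregatedCount(long... timestamps) {
        TreeSet<TsMessage> aggregate = new TreeSet<>();
        for (long ts : timestamps) {
            aggregate.add(new TsMessage(ts, "payload-" + ts));
        }
        return aggregate.size();
    }
}
```

With the published seconds from the log below (14, 18, 11, 17, 14, 15, 12, 12) the set ends up with 6 elements, matching the "-> 6" output: the repeated 14 and 12 are silently dropped. Breaking ties on a secondary field in compareTo, or aggregating into a List, avoids the collision.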
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin
Mittal <
> > > > sjmittal@gmail.com
> > > > >>>
> > > > >>>>>>>> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> Hi,
> > > > >>>>>>>>> What I find is that when I use a sorted set as the
> > > > >>>>>>>>> aggregation, it fails to aggregate the values for which
> > > > >>>>>>>>> compareTo returns 0.
> > > > >>>>>>>>>
> > > > >>>>>>>>> My class is like this:
> > > > >>>>>>>>>
> > > > >>>>>>>>>     public class Message implements Comparable<Message> {
> > > > >>>>>>>>>         public long ts;
> > > > >>>>>>>>>         public String message;
> > > > >>>>>>>>>         public Message() {};
> > > > >>>>>>>>>         public Message(long ts, String message) {
> > > > >>>>>>>>>             this.ts = ts;
> > > > >>>>>>>>>             this.message = message;
> > > > >>>>>>>>>         }
> > > > >>>>>>>>>         public int compareTo(Message paramT) {
> > > > >>>>>>>>>             long ts1 = paramT.ts;
> > > > >>>>>>>>>             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> > > > >>>>>>>>>         }
> > > > >>>>>>>>>     }
> > > > >>>>>>>>>
> > > > >>>>>>>>> The pipeline is like this:
> > > > >>>>>>>>>
> > > > >>>>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > > > >>>>>>>>>     .aggregateByKey(new Initializer<SortedSet<Message>>() {
> > > > >>>>>>>>>         public SortedSet<Message> apply() {
> > > > >>>>>>>>>             return new TreeSet<Message>();
> > > > >>>>>>>>>         }
> > > > >>>>>>>>>     }, new Aggregator<String, Message, SortedSet<Message>>() {
> > > > >>>>>>>>>         public SortedSet<Message> apply(String aggKey, Message value,
> > > > >>>>>>>>>                 SortedSet<Message> aggregate) {
> > > > >>>>>>>>>             aggregate.add(value);
> > > > >>>>>>>>>             return aggregate;
> > > > >>>>>>>>>         }
> > > > >>>>>>>>>     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > > > >>>>>>>>>     Serdes.String(), messagesSerde)
> > > > >>>>>>>>>     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> > > > >>>>>>>>>         public void apply(Windowed<String> key, SortedSet<Message> messages) {
> > > > >>>>>>>>>             ...
> > > > >>>>>>>>>         }
> > > > >>>>>>>>>     });
> > > > >>>>>>>>>
> > > > >>>>>>>>> So any message published between 10 and 20 seconds gets
> > > > >>>>>>>>> aggregated in the 10-20 bucket, and I print the size of the
> > > > >>>>>>>>> set. However, the output I get is the following:
> > > > >>>>>>>>>
> > > > >>>>>>>>> Published: 14
> > > > >>>>>>>>> Aggregated: 10  20 -> 1
> > > > >>>>>>>>>
> > > > >>>>>>>>> Published: 18
> > > > >>>>>>>>> Aggregated: 10  20 -> 2
> > > > >>>>>>>>>
> > > > >>>>>>>>> Published: 11
> > > > >>>>>>>>> Aggregated: 10  20 -> 3
> > > > >>>>>>>>>
> > > > >>>>>>>>> Published: 17
> > > > >>>>>>>>> Aggregated: 10  20 -> 4
> > > > >>>>>>>>>
> > > > >>>>>>>>> Published: 14
> > > > >>>>>>>>> Aggregated: 10  20 -> 4
> > > > >>>>>>>>>
> > > > >>>>>>>>> Published: 15
> > > > >>>>>>>>> Aggregated: 10  20 -> 5
> > > > >>>>>>>>>
> > > > >>>>>>>>> Published: 12
> > > > >>>>>>>>> Aggregated: key2  10  20 -> 6
> > > > >>>>>>>>>
> > > > >>>>>>>>> Published: 12
> > > > >>>>>>>>> Aggregated: 10  20 -> 6
> > > > >>>>>>>>>
> > > > >>>>>>>>> So, as you can see, any message that occurs again within the
> > > > >>>>>>>>> same second, where compareTo returns 0, fails to get
> > > > >>>>>>>>> aggregated in the pipeline.
> > > > >>>>>>>>> Notice the ones published at 14 and 12 seconds.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Now I am not sure if the problem is with Java (i.e.,
> > > > >>>>>>>>> whether I should use the Comparator interface instead of
> > > > >>>>>>>>> Comparable for my Message object), with Kafka Streams, or
> > > > >>>>>>>>> with serializing and de-serializing the set of messages.
> > > > >>>>>>>>> If I replace the Set with a List, everything works fine.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Anyway, any ideas here would be appreciated; meanwhile let
> > > > >>>>>>>>> me see what the best Java practice is here.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Thanks
> > > > >>>>>>>>> Sachin
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Mon, Nov 21, 2016 at 8:29 PM,
Michael Noll <
> > > > >> michael@confluent.io>
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> On Mon, Nov 21, 2016 at 1:06
PM, Sachin Mittal <
> > > > >> sjmittal@gmail.com
> > > > >>>>>>>
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> I am using kafka_2.10-0.10.0.1.
> > > > >>>>>>>>>>> Say I am having a window of 60 minutes advanced by 15
> > > > >>>>>>>>>>> minutes.
> > > > >>>>>>>>>>> If the stream app, using the timestamp extractor, puts
> > > > >>>>>>>>>>> the message in one or more bucket(s), it will get
> > > > >>>>>>>>>>> aggregated in those buckets.
> > > > >>>>>>>>>>> I assume this statement is correct.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Yes.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Also, say I restart the streams application; then bucket
> > > > >>>>>>>>>>> aggregation will resume from the last point of halt.
> > > > >>>>>>>>>>> I hope this is also correct.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Yes.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> What I noticed is that once a message was placed in one
> > > > >>>>>>>>>>> bucket, that bucket was not getting new messages.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> This should not happen...
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> However, when I ran a small test case replicating that,
> > > > >>>>>>>>>>> it worked properly. There may be some issue in the
> > > > >>>>>>>>>>> application reset.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> ...and apparently it works (as expected) in your small
> > > > >>>>>>>>>> test case.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Do you have any further information that you could share
> > > > >>>>>>>>>> with us so we can help you better?  What's the difference,
> > > > >>>>>>>>>> for example, between your "normal" use case and the small
> > > > >>>>>>>>>> test case you have been referring to?
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> -Michael
> > > > >>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> --
> > > > >>>>>>>> -- Guozhang
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> --
> > > > >>>>>> -- Guozhang
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
