kafka-users mailing list archives

From Sachin Mittal <sjmit...@gmail.com>
Subject Re: Kafka windowed table not aggregating correctly
Date Sat, 10 Dec 2016 04:57:26 GMT
Hi,
I think the window retention period does not solve the problem, it only delays
it.
Based on what I understand, say I set the retention to 1 year using until.
Then when I get a message with timestamp 1 year + 1 sec, it will delete the old
windows and create new ones from that message.
Now let us say we get the next message with timestamp 1 year - 1 sec; based on
what you said, it will ignore this message.

In my case we get messages from different sources whose clocks are not in
sync. So overall, messages come with increasing timestamps, but over short
durations there is no ordering guarantee.

So I think that before deleting the older windows it should retain a small
portion of the old windows too, so nearby older messages are not dropped.

I suggest having something like windows.size.advanceBy.until.retain, where
retain keeps the periods that fall within retain ms of the upper bound.

So a window could be defined as

    TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 * 1000L)
               .until(365 * 24 * 3600 * 1000L).retain(900 * 1000L)

so that when dropping older windows it retains the ones that fall in the last
15 minutes.
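
To make the intent concrete, here is a minimal sketch of the proposed
semantics (retain() and this helper are hypothetical, not part of the current
API):

    // A late-arriving record would still be accepted if its timestamp falls
    // within retainMs of the upper bound of the newest window.
    static boolean acceptLateRecord(long recordTs, long newestWindowEnd, long retainMs) {
        return recordTs >= newestWindowEnd - retainMs;
    }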


Please let me know in case I have missed something about how, and if at all,
older messages are dropped.

Thanks
Sachin





On Sat, Dec 10, 2016 at 5:45 AM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Assuming your window retention period is the same as the window length,
> then it is true that ZZ will cause the current window to be dropped. And
> then when ZZA is received, it will not cause the old windows to be
> re-created; it will be ignored, since it is considered "expired".
>
> Note that you can set the window retention period much longer than the
> window length itself, using the "until" API I mentioned above to handle any
> sudden future records.
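>
> For example (a sketch against the 0.10.0.x API, with illustrative values):
>
>     TimeWindows.of("test-table", 3600 * 1000L)
>                .advanceBy(1800 * 1000L)
>                .until(24 * 3600 * 1000L); // keep windows for a full day, much longer than the 1-hour window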
>
>
>
> Guozhang
>
> On Thu, Dec 8, 2016 at 8:19 PM, Sachin Mittal <sjmittal@gmail.com> wrote:
>
> > Hi,
> > Right now, in order to circumvent this problem, I am using a timestamp whose
> > values increase by a few ms as and when I get new records.
> > So let's say I have records in this order:
> > A -> lower limit TS + 1 sec
> > B -> lower limit TS + 3 sec
> > C -> lower limit TS + 5 sec
> > ..
> > Z -> upper limit TS - 1 sec
> >
> > Now say I get a record ZZ with ts = upper limit TS + 1 sec. I assume it will
> > drop the previous windows and create new ones based on this timestamp.
> > Please confirm this understanding.
> >
> > Now let's say I get a new record ZZA with timestamp = (old) upper limit TS - 1
> > sec. Will this again cause the new windows to be dropped and the older
> > windows re-created fresh, with all the older aggregation done so far lost?
> >
> > Thanks
> > Sachin
> >
> >
> >
> >
> > On Fri, Dec 9, 2016 at 12:16 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> > > Hello Sachin,
> > >
> > > I am with you that ideally the windowing segmentation implementation should
> > > be totally abstracted from users, but today it is a bit confusing to
> > > understand. I filed a JIRA some time ago to improve on this end:
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-3596
> > >
> > > So to your example, if a "far future record" was received whose timestamp
> > > is beyond current time + the retention period, it could potentially cause
> > > the current window to be dropped.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Dec 2, 2016 at 10:07 PM, Sachin Mittal <sjmittal@gmail.com> wrote:
> > >
> > > > Hi,
> > > > I think now it all makes sense. The field I was using for the timestamp
> > > > extractor contains timestamps which span more than a day's duration, and
> > > > it worked with the wall-clock extractor because over a short run the
> > > > timestamps stayed within a day's range.
> > > >
> > > > I wanted to understand one thing:
> > > > Say I have a timestamp extractor field, and as records get ingested,
> > > > future records will have increasing values for the timestamp.
> > > >
> > > > Now let's say the default duration is one day. At some future time a
> > > > record will have a timestamp which is greater than the initial day's range.
> > > > What will happen then: will it create a new segment and then create
> > > > windows in it for the next day's duration?
> > > > What happens if it then gets a record from the previous day? Will it get
> > > > discarded, or will it again have just a single value aggregated in it
> > > > (previous values lost)?
> > > > So when a new segment is created, do I understand correctly that it
> > > > retains the older segments' data?
> > > >
> > > > This is a bit confusing, so it would be helpful if you could explain in
> > > > a bit more detail.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > > On Sat, Dec 3, 2016 at 5:18 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > > >
> > > > > Sachin,
> > > > >
> > > > > One thing to note is that the retention of the windowed stores works by
> > > > > keeping multiple segments of the stores, where each segment stores a
> > > > > time range which can potentially span multiple windows. Old segments can
> > > > > be dropped when a new window needs to be created that is further out
> > > > > than the oldest segment's time range + retention period. From your code
> > > > > it seems you do not override the retention of
> > > > > TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L) via
> > > > > until(...), so the default of one day is used.
> > > > >
> > > > > So with WallclockTimestampExtractor, since it is using system time, it
> > > > > won't give you timestamps that span more than a day during a short
> > > > > period of time; but if your own extracted timestamps span beyond that
> > > > > value, then old segments will be dropped immediately and hence the
> > > > > aggregate will be returned as a single value.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <matthias@confluent.io> wrote:
> > > > >
> > > > > > The extractor is used in
> > > > > >
> > > > > > org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
> > > > > >
> > > > > > Let us know if you could resolve the problem or need more help.
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > > > > > > https://github.com/SOHU-Co/kafka-node/ is the node.js client I am
> > > > > > > using. The version is 0.5.x. Can you please tell me what code in
> > > > > > > Streams calls the timestamp extractor? I can look there to see if
> > > > > > > there is any issue.
> > > > > > >
> > > > > > > Again, the issue happens only when producing the messages using a
> > > > > > > producer that is compatible with Kafka version 0.8.x. I see that this
> > > > > > > producer does not send a record timestamp, as this was introduced in
> > > > > > > version 0.10 only.
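> > > > > > >
> > > > > > > If the missing timestamp is the cause, a minimal defensive extractor
> > > > > > > could fall back to wall-clock time when the record carries no
> > > > > > > timestamp (a sketch; SafeTimestampExtractor is a hypothetical name):
> > > > > > >
> > > > > > >     public static class SafeTimestampExtractor implements TimestampExtractor {
> > > > > > >         public long extract(ConsumerRecord<Object, Object> record) {
> > > > > > >             if (record.value() instanceof Message) {
> > > > > > >                 return ((Message) record.value()).ts;
> > > > > > >             }
> > > > > > >             long ts = record.timestamp();
> > > > > > >             // Records from pre-0.10 producers carry no timestamp (-1);
> > > > > > >             // avoid returning a negative value.
> > > > > > >             return ts >= 0 ? ts : System.currentTimeMillis();
> > > > > > >         }
> > > > > > >     }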
> > > > > > >
> > > > > > > Thanks
> > > > > > > Sachin
> > > > > > >
> > > > > > > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <matthias@confluent.io> wrote:
> > > > > > >
> > > > > > >> I am not sure what is happening. That's why it would be good to
> > > > > > >> have a toy example to reproduce the issue.
> > > > > > >>
> > > > > > >> What do you mean by "Kafka node version 0.5"?
> > > > > > >>
> > > > > > >> -Matthias
> > > > > > >>
> > > > > > >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> > > > > > >>> I can provide the data, but the data does not seem to be the issue.
> > > > > > >>> If I submit the same data and use the same timestamp extractor
> > > > > > >>> using the Java client with Kafka version 0.10.0.1, aggregation
> > > > > > >>> works fine.
> > > > > > >>> I find the issue only when submitting the data with kafka-node
> > > > > > >>> version 0.5.
> > > > > > >>> It looks like the stream does not extract the time correctly in
> > > > > > >>> that case.
> > > > > > >>>
> > > > > > >>> Thanks
> > > > > > >>> Sachin
> > > > > > >>>
> > > > > > >>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matthias@confluent.io> wrote:
> > > > > > >>>
> > > > > > >>>> Can you provide example input data (including timestamps) and the
> > > > > > >>>> result? What is the expected result (i.e., what aggregation do you
> > > > > > >>>> apply)?
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>> -Matthias
> > > > > > >>>>
> > > > > > >>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> > > > > > >>>>> Hi,
> > > > > > >>>>> After much debugging I found an issue with the timestamp extractor.
> > > > > > >>>>>
> > > > > > >>>>> If I use a custom timestamp extractor with the following code:
> > > > > > >>>>>
> > > > > > >>>>>     public static class MessageTimestampExtractor implements TimestampExtractor {
> > > > > > >>>>>         public long extract(ConsumerRecord<Object, Object> record) {
> > > > > > >>>>>             if (record.value() instanceof Message) {
> > > > > > >>>>>                 return ((Message) record.value()).ts;
> > > > > > >>>>>             } else {
> > > > > > >>>>>                 return record.timestamp();
> > > > > > >>>>>             }
> > > > > > >>>>>         }
> > > > > > >>>>>     }
> > > > > > >>>>>
> > > > > > >>>>> Here Message has a long field ts which stores the timestamp; the
> > > > > > >>>>> aggregation does not work.
> > > > > > >>>>> Note I have checked, and ts has valid timestamp values.
> > > > > > >>>>>
> > > > > > >>>>> However, if I replace it with, say, WallclockTimestampExtractor,
> > > > > > >>>>> aggregation is working fine.
> > > > > > >>>>>
> > > > > > >>>>> I do not understand what the issue could be here.
> > > > > > >>>>>
> > > > > > >>>>> Also note I am using Kafka Streams version 0.10.0.1 and I am
> > > > > > >>>>> publishing messages via https://github.com/SOHU-Co/kafka-node/
> > > > > > >>>>> whose version is quite old, 0.5.x.
> > > > > > >>>>>
> > > > > > >>>>> Let me know if there is some bug in timestamp extraction.
> > > > > > >>>>>
> > > > > > >>>>> Thanks
> > > > > > >>>>> Sachin
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > > > >>>>>
> > > > > > >>>>>> Sachin,
> > > > > > >>>>>>
> > > > > > >>>>>> This is indeed a bit weird, and we'd like to try to reproduce
> > > > > > >>>>>> your issue locally. Do you have sample input data for us to try
> > > > > > >>>>>> out?
> > > > > > >>>>>>
> > > > > > >>>>>> Guozhang
> > > > > > >>>>>>
> > > > > > >>>>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmittal@gmail.com> wrote:
> > > > > > >>>>>>
> > > > > > >>>>>>> Hi,
> > > > > > >>>>>>> I fixed that sorted set issue, but I am facing a weird problem
> > > > > > >>>>>>> which I am not able to replicate.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Here is the sample problem that I could isolate:
> > > > > > >>>>>>> My class is like this:
> > > > > > >>>>>>>     public static class Message implements Comparable<Message> {
> > > > > > >>>>>>>         public long ts;
> > > > > > >>>>>>>         public String message;
> > > > > > >>>>>>>         public String key;
> > > > > > >>>>>>>         public Message() {};
> > > > > > >>>>>>>         public Message(long ts, String message, String key) {
> > > > > > >>>>>>>             this.ts = ts;
> > > > > > >>>>>>>             this.key = key;
> > > > > > >>>>>>>             this.message = message;
> > > > > > >>>>>>>         }
> > > > > > >>>>>>>         public int compareTo(Message paramT) {
> > > > > > >>>>>>>             long ts1 = paramT.ts;
> > > > > > >>>>>>>             return ts > ts1 ? 1 : -1;
> > > > > > >>>>>>>         }
> > > > > > >>>>>>>     }
> > > > > > >>>>>>>
> > > > > > >>>>>>> pipeline is like this:
> > > > > > >>>>>>>
> > > > > > >>>>>>>     builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > > > > > >>>>>>>     .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
> > > > > > >>>>>>>         public KeyValue<String, Message> apply(String key, Message value) {
> > > > > > >>>>>>>             return new KeyValue<String, Message>(value.key, value);
> > > > > > >>>>>>>         }
> > > > > > >>>>>>>     })
> > > > > > >>>>>>>     .through(Serdes.String(), messageSerde, "test-window-key-stream")
> > > > > > >>>>>>>     .aggregateByKey(new Initializer<SortedSet<Message>>() {
> > > > > > >>>>>>>         public SortedSet<Message> apply() {
> > > > > > >>>>>>>             return new TreeSet<Message>();
> > > > > > >>>>>>>         }
> > > > > > >>>>>>>     }, new Aggregator<String, Message, SortedSet<Message>>() {
> > > > > > >>>>>>>         public SortedSet<Message> apply(String aggKey, Message value, SortedSet<Message> aggregate) {
> > > > > > >>>>>>>             aggregate.add(value);
> > > > > > >>>>>>>             return aggregate;
> > > > > > >>>>>>>         }
> > > > > > >>>>>>>     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > > > > > >>>>>>>     Serdes.String(), messagesSerde)
> > > > > > >>>>>>>     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> > > > > > >>>>>>>         public void apply(Windowed<String> key, SortedSet<Message> messages) {
> > > > > > >>>>>>>             ...
> > > > > > >>>>>>>         }
> > > > > > >>>>>>>     });
> > > > > > >>>>>>>
> > > > > > >>>>>>> So basically I rekey the original message into another topic
> > > > > > >>>>>>> and then aggregate it based on that key.
> > > > > > >>>>>>> What I have observed is that when I use windowed aggregation,
> > > > > > >>>>>>> the aggregator does not use the previous aggregated value.
> > > > > > >>>>>>>
> > > > > > >>>>>>> public SortedSet<Message> apply(String aggKey, Message value, SortedSet<Message> aggregate) {
> > > > > > >>>>>>>     aggregate.add(value);
> > > > > > >>>>>>>     return aggregate;
> > > > > > >>>>>>> }
> > > > > > >>>>>>>
> > > > > > >>>>>>> So in the above function the aggregate is an empty set for
> > > > > > >>>>>>> every value entering the pipeline. When I remove the windowed
> > > > > > >>>>>>> aggregation, the aggregate set retains the previously
> > > > > > >>>>>>> aggregated values.
> > > > > > >>>>>>>
> > > > > > >>>>>>> I am just not able to wrap my head around it. When I ran this
> > > > > > >>>>>>> type of test locally on Windows it worked fine. However, a
> > > > > > >>>>>>> similar pipeline setup run against production on Linux is
> > > > > > >>>>>>> behaving strangely and always getting an empty aggregate set.
> > > > > > >>>>>>> Any idea what the reason could be, and where I should look for
> > > > > > >>>>>>> the problem? Does the length of the key string matter here? I
> > > > > > >>>>>>> will later try to run the same simple setup on Linux and see
> > > > > > >>>>>>> what happens. But this is very strange behavior.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Thanks
> > > > > > >>>>>>> Sachin
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > > > >>>>>>>
> > > > > > >>>>>>>> Hello Sachin,
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> In the implementation of SortedSet, if the object implements
> > > > > > >>>>>>>> the Comparable interface, that compareTo function is applied
> > > > > > >>>>>>>> in "aggregate.add(value);", and hence if it returns 0 this
> > > > > > >>>>>>>> element will not be added, since it is a Set.
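> > > > > > >>>>>>>>
> > > > > > >>>>>>>> One way out (a sketch, not tested against your setup) is to
> > > > > > >>>>>>>> break ties on another field, so that two distinct messages
> > > > > > >>>>>>>> never compare as equal:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>     public int compareTo(Message paramT) {
> > > > > > >>>>>>>>         int byTs = Long.compare(ts, paramT.ts);
> > > > > > >>>>>>>>         // Fall back to the message body so two different messages
> > > > > > >>>>>>>>         // with the same timestamp are both kept in the TreeSet.
> > > > > > >>>>>>>>         return byTs != 0 ? byTs : message.compareTo(paramT.message);
> > > > > > >>>>>>>>     }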
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Guozhang
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmittal@gmail.com> wrote:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> Hi,
> > > > > > >>>>>>>>> What I find is that when I use a sorted set as the
> > > > > > >>>>>>>>> aggregation, it fails to aggregate the values for which
> > > > > > >>>>>>>>> compareTo returns 0.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> My class is like this:
> > > > > > >>>>>>>>>     public class Message implements Comparable<Message> {
> > > > > > >>>>>>>>>         public long ts;
> > > > > > >>>>>>>>>         public String message;
> > > > > > >>>>>>>>>         public Message() {};
> > > > > > >>>>>>>>>         public Message(long ts, String message) {
> > > > > > >>>>>>>>>             this.ts = ts;
> > > > > > >>>>>>>>>             this.message = message;
> > > > > > >>>>>>>>>         }
> > > > > > >>>>>>>>>         public int compareTo(Message paramT) {
> > > > > > >>>>>>>>>             long ts1 = paramT.ts;
> > > > > > >>>>>>>>>             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> > > > > > >>>>>>>>>         }
> > > > > > >>>>>>>>>     }
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> pipeline is like this:
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>     builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > > > > > >>>>>>>>>     .aggregateByKey(new Initializer<SortedSet<Message>>() {
> > > > > > >>>>>>>>>         public SortedSet<Message> apply() {
> > > > > > >>>>>>>>>             return new TreeSet<Message>();
> > > > > > >>>>>>>>>         }
> > > > > > >>>>>>>>>     }, new Aggregator<String, Message, SortedSet<Message>>() {
> > > > > > >>>>>>>>>         public SortedSet<Message> apply(String aggKey, Message value, SortedSet<Message> aggregate) {
> > > > > > >>>>>>>>>             aggregate.add(value);
> > > > > > >>>>>>>>>             return aggregate;
> > > > > > >>>>>>>>>         }
> > > > > > >>>>>>>>>     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > > > > > >>>>>>>>>     Serdes.String(), messagesSerde)
> > > > > > >>>>>>>>>     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> > > > > > >>>>>>>>>         public void apply(Windowed<String> key, SortedSet<Message> messages) {
> > > > > > >>>>>>>>>             ...
> > > > > > >>>>>>>>>         }
> > > > > > >>>>>>>>>     });
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> So any message published between 10 and 20 seconds gets
> > > > > > >>>>>>>>> aggregated in the 10 - 20 bucket, and I print the size of the
> > > > > > >>>>>>>>> set. However, the output I get is the following:
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Published: 14
> > > > > > >>>>>>>>> Aggregated: 10  20 -> 1
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Published: 18
> > > > > > >>>>>>>>> Aggregated: 10  20 -> 2
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Published: 11
> > > > > > >>>>>>>>> Aggregated: 10  20 -> 3
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Published: 17
> > > > > > >>>>>>>>> Aggregated: 10  20 -> 4
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Published: 14
> > > > > > >>>>>>>>> Aggregated: 10  20 -> 4
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Published: 15
> > > > > > >>>>>>>>> Aggregated: 10  20 -> 5
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Published: 12
> > > > > > >>>>>>>>> Aggregated: key2  10  20 -> 6
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Published: 12
> > > > > > >>>>>>>>> Aggregated: 10  20 -> 6
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> So you see, any message that occurs again in the same second,
> > > > > > >>>>>>>>> where compareTo returns 0, fails to get aggregated in the
> > > > > > >>>>>>>>> pipeline. Notice the ones published at 14 and 12 seconds.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Now I am not sure if the problem is with Java, i.e. whether I
> > > > > > >>>>>>>>> should use the Comparator interface and not Comparable for my
> > > > > > >>>>>>>>> Message object, or if the problem is with Kafka Streams or
> > > > > > >>>>>>>>> with serializing and de-serializing the set of messages.
> > > > > > >>>>>>>>> If I replace Set with List, all works fine.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Anyway, any ideas here would be appreciated; meanwhile, let
> > > > > > >>>>>>>>> me see what the best Java practice is here.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Thanks
> > > > > > >>>>>>>>> Sachin
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <michael@confluent.io> wrote:
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmittal@gmail.com> wrote:
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>> I am using kafka_2.10-0.10.0.1.
> > > > > > >>>>>>>>>>> Say I have a window of 60 minutes advanced by 15 minutes.
> > > > > > >>>>>>>>>>> If the stream app using the timestamp extractor puts the
> > > > > > >>>>>>>>>>> message in one or more bucket(s), it will get aggregated in
> > > > > > >>>>>>>>>>> those buckets.
> > > > > > >>>>>>>>>>> I assume this statement is correct.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Yes.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Also, say I restart the streams application; then the
> > > > > > >>>>>>>>>>> bucket aggregation will resume from the last point of halt.
> > > > > > >>>>>>>>>>> I hope this is also correct.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Yes.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>> What I noticed is that once a message is placed in one
> > > > > > >>>>>>>>>>> bucket, that bucket was not getting new messages.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> This should not happen...
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>> However, when I ran a small test case replicating that, it
> > > > > > >>>>>>>>>>> worked properly. There may be some issues in the
> > > > > > >>>>>>>>>>> application reset.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> ...and apparently it works (as expected) in your small test case.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Do you have any further information that you could share
> > > > > > >>>>>>>>>> with us so we can help you better?  What's the difference,
> > > > > > >>>>>>>>>> for example, between your "normal" use case and the small
> > > > > > >>>>>>>>>> test case you have been referring to?
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> -Michael
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> --
> > > > > > >>>>>>>> -- Guozhang
> > > > > > >>>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> --
> > > > > > >>>>>> -- Guozhang
> > > > > > >>>>>>
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > > >>
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
