kafka-users mailing list archives

From Mahendra Kariya <mahendra.kar...@go-jek.com>
Subject Re: Order of punctuate() and process() in a stream processor
Date Mon, 15 May 2017 00:49:58 GMT
We use Kafka Streams for quite a few aggregation tasks. For instance, we
count the number of messages with a particular status in a 1-minute time
window.

We have noticed that whenever we restart a stream, we see a sudden spike in
the aggregated numbers. After a few minutes, things are back to normal.
Could the above discussion be the reason for this?

Please note that we use a custom timestamp extractor.
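For reference, the kind of aggregation described above can be sketched without Kafka at all. The following is a hypothetical illustration, not Kafka Streams code: it counts messages per status in epoch-aligned 1-minute tumbling windows and assumes that each record's window is chosen purely from the timestamp returned by the (custom) timestamp extractor. The names `WindowedStatusCount`, `windowStart` and `count`, and the `windowStart/status` key format, are made up for this example.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (not the Kafka Streams API): counts records per
// (1-minute window, status) pair, using whatever timestamp the extractor returned.
public class WindowedStatusCount {
    static final long WINDOW_SIZE_MS = 60_000L;

    // Epoch-aligned tumbling window: the window containing ts starts at ts - (ts mod size).
    public static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    // The "windowStart/status" key is an assumption made only for this illustration.
    public static Map<String, Long> count(long[] timestamps, String[] statuses) {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            String key = windowStart(timestamps[i]) + "/" + statuses[i];
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 30_000L, 59_999L, 60_000L, 61_000L};
        String[] st = {"DONE", "DONE", "FAILED", "DONE", "DONE"};
        System.out.println(count(ts, st));
    }
}
```

Running `main` prints `{0/DONE=2, 0/FAILED=1, 60000/DONE=2}`: the records at 59999 ms and 60000 ms land in different windows because only the extracted timestamp decides window membership.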



On Fri, May 12, 2017 at 11:24 PM, Matthias J. Sax <matthias@confluent.io>
wrote:

> I added the feedback to https://issues.apache.org/jira/browse/KAFKA-3514
>
> -Matthias
>
>
> On 5/12/17 10:38 AM, Thomas Becker wrote:
> > Thanks. I think the system-time-based punctuation scheme we were
> > discussing would not result in repeated punctuations like this, but even
> > using stream time it seems a bit odd. If you do anything relatively
> > expensive in a punctuate call, it's especially bad.
> >
> > ________________________________________
> > From: Matthias J. Sax [matthias@confluent.io]
> > Sent: Friday, May 12, 2017 1:18 PM
> > To: users@kafka.apache.org
> > Subject: Re: Order of punctuate() and process() in a stream processor
> >
> > Thanks for sharing.
> >
> > Because punctuate is called with "stream time", you see the same time
> > value multiple times. Again, this is due to the coarse-grained advance
> > of "stream time".
> >
> > @Thomas: I think the way we handle it just simplifies the
> > implementation of punctuations. I don't see any other "advantage".
> >
> >
> > I will create a JIRA to track this -- we are currently working on some
> > improvements of punctuation and time management already, and it seems to
> > be another valuable improvement.
> >
> >
> > -Matthias
> >
> >
> > On 5/12/17 10:07 AM, Peter Sinoros Szabo wrote:
> >> Well, this is also a good question, because it is triggered with the
> >> same timestamp 3 times, so in order to create my update for all three
> >> seconds, I will have to count the number of punctuations and calculate
> >> the missed stream times myself. It's OK for me to trigger it 3 times,
> >> but the timestamp should not be the same in each; it should be increased
> >> by the schedule interval in each punctuate.
> >>
> >> - Sini
> >>
> >>
> >>
> >> From:   Thomas Becker <tobecker@Tivo.com>
> >> To:     "users@kafka.apache.org" <users@kafka.apache.org>
> >> Date:   2017/05/12 18:57
> >> Subject:        RE: Order of punctuate() and process() in a stream processor
> >>
> >>
> >>
> >> I'm a bit troubled by the fact that it fires 3 times despite the stream
> >> time being advanced all at once; is there a scenario where this is
> >> beneficial?
> >>
> >> ________________________________________
> >> From: Matthias J. Sax [matthias@confluent.io]
> >> Sent: Friday, May 12, 2017 12:38 PM
> >> To: users@kafka.apache.org
> >> Subject: Re: Order of punctuate() and process() in a stream processor
> >>
> >> Hi Peter,
> >>
> >> It's by design. Streams internally tracks time progress (so-called
> >> "stream time"). "Stream time" gets advanced *after* processing a
> >> record.
> >>
> >> Thus, in your case, "stream time" is still at its old value when the
> >> first message of your "burst" is processed. After that, "stream time"
> >> is advanced by 3 seconds, and thus punctuate fires 3 times.
> >>
> >> I guess we could change the design and include scheduled punctuations
> >> when advancing "stream time". But at the moment, we just don't do this.
> >>
> >> Does this make sense?
> >>
> >> Is this critical for your use case? Or do you just want to understand
> >> what's happening?
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 5/12/17 8:59 AM, Peter Sinoros Szabo wrote:
> >>> Hi,
> >>>
> >>>
> >>> Let's assume the following case.
> >>> - a stream processor that uses the Processor API
> >>> - context.schedule(1000) is called in init()
> >>> - the processor reads only one topic that has one partition
> >>> - it uses a custom timestamp extractor, but that timestamp is just the
> >>>   wall-clock time
> >>>
> >>>
> >>> Imagine the following events:
> >>> 1. For 10 seconds, I send in 5 messages/second.
> >>> 2. I do not send any messages for 3 seconds.
> >>> 3. I start sending 5 messages/second again.
> >>>
> >>> I see that punctuate() is not called during the 3 seconds when I do not
> >>> send any messages. This is OK according to the documentation, because
> >>> there are no new messages to trigger the punctuate() call. When the
> >>> first few messages arrive after I restart sending (point 3 above), I
> >>> see the following sequence of method calls:
> >>>
> >>> 1. process() on the 1st message
> >>> 2. punctuate() is called 3 times
> >>> 3. process() on the 2nd message
> >>> 4. process() on each following message
> >>>
> >>> What I would expect instead is that punctuate() is called first and
> >>> then process() is called on the messages, because the first message's
> >>> timestamp is already 3 seconds later than the last punctuate() call, so
> >>> the first message belongs after the 3 punctuate() calls.
> >>>
> >>> Please let me know whether this is a bug or intentional. If it is
> >>> intentional, what is the reason for processing one message before
> >>> punctuate() is called?
> >>>
> >>>
> >>> Thanks,
> >>> Peter
> >>>
> >>> Péter Sinóros-Szabó
> >>> Software Engineer
> >>>
> >>> Ustream, an IBM Company
> >>> Andrassy ut 39, H-1061 Budapest
> >>> Mobile: +36203693050
> >>> Email: peter.sinoros-szabo@hu.ibm.com
> >>>
> >>
> >> ________________________________
> >>
> >> This email and any attachments may contain confidential and privileged
> >> material for the sole use of the intended recipient. Any review, copying,
> >> or distribution of this email (or any attachments) by others is
> >> prohibited. If you are not the intended recipient, please contact the
> >> sender immediately and permanently delete this email and any attachments.
> >> No employee or agent of TiVo Inc. is authorized to conclude any binding
> >> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
> >> Inc. may only be made by a signed written agreement.
> >>
> >>
> >>
> >>
> >>
> >
> >
>
>
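The ordering Matthias describes can be reproduced with a small, self-contained model. This is a hypothetical sketch, not Kafka Streams code: `StreamTimePunctuationModel`, `onRecord` and the `scheduled=` value are made-up names. It assumes, as explained in the thread, that stream time is advanced only after a record is processed and that every punctuation that has become due then fires with the current stream time. The 1000 ms interval mirrors `context.schedule(1000)` from Peter's setup; the first punctuation time of 1000 ms is an arbitrary choice for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of stream-time punctuation as described in this thread.
// It is NOT Kafka Streams code; all names are made up for illustration.
public class StreamTimePunctuationModel {
    private final long intervalMs;        // analogous to the value passed to context.schedule(...)
    private long nextScheduledMs;         // stream time at which the next punctuation is due
    private long streamTimeMs = Long.MIN_VALUE;
    private final List<String> calls = new ArrayList<>();

    public StreamTimePunctuationModel(long intervalMs, long firstPunctuationMs) {
        this.intervalMs = intervalMs;
        this.nextScheduledMs = firstPunctuationMs;
    }

    // Called for every record; the timestamp is whatever the timestamp extractor returned.
    public void onRecord(long recordTimestampMs) {
        // 1. The record is processed first ...
        calls.add("process(" + recordTimestampMs + ")");
        // 2. ... and only afterwards is stream time advanced (it never moves backwards).
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        // 3. Every punctuation that became due fires now, one per elapsed interval.
        //    Each call receives the *current* stream time, so all of them see the same value;
        //    the scheduled time (the value Peter asked for) is tracked separately here.
        while (nextScheduledMs <= streamTimeMs) {
            calls.add("punctuate(" + streamTimeMs + ", scheduled=" + nextScheduledMs + ")");
            nextScheduledMs += intervalMs;
        }
    }

    public List<String> calls() {
        return calls;
    }

    public static void main(String[] args) {
        StreamTimePunctuationModel model = new StreamTimePunctuationModel(1000L, 1000L);
        // Steady traffic, then a silent gap between 1000 ms and 4000 ms, then traffic resumes.
        long[] timestamps = {0L, 500L, 1000L, 4000L, 4500L};
        for (long ts : timestamps) {
            model.onRecord(ts);
        }
        model.calls().forEach(System.out::println);
    }
}
```

Running `main` prints `process(0)`, `process(500)`, `process(1000)`, `punctuate(1000, scheduled=1000)`, `process(4000)`, then `punctuate(4000, scheduled=2000)`, `punctuate(4000, scheduled=3000)`, `punctuate(4000, scheduled=4000)` and finally `process(4500)`. The first record after the gap is processed before the three punctuations, all three receive the same stream time, and only the separately tracked scheduled times step by the interval, which is the bookkeeping Peter said he would otherwise have to do himself.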
