kafka-users mailing list archives

From Thomas Becker <tobec...@Tivo.com>
Subject Re: Custom stream processor not triggering #punctuate()
Date Thu, 30 Mar 2017 18:02:37 GMT
Does this fix the problem though? The docs indicate that new data is required for each *partition*,
not topic. Overall I think the "stream time" notion is a good thing for a lot of use-cases,
but some others definitely require wall-clock based windowing. Is something planned for this?

-Tommy

On Tue, 2017-03-28 at 10:45 +0100, Elliot Crosby-McCullough wrote:

Hi Michael,

My confusion was that the events are being created, transferred, and
received several seconds apart (longer than the punctuate schedule) with no
stalling because I'm triggering them by hand, so regardless of what
mechanism is being used for timing it should still be called.

That said, I've just noticed in the callout box that it will only advance
stream time if all input topics have new data which in my testing is not
the case, so I suppose I will need to attach the processor to each input
topic rather than processing them all at the same time (in this use case
they were being split back out in the processor).

Thanks,
Elliot

On 28 March 2017 at 10:18, Michael Noll <michael@confluent.io> wrote:



Elliot,

in the current API, `punctuate()` is called based on the current
stream-time (which defaults to event-time), not based on the current
wall-clock time / processing-time.  See
http://docs.confluent.io/current/streams/faq.html#why-is-punctuate-not-called.
The stream-time is advanced only when new input records are coming in, so
if there's e.g. a
stall on incoming records, then `punctuate()` will not be called.
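
To make the stream-time behaviour concrete, here is a toy model in plain Java. It is not Kafka Streams code: `StreamTimeModel` and its methods are hypothetical names, and it assumes stream time is the minimum of the latest timestamps seen across the input partitions, with the first punctuation due one interval after time 0. Under those assumptions, a partition that never receives data holds stream time back, so no punctuation fires no matter how many records arrive elsewhere.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only (hypothetical, not the Kafka Streams implementation).
// Assumption: stream time = minimum of the latest timestamp per partition.
class StreamTimeModel {
    private final Map<Integer, Long> latestTimestamp = new HashMap<>();
    private final List<Long> punctuations = new ArrayList<>();
    private final long intervalMs;
    private long nextPunctuateAt;

    StreamTimeModel(int partitionCount, long intervalMs) {
        this.intervalMs = intervalMs;
        this.nextPunctuateAt = intervalMs; // first punctuation due one interval after time 0
        for (int p = 0; p < partitionCount; p++) {
            latestTimestamp.put(p, -1L); // -1 marks "no data seen yet"
        }
    }

    // Minimum over all partitions; stays at -1 while any partition has no data.
    long streamTime() {
        long min = Long.MAX_VALUE;
        for (long ts : latestTimestamp.values()) {
            min = Math.min(min, ts);
        }
        return min;
    }

    // Record an incoming message and fire every punctuation that is now due.
    void onRecord(int partition, long timestampMs) {
        latestTimestamp.merge(partition, timestampMs, Long::max);
        long now = streamTime();
        while (now >= nextPunctuateAt) {
            punctuations.add(nextPunctuateAt);
            nextPunctuateAt += intervalMs;
        }
    }

    // Stream times at which a punctuation fired, in order.
    List<Long> punctuations() {
        return punctuations;
    }
}
```

With two partitions and a 1000 ms interval, feeding records only to partition 0 leaves `punctuations()` empty; the first record on partition 1 is what lets stream time move and the pending punctuations fire.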

If you need to schedule a call every N minutes of wall-clock time you'd
need to use your own scheduler.
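
One possible shape for such a scheduler, sketched with only the JDK's `ScheduledExecutorService`. `WallClockScheduler` is a hypothetical helper, not part of Kafka Streams. Note that the task runs on the executor's own thread rather than the stream thread, so anything it shares with the processor has to be thread-safe, and the processor context should not be assumed safe to call from there.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not a Kafka Streams API): runs a task on a fixed
// wall-clock period, independent of whether any records arrive.
class WallClockScheduler implements AutoCloseable {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    WallClockScheduler(Runnable task, long period, TimeUnit unit) {
        // The first run happens one period after construction.
        executor.scheduleAtFixedRate(task, period, period, unit);
    }

    // Stop the timer thread, e.g. when the processor is closed.
    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

A processor could create one of these when it is initialised, e.g. `new WallClockScheduler(task, 5, TimeUnit.MINUTES)`, and close it again on shutdown.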

Does that help?
Michael



On Tue, Mar 28, 2017 at 10:58 AM, Elliot Crosby-McCullough
<elliot.crosby-mccullough@freeagent.com> wrote:



Hi there,

I've written a simple processor which expects to have #process called on it
for each message and configures regular punctuate calls via
`context.schedule`.

Regardless of what configuration I try for timestamp extraction I cannot
get #punctuate to be called, despite #process being called for every
message (which are being sent several seconds apart).  I've set the
schedule as low as 1 (though the docs aren't clear whether that's micro,
milli, or just seconds) and tried both the wallclock time extractor and the
default time extractor in both the global config and the state store serde.



These particular messages are being generated by another kafka streams DSL
application and I'm using kafka 0.10.2.0, so presumably they also have
automatically embedded timestamps.

I can't for the life of me figure out what's going on.  Could you clue me
in?

Thanks,
Elliot





