kafka-users mailing list archives

From "John Roesler" <vvcep...@apache.org>
Subject Re: What timestamp is used by streams when doing windowed joins
Date Sat, 07 Dec 2019 05:12:46 GMT
Hi Sachin,

I'd need more information to speculate about why your records are missing, but it sounds like
you're suspecting something to do with the records' timestamps, so I'll just focus on answering
your questions.

Streams always uses the same timestamp for all operations, which is the timestamp returned
by the timestamp extractor. Whether this is event time or ingestion time is up to the timestamp
extractor you're using.
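
For illustration, here's roughly what a custom extractor looks like (the MyEvent type and
its eventTimeMs() accessor are made up for the example):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Sketch: pull event time out of the value, falling back to the
    // broker-side timestamp if the value isn't what we expect.
    public class MyEventTimeExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
            Object value = record.value();
            if (value instanceof MyEvent) {
                return ((MyEvent) value).eventTimeMs();
            }
            return record.timestamp(); // CreateTime or LogAppendTime
        }
    }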

If you're using the default timestamp extractor, then Streams will use the timestamp field
on the ConsumerRecord that comes back from the broker. If the topic is configured with
CreateTime, that field holds the timestamp written by the producer; if it's configured with
LogAppendTime, it holds the time at which the broker actually appended the record to the topic.
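
In other words, the default setup is just configuration, something like this (broker address
made up; FailOnInvalidTimestamp is the default extractor, which reads the record's own
timestamp field):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-join-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // This is already the default; shown here just to make it explicit.
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
              FailOnInvalidTimestamp.class);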

One potential point of confusion is that when we say a "record", we mean more than just the
key and value that you typically manipulate using the Streams DSL. In addition to these fields,
there is a separate timestamp field, which is part of the Consumer/Producer/Broker protocols.
That's what we use for time tracking, so you do not need to worry about embedding and extracting
the timestamp in your values.
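
For example, on the producer side the timestamp travels as its own constructor argument,
never inside the key or value (topic name and timestamp value here are made up):

    import org.apache.kafka.clients.producer.ProducerRecord;

    // topic, partition (null = let the partitioner decide),
    // timestamp in ms since epoch, key, value
    ProducerRecord<String, String> rec =
        new ProducerRecord<>("input-topic", null, 1575694366000L, "key", "value");

    // On the consuming side the same field comes back untouched:
    //   long ts = consumerRecord.timestamp();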

Streams will set the timestamp field on outgoing ProducerRecords it sends to the broker, so
this would just be used by default for further stages in the pipeline. You don't need to add
timestamp extractors further on.
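
So a multi-stage pipeline like the following (topic names made up) needs no extractor
configuration between the stages; the second stage just reads the timestamps the first
stage wrote:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    StreamsBuilder builder = new StreamsBuilder();

    // Stage 1: transform and write out. Streams stamps the outgoing
    // records with the timestamps it extracted on the way in.
    KStream<String, String> stage1 = builder.stream("input-topic");
    stage1.mapValues(v -> v.toUpperCase()).to("intermediate-topic");

    // Stage 2 (could be a separate application): any windowed work here
    // picks up those timestamps via the default extractor.
    KStream<String, String> stage2 = builder.stream("intermediate-topic");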

The only usage of processing time (aka "wall-clock time") is in wall-clock based punctuation,
if you're using the low-level Processor API. Also, the commit interval is defined in terms
of wall-clock time. If all you're considering is the semantics of the Streams DSL, processing/wall-clock
time would not play any part in those semantics.
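
For completeness, a wall-clock punctuation in the Processor API looks roughly like this
(the 30-second interval and the commit-on-punctuate behavior are just for illustration):

    import java.time.Duration;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;

    public class WallClockPunctuator extends AbstractProcessor<String, String> {
        @Override
        public void init(ProcessorContext context) {
            super.init(context);
            // Fires every 30 seconds of wall-clock time, regardless of
            // the timestamps on the records flowing through.
            context.schedule(Duration.ofSeconds(30),
                             PunctuationType.WALL_CLOCK_TIME,
                             timestamp -> context.commit());
        }

        @Override
        public void process(String key, String value) {
            context().forward(key, value);
        }
    }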

I know that stream processing literature in general discusses event- vs. processing- vs. ingestion-time
quite a bit, but for practical purposes, a timestamp stored with the record (either CreateTime
or LogAppendTime) is the one that's useful for writing programs: it's deterministic, so
reprocessing the same topic yields the same results. Timestamps assigned at processing time
lead to non-deterministic programs with unclear semantics. That's why we pretty much stick
to the record timestamp in the Streams DSL.

Finally, yeah, if you just want to process records in the same order they appear in the topics,
then LogAppendTime might be better. 
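
If you go that route, note that the topic-level override of log.message.timestamp.type is
called message.timestamp.type. A quick sketch of flipping it with the AdminClient (broker
address and topic name made up):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(props)) {
        ConfigResource topic =
            new ConfigResource(ConfigResource.Type.TOPIC, "my-input-topic");
        AlterConfigOp op = new AlterConfigOp(
            new ConfigEntry("message.timestamp.type", "LogAppendTime"),
            AlterConfigOp.OpType.SET);
        // .get() throws checked exceptions; handle or declare them.
        admin.incrementalAlterConfigs(
            Collections.singletonMap(topic, Collections.singletonList(op)))
            .all().get();
    }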

I hope this helps clear things up a bit.


On Fri, Dec 6, 2019, at 22:20, Sachin Mittal wrote:
> Hi,
> I have noticed some issues when doing stream to stream windowed joins.
> Looks like my joined stream does not include all the records.
> Say I am doing a join like this:
> stream1.join(
>         stream2,
>         (lv, rv) -> ...,
>         JoinWindows.of(Duration.ofMinutes(5)),
>         ...)
> What I have checked from the docs is that it will join 2 records within the
> specified window.
> However it's not clear what time it would use for each record.
> Would it be
> 1. event-time or
> 2. processing-time or
> 3. ingestion-time?
> I am right now using default configuration for
> log.message.timestamp.type = CreateTime and default.timestamp.extractor
> From the docs I gather that in the default case it uses event-time.
> So does it mean that there has to be a timestamp field in the record which
> is to be extracted by a custom timestamp extractor?
> Also, downstream, when the streams application actually writes (produces) new
> record types, do we need to provide a timestamp extractor for all such record
> types so the next process in the pipeline can pick up the timestamp to do the
> windowed operations?
> Also, when and how is processing time used at all by a streams application?
> Finally, say I don't want to worry about whether the timestamp is set by the
> producers, is it better to simply set
> log.message.timestamp.type = LogAppendTime
> Thanks
> Sachin
