kafka-users mailing list archives

From Sachin Mittal <sjmit...@gmail.com>
Subject Re: What timestamp is used by streams when doing windowed joins
Date Sat, 07 Dec 2019 07:19:56 GMT
Hi John,
If I check https://docs.confluent.io/current/streams/concepts.html#time
It has three notions of time: *event-time*, *processing-time*, and
*ingestion-time*.

If I check
https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-timestamp-extractor
It says that under default case:
if log.message.timestamp.type is set to CreateTime then *event-time* is
used.
if log.message.timestamp.type is set to LogAppendTime then *ingestion-time* is
used.

However, what you are saying is that under the Streams DSL we always use
event-time, which can be either CreateTime or LogAppendTime.

Both statements make sense to me, but they look slightly different in how
they relate the notions of time.
One basically says:
*event-time* <=> CreateTime
*ingestion-time* <=> LogAppendTime
whereas the other says:
event-time = CreateTime or LogAppendTime (depending on your broker/topic
config).

Yes, setting log.message.timestamp.type to LogAppendTime seems to make the
windowed join work better.

So what I understand from JoinWindows.of(Duration.ofMinutes(5)) is that
when joining two records, it checks whether their LogAppendTime values are
within 5 minutes of each other, and if so they get joined.
Please let me know if I got this part right.
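
To make the question concrete, here is a minimal sketch in plain Java of the
check described above. It is only a model of my understanding, not the Kafka
Streams implementation: the class name JoinWindowSketch, the method
withinWindow and the sample timestamp are hypothetical, and it assumes that
both window bounds are inclusive and that each timestamp is whatever the
timestamp extractor returned for the record (LogAppendTime in my setup). It
ignores grace periods and state-store retention.

```java
import java.time.Duration;

// Illustrative model only; this is NOT the Kafka Streams implementation.
final class JoinWindowSketch {
    // Mirrors the 5-minute window from JoinWindows.of(Duration.ofMinutes(5)).
    static final Duration FIVE_MINUTES = Duration.ofMinutes(5);

    // Returns true when the two record timestamps (epoch millis, as returned
    // by the timestamp extractor) differ by at most the window size.
    // Assumption: both bounds are inclusive.
    static boolean withinWindow(long leftTsMs, long rightTsMs, Duration window) {
        return Math.abs(leftTsMs - rightTsMs) <= window.toMillis();
    }

    public static void main(String[] args) {
        long left = 1_575_703_196_000L; // hypothetical timestamp of a stream1 record
        // A stream2 record 4 minutes later falls inside the window.
        System.out.println(withinWindow(left, left + 4 * 60_000L, FIVE_MINUTES));
        // A stream2 record 6 minutes later falls outside the window.
        System.out.println(withinWindow(left, left + 6 * 60_000L, FIVE_MINUTES));
    }
}
```

The check is symmetric, so under this model it does not matter which of the
two streams carries the earlier timestamp.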

Thanks
Sachin




On Sat, Dec 7, 2019 at 10:43 AM John Roesler <vvcephei@apache.org> wrote:

> Hi Sachin,
>
> I'd need more information to speculate about why your records are missing,
> but it sounds like you're suspecting something to do with the records'
> timestamps, so I'll just focus on answering your questions.
>
> Streams always uses the same timestamp for all operations, which is the
> timestamp returned by the timestamp extractor. Whether this is event time
> or ingestion time is up to the timestamp extractor you're using.
>
> If you're using the default timestamp extractor, then Streams will use the
> timestamp field on the ConsumerRecord that comes back from the broker. If
> you're using CreateTime, then it would hold the value of the timestamp
> written by the producer. If you're using LogAppendTime, then it's the
> timestamp representing when the broker actually adds the record to the
> topic.
>
> One potential point of confusion is that when we say a "record", we mean
> more than just the key and value that you typically manipulate using the
> Streams DSL. In addition to these fields, there is a separate timestamp
> field, which is part of the Consumer/Producer/Broker protocols. That's what
> we use for time tracking, so you do not need to worry about embedding and
> extracting the timestamp in your values.
>
> Streams will set the timestamp field on outgoing ProducerRecords it sends
> to the broker, so this would just be used by default for further stages in
> the pipeline. You don't need to add timestamp extractors further on.
>
> The only usage of processing time (aka "wall-clock time") is in wall-clock
> based punctuation, if you're using the low-level Processor API. Also, the
> commit interval is defined in terms of wall-clock time. If all you're
> considering is the semantics of the Streams DSL, processing/wall-clock time
> would not play any part in those semantics.
>
> I know that stream processing literature in general discusses event- vs.
> processing- vs. ingestion-time quite a bit, but for practical purposes,
> event time (either CreateTime or LogAppendTime) is the one that's useful
> for writing programs. Both ingestion time and processing time lead to
> non-deterministic programs with unclear semantics. That's why we pretty
> much stick to event time in the Streams DSL.
>
> Finally, yeah, if you just want to process records in the same order they
> appear in the topics, then LogAppendTime might be better.
>
> I hope this helps clear things up a bit.
>
> Thanks,
> -John
>
> On Fri, Dec 6, 2019, at 22:20, Sachin Mittal wrote:
> > Hi,
> > I have noticed some issues when doing stream to stream windowed joins.
> > Looks like my joined stream does not include all the records.
> >
> > Say I am doing join like this:
> > stream1.join(
> >             stream2,
> >             (lv, rv) -> ...,
> >             JoinWindows.of(Duration.ofMinutes(5)),
> >            ....)
> > What I have checked from the docs is that it will join 2 records within
> > the specified window.
> > However, it's not clear which time it would use for each record.
> > Would it be
> > 1.  event-time or
> > 2. processing-time or
> > 3. ingestion-time
> >
> > I am right now using default configuration for
> > log.message.timestamp.type = CreateTime and default.timestamp.extractor
> >
> > From the docs I gather that in the default case it uses event-time.
> > So does it mean that there has to be a timestamp field in the record
> > which is to be extracted by a custom timestamp extractor?
> >
> > Also, downstream, when the streams application actually writes (produces)
> > new record types, do we need to provide a timestamp extractor for all such
> > record types
> > so the next process in the pipeline can pick up the timestamp to do the
> > windowed operations?
> >
> > Also, when and how is processing time used at all by a streams application?
> >
> > Finally say I don't want to worry about if timestamp is set by the
> > producers, is it better to simply set
> > log.message.timestamp.type =  LogAppendTime
> >
> > Thanks
> > Sachin
> >
>
