kafka-users mailing list archives

From Frank Lyaruu <flya...@gmail.com>
Subject Re: Losing messages in Kafka Streams after upgrading
Date Mon, 05 Jun 2017 13:15:46 GMT
Thanks Guozhang,

I figured I could use a custom timestamp extractor and set the timestamp
to 'now' when reading a source topic, as the original timestamp is pretty
much irrelevant. I thought Streams would then also use that updated
timestamp as 'stream time', but I don't really see that happening, so that
assumption was wrong.
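For reference, this is roughly the extractor I tried (a minimal sketch against
the 0.10.2 TimestampExtractor interface; the class name is mine, and it is
essentially what the built-in WallclockTimestampExtractor already does):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Stamp every record with wall-clock time on ingest, ignoring the
// embedded timestamp entirely.
public class WallClockOnIngestExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        return System.currentTimeMillis();
    }
}
```

It is registered via the "timestamp.extractor" config
(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG), passing
WallClockOnIngestExtractor.class.getName().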

If I could configure a timestamp extractor that would also be used by the
producer I think I'd be in business, but right now I don't see an elegant
way forward, so any ideas for workarounds are welcome.
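One stopgap I'm considering is simply raising retention.ms on the internal
repartition topics, along the lines Matthias suggested. A sketch with the
0.10.x tools (the topic name below is a made-up placeholder; the real internal
names follow the <application.id>-...-repartition pattern, so list them first):

```shell
# List topics to find the internal repartition topics of the application.
bin/kafka-topics.sh --zookeeper localhost:2181 --list

# Raise time-based retention on one repartition topic to 7 days.
# "my-app-KSTREAM-REPARTITION-0000000003-repartition" is a placeholder name.
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics \
  --entity-name my-app-KSTREAM-REPARTITION-0000000003-repartition \
  --add-config retention.ms=604800000
```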

regards, Frank

On Mon, Jun 5, 2017 at 7:01 AM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Frank, thanks for sharing your findings.
>
> I think this is a general issue to consider in Streams, and the community
> has been thinking about it: we write intermediate topics with the stream
> time that is inherited from the source topic's timestamps, however that
> timestamp is used for log rolling / retention etc as well, and these two
> purposes (use timestamps in processing for out-of-ordering and late
> arrivals, and operations on the Kafka topics) could rely on different
> timestamp semantics. We need to revisit how timestamps can be maintained
> across the topology in Streams.
>
> Guozhang
>
> On Sat, Jun 3, 2017 at 10:54 AM, Frank Lyaruu <flyaruu@gmail.com> wrote:
>
> > Hi Matthias,
> >
> > Ok, that clarifies quite a bit. I never really went into the timestamp
> > aspects, as time does not really play a role in my application (aside from
> > the repartition topics, I have no KStream or windowed operations, just
> > different kinds of KTable joins).
> >
> > I do think that the failure case I see (with this version, joining two 'old'
> > KTables causes a small percentage of records to vanish) is far from
> > intuitive, and it somehow worked fine until a few weeks ago.
> >
> > I think your option 3 should work. I'll make a custom timestamp extractor
> > (I actually do have a timestamp in my messages), and I'll set it to the
> > current time as they enter the streams application.
> >
> > Thanks, that helped, regards, Frank
> >
> > On Fri, Jun 2, 2017 at 9:17 PM, Matthias J. Sax <matthias@confluent.io>
> > wrote:
> >
> > > Hi Frank,
> > >
> > > yes, retention policy is based on the embedded record timestamps and not
> > > on system time. Thus, if you send messages with an old timestamp, they
> > > can trigger log/segment rolling.
> > >
> > > >> I see that the repartition topics have timestamp.type = CreateTime, does
> > > >> that mean it uses the timestamp of the original message?
> > >
> > > Yes. That's the default setting on the broker side. For Streams, we
> > > maintain a so-called "stream time" that is computed based on the input
> > > record timestamps. This "stream time" is used to set the timestamp for
> > > records that are written by Streams (so it's more or less the timestamp
> > > of the input records).
> > >
> > > >> Shouldn't that be LogAppendTime for repartition topics?
> > >
> > > No. Streams needs to preserve the original timestamp to guarantee
> > > correct semantics for downstream window operations. Thus, it should be
> > > CreateTime -- if you switch to LogAppendTime, you might break your
> > > application logic and get wrong results.
> > >
> > > >> Or is there a way to configure that?
> > >
> > > You can configure this on a per topic basis on the brokers.
> > >
> > > >> Hacking into my Kafka Streams code to force it to use LogAppendTime
> > > >> seems to solve my problem, but that appears to take a huge toll on the
> > > >> brokers. Throughput plummets, and I don't really know why.
> > >
> > > I am not sure what you mean by this? As it's a topic config, I don't
> > > understand how you can force this within your Streams application?
> > >
> > >
> > > IMHO, you have multiple options though:
> > >  - increase the retention time for your re-partitioning topics
> > >  - you could change the retention policy to number of bytes instead of
> > > time for the re-partitioning topics
> > >  - you can implement a custom timestamp extractor and adjust the
> > > timestamps accordingly ("stream time" is based on whatever the timestamp
> > > extractor returns)
> > >
> > > However, if you have records with old timestamps, I am wondering why
> > > they are not truncated in your input topic? Do you not face the same
> > > issue there?
> > >
> > All my topics are compacted, I use no windowed operations at all; the only
> > 'delete' topics are the repartitioning internal topics.
> >
> > >
> > > -Matthias
> > >
> > >
> > >
> > >
> > >
> > > On 6/2/17 9:33 AM, Frank Lyaruu wrote:
> > > > Hi Kafka people,
> > > >
> > > > I'm running an application that pushes database changes into a Kafka
> > > > topic. I'm also running a Kafka Streams application that listens to
> > > > these topics, groups them using the high-level API, and inserts them
> > > > into another database.
> > > >
> > > > All topics are compacted, with the exception of the 'repartition
> > > > topics', which are configured to be retained for 36 hours.
> > > >
> > > > Note that the changes in the original Kafka topics can be old (generally
> > > > more than 36 hours), as they only change when the data changes.
> > > >
> > > > When I start an instance of the Kafka Streams application, I see the
> > > > repartition topics being deleted immediately, sometimes before they are
> > > > processed, and it looks like the repartition messages use the same
> > > > timestamp as the original message.
> > > >
> > > > I see that the repartition topics have timestamp.type = CreateTime, does
> > > > that mean it uses the timestamp of the original message? Shouldn't that
> > > > be LogAppendTime for repartition topics? Or is there a way to configure
> > > > that?
> > > >
> > > > Hacking into my Kafka Streams code to force it to use LogAppendTime
> > > > seems to solve my problem, but that appears to take a huge toll on the
> > > > brokers. Throughput plummets, and I don't really know why.
> > > >
> > > > Any ideas?
> > > >
> > > > Frank
> > > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
