kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Losing messages in Kafka Streams after upgrading
Date Mon, 05 Jun 2017 05:01:38 GMT
Frank, thanks for sharing your findings.

I think this is a general issue to consider in Streams, and the community
has been thinking about it: we write to intermediate topics with the stream
time that is inherited from the source topic's timestamps; however, that
timestamp is also used for log rolling / retention etc., and these two
purposes (using timestamps in processing to handle out-of-order and late
arrivals, and operations on the Kafka topics themselves) can rely on
different timestamp semantics. We need to revisit how timestamps are
maintained across the topology in Streams.

Guozhang

On Sat, Jun 3, 2017 at 10:54 AM, Frank Lyaruu <flyaruu@gmail.com> wrote:

> Hi Matthias,
>
> Ok, that clarifies quite a bit. I never really went into the timestamp
> aspects, as time does not really play a role in my application (aside from
> the repartition topics, I have no KStreams or windowed operations, just
> different kinds of KTable joins).
>
> I do think that the failure case I see (with this version, joining two
> 'old' KTables causes a small percentage of records to vanish) is far from
> intuitive, and it somehow worked fine until a few weeks ago.
>
> I think your option 3 should work. I'll make a custom timestamp extractor
> (I actually do have a timestamp in my messages), and I'll set it to the
> current time as records enter the streams application.
>
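> Roughly, a minimal sketch of what I have in mind (assuming the 0.10.2+
> TimestampExtractor interface; the built-in WallclockTimestampExtractor
> does essentially the same thing):
>
>   import org.apache.kafka.clients.consumer.ConsumerRecord;
>   import org.apache.kafka.streams.processor.TimestampExtractor;
>
>   public class IngestionTimeExtractor implements TimestampExtractor {
>       @Override
>       public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
>           // Ignore the (possibly very old) embedded timestamp and stamp the
>           // record with wall-clock time as it enters the streams application.
>           return System.currentTimeMillis();
>       }
>   }
>
>   // registered via the timestamp extractor config, e.g. (exact key name
>   // depends on the Streams version):
>   // props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>   //           IngestionTimeExtractor.class.getName());
>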
> Thanks, that helped, regards, Frank
>
> On Fri, Jun 2, 2017 at 9:17 PM, Matthias J. Sax <matthias@confluent.io>
> wrote:
>
> > Hi Frank,
> >
> > yes, retention policy is based on the embedded record timestamps and not
> > on system time. Thus, if you send messages with an old timestamp, they
> > can trigger log/segment rolling.
> >
> > >> I see that the repartition topics have timestamp.type = CreateTime, does
> > >> that mean it uses the timestamp of the original message?
> >
> > Yes. That's the default setting on the broker side. For Streams, we
> > maintain a so-called "stream time" that is computed based on the input
> > record timestamps. This "stream time" is used to set the timestamp for
> > records that are written by Streams (so it's more or less the timestamp
> > of the input records).
> >
> > >> Shouldn't that be LogAppendTime for repartition topics?
> >
> > No. Streams needs to preserve the original timestamp to guarantee
> > correct semantics for downstream windowed operations. Thus, it should be
> > CreateTime -- if you switch to LogAppendTime, you might break your
> > application logic and get wrong results.
> >
> > >> Or is there a way to configure that?
> >
> > You can configure this on a per-topic basis on the brokers.
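> >
> > For example, with the broker-side tooling (exact flags depend on your
> > Kafka version; <zookeeper> and <topic> are placeholders):
> >
> >   bin/kafka-configs.sh --zookeeper <zookeeper> --alter \
> >     --entity-type topics --entity-name <topic> \
> >     --add-config message.timestamp.type=LogAppendTime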
> >
> > >> If I hack into my Kafka Streams code to force it to use LogAppendTime,
> > >> that seems to solve my problem, but it appears to take a huge toll on
> > >> the brokers. Throughput plummets, and I don't really know why.
> >
> > I am not sure what you mean by this? As it's a topic config, I don't
> > understand how you can force this within your Streams application.
> >
> >
> > IMHO, you have multiple options though:
> >  - increase the retention time for your re-partitioning topics (see the
> >    example command below)
> >  - you could change the retention policy to number of bytes instead of
> >    time for the re-partitioning topics
> >  - you can implement a custom timestamp extractor and adjust the
> >    timestamps accordingly ("stream time" is based on whatever the
> >    timestamp extractor returns)
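> >
> > For the first option, something like this (flags depend on your Kafka
> > version; <repartition-topic> is a placeholder for one of the internal
> > repartition topics, and 604800000 ms = 7 days):
> >
> >   bin/kafka-configs.sh --zookeeper <zookeeper> --alter \
> >     --entity-type topics --entity-name <repartition-topic> \
> >     --add-config retention.ms=604800000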
> >
> > However, if you have records with old timestamps, I am wondering why
> > they are not truncated in your input topic? Do you not face the same
> > issue there?
> >
> All my topics are compacted, I use no windowed operations at all, the only
> 'delete' topics are the repartitioning internal topics.
>
> >
> > -Matthias
> >
> >
> >
> >
> >
> > On 6/2/17 9:33 AM, Frank Lyaruu wrote:
> > > Hi Kafka people,
> > >
> > > I'm running an application that pushes database changes into a Kafka
> > > topic. I'm also running a Kafka Streams application that listens to
> > > these topics, groups them using the high-level API, and inserts them
> > > into another database.
> > >
> > > All topics are compacted, with the exception of the 'repartition
> > > topics', which are configured to be retained for 36 hours.
> > >
> > > Note that the changes in the original Kafka topics can be old (generally
> > > more than 36 hours), as they only change when the data changes.
> > >
> > > When I start an instance of the Kafka Streams application, I see the
> > > repartition topics being deleted immediately,
> > > sometimes before they are processed, and it looks like the repartition
> > > messages use the same timestamp as the
> > > original message.
> > >
> > > I see that the repartition topics have timestamp.type = CreateTime, does
> > > that mean it uses the timestamp of the original message? Shouldn't that
> > > be LogAppendTime for repartition topics? Or is there a way to configure
> > > that?
> > >
> > > If I hack into my Kafka Streams code to force it to use LogAppendTime,
> > > that seems to solve my problem, but it appears to take a huge toll on
> > > the brokers. Throughput plummets, and I don't really know why.
> > >
> > > Any ideas?
> > >
> > > Frank
> > >
> >
> >
>



-- 
-- Guozhang
