kafka-users mailing list archives

From: Frank Lyaruu <flya...@gmail.com>
Subject: Re: Losing messages in Kafka Streams after upgrading
Date: Sat, 03 Jun 2017 17:54:05 GMT
Hi Matthias,

OK, that clarifies quite a bit. I never really went into the timestamp
aspects, as time does not really play a role in my application (aside from
the repartition topics, I have no KStreams or windowed operations, just
different kinds of KTable joins).

I do think that the failure case I see (with this version, joining two 'old'
KTables causes a small percentage of records to vanish) is far from
intuitive, and it somehow worked fine until a few weeks ago.

I think your option 3 should work. I'll make a custom timestamp extractor
(I actually do have a timestamp in my messages) and have it return the
current time as records enter the streams application.
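Something like this is what I have in mind (just a sketch; as far as I can
tell the config key is "default.timestamp.extractor" on newer Streams
versions and "timestamp.extractor" on older ones, and Streams also ships a
built-in WallclockTimestampExtractor that does the same thing):

  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.streams.processor.TimestampExtractor;

  // Ignore the embedded record timestamp and stamp every record with the
  // wall-clock time at which it enters the Streams application.
  // (IngestionTimeExtractor is just a name I made up.)
  public class IngestionTimeExtractor implements TimestampExtractor {
      @Override
      public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
          return System.currentTimeMillis();
      }
  }

  // Register it in the Streams config:
  // props.put("default.timestamp.extractor", IngestionTimeExtractor.class.getName());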

Thanks, that helped, regards, Frank

On Fri, Jun 2, 2017 at 9:17 PM, Matthias J. Sax <matthias@confluent.io>
wrote:

> Hi Frank,
>
> yes, retention policy is based on the embedded record timestamps and not
> on system time. Thus, if you send messages with an old timestamp, they
> can trigger log/segment rolling.
>
> >> I see that the repartition topics have timestamp.type = CreateTime, does
> >> that mean it uses the timestamp of the
> >> original message?
>
> Yes. That's the default setting on the broker side. For Streams, we
> maintain a so-called "stream time" that is computed based on the input
> record timestamps. This "stream time" is used to set the timestamp for
> records that are written by Streams (so it's more or less the timestamp
> of the input records).
>
> >> Shouldn't that be LogAppendTime for repartition topics?
>
> No. Streams needs to preserve the original timestamp to guarantee
> correct semantics for downstream window operations. Thus, it should be
> CreateTime -- if you switch to LogAppendTime, you might break your
> application logic and get wrong results.
>
> >> Or is there a way to configure that?
>
> You can configure this on a per topic basis on the brokers.
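For reference, the per-topic setting is "message.timestamp.type"; something
along these lines should change it (the topic name is a placeholder, and the
--zookeeper vs. --bootstrap-server flag depends on the Kafka version),
although, as noted above, forcing LogAppendTime on the repartition topics
can break time semantics:

  kafka-configs.sh --zookeeper localhost:2181 --alter \
    --entity-type topics --entity-name my-app-some-repartition \
    --add-config message.timestamp.type=LogAppendTime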
>
> >> Hacking my Kafka Streams code to force it to use LogAppendTime seems to
> >> solve my problem, but that appears to take a huge toll on the brokers.
> >> Throughput plummets, and I don't really know why.
>
> I am not sure what you mean by this? As it's a topic config, I don't
> understand how you can force this within your Streams application?
>
>
> IMHO, you have multiple options though:
>  - increase the retention time for your re-partitioning topics
>  - you could change the retention policy to number of bytes instead of
> time for the re-partitioning topics
>  - you can implement a custom timestamp extractor and adjust the
> timestamps accordingly ("stream time" is based on whatever the timestamp
> extractor returns)
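For options 1 and 2, I assume those would be per-topic configs on the
internal repartition topics, along these lines (topic name again a
placeholder; 604800000 ms is 7 days, and option 2 would use retention.bytes
instead):

  kafka-configs.sh --zookeeper localhost:2181 --alter \
    --entity-type topics --entity-name my-app-some-repartition \
    --add-config retention.ms=604800000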
>
> However, if you have records with old timestamps, I am wondering why
> they are not truncated in your input topic? Do you not face the same
> issue there?
>
All my topics are compacted, I use no windowed operations at all; the only
'delete' topics are the repartitioning internal topics.

>
> -Matthias
>
> On 6/2/17 9:33 AM, Frank Lyaruu wrote:
> > Hi Kafka people,
> >
> > I'm running an application that pushes database changes into Kafka topics.
> > I'm also running a Kafka Streams application that listens to these topics,
> > groups them using the high-level API, and inserts them into another database.
> >
> > All topics are compacted, with the exception of the 'repartition topics',
> > which are configured to be retained for 36 hours.
> >
> > Note that the changes in the original Kafka topics can be old (generally
> > more than 36 hours), as they only change when
> > the data changes.
> >
> > When I start an instance of the Kafka Streams application, I see messages
> > in the repartition topics being deleted immediately, sometimes before they
> > are processed, and it looks like the repartition messages use the same
> > timestamp as the original messages.
> >
> > I see that the repartition topics have timestamp.type = CreateTime, does
> > that mean it uses the timestamp of the
> > original message? Shouldn't that be LogAppendTime for repartition topics?
> > Or is there a way to configure that?
> >
> > Hacking my Kafka Streams code to force it to use LogAppendTime seems to
> > solve my problem, but that appears to take a huge toll on the brokers.
> > Throughput plummets, and I don't really know why.
> >
> > Any ideas?
> >
> > Frank
> >
>
>
