kafka-users mailing list archives

From Frank Lyaruu <flya...@gmail.com>
Subject Re: Losing messages in Kafka Streams after upgrading
Date Wed, 07 Jun 2017 10:21:00 GMT
I tried to use a TimestampExtractor that uses our own timestamps from the
messages, plus a 'map' operation on the KTable to set the timestamp to the
current time, so there would be a precise point where I discard our original
timestamps. That does not work (I verified by writing a separate Java Kafka
consumer and printing the timestamps): the TimestampExtractor only gets called
once, and the stream sticks with that time. I did not really have a good
reason not to simply use the WallclockTimestampExtractor, and that one seems
to do exactly what I want.
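
For reference, a minimal sketch of wiring that extractor into the Streams
config (the application id and bootstrap servers are placeholders; the config
key is called "timestamp.extractor" on 0.10.x and "default.timestamp.extractor"
on later releases):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    public class WallclockStreamsConfig {
        static Properties build() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "replication-app");   // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // Use wall-clock time as "stream time" for every record.
            props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                      WallclockTimestampExtractor.class.getName());
            return props;
        }
    }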

So, I'm good! I am interested in the community discussion Guozhang
mentions. Is there a KIP for that?

regards, Frank


On Mon, Jun 5, 2017 at 8:25 PM, Matthias J. Sax <matthias@confluent.io>
wrote:

> Frank,
>
> If you use "now", I assume you are calling System.currentTimeMillis().
> If yes, you can also use the predefined WallclockTimestampExtractor that
> ships with Streams (no need to write your own).
>
> >> I thought that the Timestamp extractor would then also use
> >> that updated timestamp as 'stream time', but I don't really see that
> >> happening, so that assumption was wrong.
>
> Yes, this should happen. Not sure why you don't observe this. And thus,
> the producer should use this timestamp to write the records.
>
> How did you verify the timestamps that are set for your output records?
>
>
> -Matthias
>
>
> On 6/5/17 6:15 AM, Frank Lyaruu wrote:
> > Thanks Guozhang,
> >
> > I figured I could use a custom timestamp extractor, and set that timestamp
> > to 'now' when reading a source topic, as the original timestamp is pretty
> > much irrelevant. I thought that the Timestamp extractor would then also use
> > that updated timestamp as 'stream time', but I don't really see that
> > happening, so that assumption was wrong.
> >
> > If I could configure a timestamp extractor that would also be used by the
> > producer I think I'd be in business, but right now I don't see an elegant
> > way forward, so any ideas for workarounds are welcome.
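
A minimal sketch of such an extractor, for reference (the class name is
illustrative, and the two-argument extract() signature assumes 0.10.2 or
later; older releases take only the record). It ends up doing exactly what the
built-in WallclockTimestampExtractor does:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Stamp every record with "now" as it enters the topology, ignoring the
    // timestamp embedded in the message.
    public class IngestTimeExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
            return System.currentTimeMillis();
        }
    }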
> >
> > regards, Frank
> >
> > On Mon, Jun 5, 2017 at 7:01 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> >> Frank, thanks for sharing your findings.
> >>
> >> I think this is a general issue to consider in Streams, and the community
> >> has been thinking about it: we write intermediate topics with the stream
> >> time that is inherited from the source topic's timestamps; however, that
> >> timestamp is also used for log rolling / retention etc., and these two
> >> purposes (using timestamps in processing for out-of-ordering and late
> >> arrivals, and operations on the Kafka topics) could rely on different
> >> timestamp semantics. We need to revisit how timestamps can be maintained
> >> across the topology in Streams.
> >>
> >> Guozhang
> >>
> >> On Sat, Jun 3, 2017 at 10:54 AM, Frank Lyaruu <flyaruu@gmail.com> wrote:
> >>
> >>> Hi Matthias,
> >>>
> >>> Ok, that clarifies quite a bit. I never really went into the timestamp
> >>> aspects, as time does not really play a role in my application (aside from
> >>> the repartition topics, I have no KStreams or windowed operations, just
> >>> different kinds of KTable joins).
> >>>
> >>> I do think that the failure case I see (with this version, joining two 'old'
> >>> KTables causes a small percentage of records to vanish) is far from
> >>> intuitive, and it somehow worked fine until a few weeks ago.
> >>>
> >>> I think your option 3 should work. I'll make a custom timestamp extractor
> >>> (I actually do have a timestamp in my messages), and I'll set it to the
> >>> current time as they enter the streams application.
> >>>
> >>> Thanks, that helped, regards, Frank
> >>>
> >>> On Fri, Jun 2, 2017 at 9:17 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >>>
> >>>> Hi Frank,
> >>>>
> >>>> yes, retention policy is based on the embedded record timestamps and not
> >>>> on system time. Thus, if you send messages with an old timestamp, they
> >>>> can trigger log/segment rolling.
> >>>>
> >>>>>> I see that the repartition topics have timestamp.type = CreateTime, does
> >>>>>> that mean it uses the timestamp of the original message?
> >>>>
> >>>> Yes. That's the default setting on the broker side. For Streams, we
> >>>> maintain a so-called "stream time" that is computed based on the input
> >>>> record timestamps. This "stream time" is used to set the timestamp for
> >>>> records that are written by Streams (so it's more or less the timestamp
> >>>> of the input records).
> >>>>
> >>>>>> Shouldn't that be LogAppendTime for repartition topics?
> >>>>
> >>>> No. Streams needs to preserve the original timestamp to guarantee
> >>>> correct semantics for downstream window operations. Thus, it should be
> >>>> CreateTime -- if you switch to LogAppendTime, you might break your
> >>>> application logic and get wrong results.
> >>>>
> >>>>>> Or is there a way to configure that?
> >>>>
> >>>> You can configure this on a per-topic basis on the brokers.
> >>>>
> >>>>>> If I hack into my Kafka Streams code to force it to use LogAppendTime,
> >>>>>> that seems to solve my problem, but it appears to take a huge toll on
> >>>>>> the brokers. Throughput plummets, and I don't really know why.
> >>>>
> >>>> I am not sure what you mean by this? As it's a topic config, I don't
> >>>> understand how you can force this within your Streams application.
> >>>>
> >>>>
> >>>> IMHO, you have multiple options though:
> >>>>  - increase the retention time for your re-partitioning topics
> >>>>  - you could change the retention policy to number of bytes instead of
> >>>>    time for the re-partitioning topics
> >>>>  - you can implement a custom timestamp extractor and adjust the
> >>>>    timestamps accordingly ("stream time" is based on whatever the
> >>>>    timestamp extractor returns)
> >>>>
> >>>> However, if you have records with old timestamps, I am wondering why
> >>>> they are not truncated in your input topic? Do you not face the same
> >>>> issue there?
> >>>>
> >>> All my topics are compacted, and I use no windowed operations at all; the
> >>> only 'delete' topics are the repartitioning internal topics.
> >>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On 6/2/17 9:33 AM, Frank Lyaruu wrote:
> >>>>> Hi Kafka people,
> >>>>>
> >>>>> I'm running an application that pushes database changes into a Kafka topic.
> >>>>> I'm also running a Kafka Streams application that listens to these topics,
> >>>>> groups them using the high-level API, and inserts them into another database.
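
As a rough sketch of such a topology (topic names, the grouping key, and the
sink are placeholders, and this uses the pre-1.0 KStreamBuilder API), the
groupBy() step is what creates the internal "-repartition" topic whose
retention comes up below:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class DbChangeAggregator {
        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();

            // Source KTable over the compacted change-log topic (placeholder names).
            KTable<String, String> changes = builder.table("db-changes", "changes-store");

            // Re-keying via groupBy() produces the internal "-repartition" topic.
            KTable<String, Long> perEntity = changes
                    .groupBy((String key, String value) ->
                            KeyValue.pair(value.substring(0, 1), value)) // placeholder grouping key
                    .count("per-entity-counts");

            // Placeholder sink; the real application writes to the target database.
            perEntity.toStream().foreach((entity, count) ->
                    System.out.println(entity + " -> " + count));

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "db-replicator");      // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            new KafkaStreams(builder, props).start();
        }
    }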
> >>>>>
> >>>>> All topics are compacted, with the exception of the 'repartition topics',
> >>>>> which are configured to be retained for 36 hours.
> >>>>>
> >>>>> Note that the changes in the original Kafka topics can be old (generally
> >>>>> more than 36 hours), as they only change when the data changes.
> >>>>>
> >>>>> When I start an instance of the Kafka Streams application, I see the
> >>>>> repartition topics being deleted immediately, sometimes before they are
> >>>>> processed, and it looks like the repartition messages use the same
> >>>>> timestamp as the original message.
> >>>>>
> >>>>> I see that the repartition topics have timestamp.type = CreateTime, does
> >>>>> that mean it uses the timestamp of the original message? Shouldn't that
> >>>>> be LogAppendTime for repartition topics? Or is there a way to configure that?
> >>>>>
> >>>>> If I hack into my Kafka Streams code to force it to use LogAppendTime,
> >>>>> that seems to solve my problem, but it appears to take a huge toll on
> >>>>> the brokers. Throughput plummets, and I don't really know why.
> >>>>>
> >>>>> Any ideas?
> >>>>>
> >>>>> Frank
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>
>
