kafka-users mailing list archives

From Sophie Blee-Goldman <sop...@confluent.io>
Subject Re: org.apache.kafka.streams.processor.TimestampExtractor#extract method in version 2.3 always returns -1 as value
Date Tue, 09 Jul 2019 00:52:30 GMT
Thanks for the notice, Jonathan! We tracked down the problem, and it should
be an easy fix: https://github.com/apache/kafka/pull/6719/files
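
A note for readers of this thread: in 2.3 the `previousTimestamp` argument of `TimestampExtractor#extract` arrived as -1 instead of the partition's previous timestamp. The sketch below is a dependency-free model, under our own assumptions, of an extractor that does not rely on that argument. It returns the record's own timestamp when it is valid (non-negative). Otherwise it falls back to the framework's value if that value is known, then to its own running maximum per partition, and finally to a fixed default. `PartitionTimeTracker`, its `extract(String, long, long)` signature and the `fallback` value are hypothetical. A real extractor would implement `org.apache.kafka.streams.processor.TimestampExtractor` and read `record.timestamp()`, `record.topic()` and `record.partition()` instead of taking plain arguments.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, dependency-free model of an extractor that does not trust
// the previousTimestamp argument (which arrived as -1 in 2.3).
public class PartitionTimeTracker {

    // Highest valid timestamp seen so far, keyed by "topic-partition".
    // ConcurrentHashMap because we assume one instance may be shared by several threads.
    private final Map<String, Long> maxSeen = new ConcurrentHashMap<>();

    // Used when neither the record nor our own tracking has a valid timestamp.
    private final long fallback;

    public PartitionTimeTracker(long fallback) {
        this.fallback = fallback;
    }

    // recordTs stands in for record.timestamp(); partition for the record's topic-partition.
    public long extract(String partition, long recordTs, long previousTimestamp) {
        if (recordTs >= 0) {
            // Valid record timestamp: keep the running maximum for this partition.
            maxSeen.merge(partition, recordTs, Math::max);
            return recordTs;
        }
        // Invalid record timestamp: prefer the framework's value when it is known.
        if (previousTimestamp >= 0) {
            return previousTimestamp;
        }
        // Otherwise use our own tracked maximum, or the fallback if we have none.
        return maxSeen.getOrDefault(partition, fallback);
    }

    public static void main(String[] args) {
        PartitionTimeTracker t = new PartitionTimeTracker(0L);
        System.out.println(t.extract("input-0", 1_000L, -1L)); // 1000
        System.out.println(t.extract("input-0", 500L, -1L));   // 500 (valid, returned as-is)
        System.out.println(t.extract("input-0", -1L, -1L));    // 1000 (tracked maximum)
        System.out.println(t.extract("input-1", -1L, -1L));    // 0 (fallback)
    }
}
```

The running maximum lives only in memory, so in this sketch it starts empty again after a restart.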

On Fri, Jul 5, 2019 at 6:25 AM Jonathan Santilli <jonathansantilli@gmail.com>
wrote:

> Thanks a lot, Bill, for creating the issue. I have updated it with a bit
> more info.
>
> Cheers!
> --
> Jonathan
>
>
>
>
> On Fri, Jun 28, 2019 at 9:21 PM Bill Bejeck <bill@confluent.io> wrote:
>
> > Jonathan, Matthias
> >
> > I've created a Jira for this issue
> > https://issues.apache.org/jira/browse/KAFKA-8615.
> >
> > Jonathan, I plan to work on this when I get back from vacation on 7/8. If
> > you would like to work on this yourself before that, feel free to do so and
> > assign the ticket to yourself.
> >
> > Thanks,
> > Bill
> >
> > On Thu, Jun 27, 2019 at 1:38 PM Matthias J. Sax <matthias@confluent.io>
> > wrote:
> >
> > > Sounds like a regression to me.
> > >
> > > We did change some code to track partition time differently. Can you
> > > open a Jira?
> > >
> > >
> > > -Matthias
> > >
> > > On 6/26/19 7:58 AM, Jonathan Santilli wrote:
> > > > Sure, Bill. It is the same code I used when I reported the issue with
> > > > suppress some months ago:
> > > >
> > > > https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
> > > >
> > > > In fact, I reported at that time that, after restarting the app,
> > > > suppress was sending already-processed records downstream again.
> > > > Now, with version 2.2.1+, after restarting the app the
> > > > aggregation/suppress (I do not know exactly where) is missing some
> > > > records that should be aggregated, even though they are in the input
> > > > topic.
> > > >
> > > > Kafka Version 2.3
> > > >
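> > > > import org.apache.kafka.clients.consumer.ConsumerRecord;
> > > > import org.apache.kafka.streams.processor.TimestampExtractor;
> > > >
> > > > public class OwnTimeExtractor implements TimestampExtractor {
> > > >
> > > >     @Override
> > > >     public long extract(final ConsumerRecord<Object, Object> record,
> > > >                         final long previousTimestamp) {
> > > >         // previousTimestamp is always == -1
> > > >         // (extraction logic not shown in the original; placeholder return)
> > > >         return record.timestamp();
> > > >     }
> > > > }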
> > > >
> > > > final StreamsBuilder builder = new StreamsBuilder();
> > > > final KStream<..., ...> events = builder
> > > >         .stream(inputTopicNames, Consumed.with(..., ...)
> > > >                 .withTimestampExtractor(new OwnTimeExtractor()));
> > > >
> > > > events
> > > >     .filter((k, v) -> ...)
> > > >     .flatMapValues(v -> ...)
> > > >     .flatMapValues(v -> ...)
> > > >     .selectKey((k, v) -> v)
> > > >     .groupByKey(Grouped.with(..., ...))
> > > >     .windowedBy(
> > > >         TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
> > > >             .advanceBy(Duration.ofSeconds(windowSizeInSecs))
> > > >             .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
> > > >     .reduce((agg, newValue) -> {
> > > >         ...
> > > >         return agg;
> > > >     })
> > > >     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > >     .toStream()
> > > >     .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
> > > >
> > > >
> > > >
> > > > On Wed, Jun 26, 2019 at 3:40 PM Bill Bejeck <bill@confluent.io> wrote:
> > > >
> > > >> Thanks for the reply, Jonathan.
> > > >>
> > > >> Are you in a position to share your code so I can try to reproduce this
> > > >> on my end?
> > > >>
> > > >> -Bill
> > > >>
> > > >>
> > > >> On Wed, Jun 26, 2019 at 10:23 AM Jonathan Santilli <
> > > >> jonathansantilli@gmail.com> wrote:
> > > >>
> > > >>> Hello Bill,
> > > >>>
> > > >>> I am implementing the TimestampExtractor interface and then using it
> > > >>> when consuming, like this:
> > > >>>
> > > >>> final KStream<..., ...> events = builder.stream(inputTopicList,
> > > >>>         Consumed.with(keySerde, valueSerde)
> > > >>>                 .withTimestampExtractor(new OwnTimeExtractor(...)));
> > > >>>
> > > >>> I am not setting the default.timestamp.extractor config value.
> > > >>>
> > > >>> Cheers!
> > > >>> --
> > > >>> Jonathan
> > > >>>
> > > >>>
> > > >>> On Wed, Jun 26, 2019 at 3:16 PM Bill Bejeck <bill@confluent.io> wrote:
> > > >>>
> > > >>>> Hi Jonathan,
> > > >>>>
> > > >>>> Thanks for reporting this. Which timestamp extractor are you using in
> > > >>>> the configs?
> > > >>>> Thanks,
> > > >>>> Bill
> > > >>>>
> > > >>>> On Wed, Jun 26, 2019 at 9:14 AM Jonathan Santilli <
> > > >>>> jonathansantilli@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hello, I hope you are all doing well.
> > > >>>>>
> > > >>>>> I am testing the new version 2.3, for Kafka Streams specifically. I
> > > >>>>> have noticed that the previousTimestamp argument passed to my
> > > >>>>> implementation of the method extract from the interface
> > > >>>>> org.apache.kafka.streams.processor.TimestampExtractor
> > > >>>>>
> > > >>>>> public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp)
> > > >>>>>
> > > >>>>> is now always -1.
> > > >>>>>
> > > >>>>> The previous version, 2.2.1, passed the correct value for the
> > > >>>>> record's partition.
> > > >>>>>
> > > >>>>> I am aware the interface is marked as @InterfaceStability.Evolving
> > > >>>>> and that we should not rely on its stability/compatibility. I am just
> > > >>>>> wondering whether this new behavior is intentional or a bug.
> > > >>>>>
> > > >>>>>
> > > >>>>> Cheers!
> > > >>>>> --
> > > >>>>> Santilli Jonathan
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>>
> > > >>> --
> > > >>> Santilli Jonathan
> > > >>>
> > > >>
> > > >
> > > >
> > >
> > >
> >
>
>
> --
> Santilli Jonathan
>

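The topology in this thread combines tumbling windows (`advanceBy` equal to the window size), a grace period and `suppress(Suppressed.untilWindowCloses(...))`, so every extracted timestamp feeds into window assignment and into the decision of when a window is final. The following is a simplified, dependency-free sketch of that arithmetic, not Kafka's code. A record at time t belongs to the window starting at t - (t mod size), the window covers [start, start + size), and it counts as closed once stream time (the largest timestamp observed) reaches its end plus the grace period. Roughly, that is when suppress emits the final result, and records for an already-closed window are dropped as late. `TumblingWindowMath` and the 60 s / 10 s values are hypothetical stand-ins for `windowSizeInSecs` and `windowSizeGraceInSecs`.

```java
// Hypothetical helper illustrating tumbling-window arithmetic with a grace period.
public class TumblingWindowMath {

    // Start of the tumbling window containing timestampMs; windows are [start, start + size).
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // In this model, a window is closed once stream time reaches windowEnd + grace.
    public static boolean isClosed(long windowStartMs, long sizeMs, long graceMs, long streamTimeMs) {
        long windowEnd = windowStartMs + sizeMs;
        return streamTimeMs >= windowEnd + graceMs;
    }

    public static void main(String[] args) {
        long size = 60_000L;  // hypothetical windowSizeInSecs = 60
        long grace = 10_000L; // hypothetical windowSizeGraceInSecs = 10

        long start = windowStart(125_000L, size);
        System.out.println(start);                                  // 120000
        System.out.println(isClosed(start, size, grace, 179_999L)); // false
        System.out.println(isClosed(start, size, grace, 190_000L)); // true
    }
}
```

In this model stream time only moves forward with the extracted timestamps, so an extractor that returns wrong values can close windows early, after which records for those windows are treated as late.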