kafka-users mailing list archives

From Jonathan Santilli <jonathansanti...@gmail.com>
Subject Re: org.apache.kafka.streams.processor.TimestampExtractor#extract method in version 2.3 always returns -1 as value
Date Wed, 26 Jun 2019 14:58:14 GMT
Sure Bill, it is the same code for which I reported the suppress issue some
months ago:
https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio

In fact, I reported at that time that after restarting the app, the
suppress was sending already-processed records downstream again.
Now, with version 2.2.1+, after restarting the app, the
aggregation/suppress (I do not know exactly which) is missing some records
that should be aggregated, even though they are in the input topic.

Kafka Version 2.3

public class OwnTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record,
                        final long previousTimestamp) {
        // previousTimestamp is always == -1
        // (the actual extraction logic was elided in the original message;
        //  placeholder return added here so the stub compiles)
        return record.timestamp();
    }
}
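
For context, here is a minimal sketch of how previousTimestamp is
conventionally used: as a fallback for records that carry no usable
timestamp of their own. The class name and the fallback policy below are
illustrative assumptions, not part of the original code.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class FallbackTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record,
                        final long previousTimestamp) {
        // Prefer the timestamp carried by the record itself.
        final long recordTimestamp = record.timestamp();
        if (recordTimestamp >= 0) {
            return recordTimestamp;
        }
        // Otherwise fall back to the highest timestamp seen so far on this
        // partition. If previousTimestamp is always -1, as reported above,
        // this fallback can never work.
        return previousTimestamp;
    }
}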

final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
        .stream(inputTopicNames, Consumed.with(..., ...)
                .withTimestampExtractor(new OwnTimeExtractor()));

events
    .filter((k, v) -> ...)
    .flatMapValues(v -> ...)
    .flatMapValues(v -> ...)
    .selectKey((k, v) -> v)
    .groupByKey(Grouped.with(..., ...))
    .windowedBy(
        TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
            .advanceBy(Duration.ofSeconds(windowSizeInSecs))
            .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
    .reduce((agg, newValue) -> { // "new" is a reserved word; parameter renamed
        ...
        return agg;
    })
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
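
To help reproduce the issue, here is a minimal sketch using
TopologyTestDriver from kafka-streams-test-utils. The topic name, serdes,
application id, and record values are assumptions, since the real ones are
elided above; whether the test driver exercises exactly the same
partition-time tracking as a live consumer is also an assumption.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class PreviousTimestampRepro {

    public static void run(final Topology topology) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "timestamp-extractor-repro");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            final ConsumerRecordFactory<String, String> factory =
                    new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
            // Pipe two records into the same topic/partition. On the second
            // record, extract() should receive the first record's timestamp as
            // previousTimestamp; the reported behavior is -1 instead.
            driver.pipeInput(factory.create("input-topic", "key", "value-1", 1_000L));
            driver.pipeInput(factory.create("input-topic", "key", "value-2", 2_000L));
        }
    }
}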



On Wed, Jun 26, 2019 at 3:40 PM Bill Bejeck <bill@confluent.io> wrote:

> Thanks for the reply Jonathan.
>
> Are you in a position to share your code so I can try to reproduce on my
> end?
>
> -Bill
>
>
> On Wed, Jun 26, 2019 at 10:23 AM Jonathan Santilli <
> jonathansantilli@gmail.com> wrote:
>
> > Hello Bill,
> >
> > I am implementing the TimestampExtractor interface, then using it to
> > consume, like:
> >
> > final KStream<..., ...> events = builder.stream(inputTopicList,
> >         Consumed.with(keySerde, valueSerde)
> >                 .withTimestampExtractor(new OwnTimeExtractor(...)));
> >
> > I am not setting the default.timestamp.extractor config value.
> >
> > Cheers!
> > --
> > Jonathan
> >
> >
> > On Wed, Jun 26, 2019 at 3:16 PM Bill Bejeck <bill@confluent.io> wrote:
> >
> > > Hi Jonathan,
> > >
> > > Thanks for reporting this. Which timestamp extractor are you using in
> > > the configs?
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Wed, Jun 26, 2019 at 9:14 AM Jonathan Santilli <
> > > jonathansantilli@gmail.com> wrote:
> > >
> > > > Hello, hope you all are doing well,
> > > >
> > > > I am testing the new version 2.3 of Kafka Streams specifically. I have
> > > > noticed that now, in the implementation of the extract method from the
> > > > interface org.apache.kafka.streams.processor.TimestampExtractor,
> > > >
> > > > public long extract(ConsumerRecord<Object, Object> record, long
> > > > previousTimestamp)
> > > >
> > > > the previousTimestamp parameter always receives -1 as its value.
> > > >
> > > >
> > > > The previous version, 2.2.1, was providing the correct value for the
> > > > record's partition.
> > > >
> > > > I am aware the interface is marked as @InterfaceStability.Evolving and
> > > > we should not rely on its stability/compatibility. I am just wondering
> > > > whether this new behavior is intentional or a bug.
> > > >
> > > >
> > > > Cheers!
> > > > --
> > > > Santilli Jonathan
> > > >
> > >
> >
> >
> > --
> > Santilli Jonathan
> >
>


-- 
Santilli Jonathan
