kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: org.apache.kafka.streams.processor.TimestampExtractor#extract method in version 2.3 always returns -1 as value
Date Thu, 27 Jun 2019 17:38:13 GMT
Sounds like a regression to me.

We did change some code to track partition time differently. Can you
open a Jira?


-Matthias
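The TimestampExtractor#extract javadoc describes partition time roughly as the highest valid timestamp extracted so far from the current record's partition, with -1 meaning "unknown". The sketch below is a minimal model of that bookkeeping. It assumes a simplified single-threaded setting, and the class PartitionTimeTracker and its methods are hypothetical, not Kafka Streams code. It shows that -1 is only expected before the first valid record of a partition has been seen:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of per-partition "partition time" bookkeeping.
// Not Kafka Streams code: it only illustrates the documented semantics
// (highest valid extracted timestamp so far, -1 when unknown).
class PartitionTimeTracker {
    static final long UNKNOWN = -1L;

    private final Map<Integer, Long> partitionTimes = new HashMap<>();

    // Value an extractor would receive as its second argument for this partition.
    long partitionTime(int partition) {
        return partitionTimes.getOrDefault(partition, UNKNOWN);
    }

    // Record a newly extracted timestamp. Negative (invalid) timestamps are ignored,
    // and partition time never moves backwards.
    void update(int partition, long extractedTimestamp) {
        if (extractedTimestamp < 0) {
            return;
        }
        partitionTimes.merge(partition, extractedTimestamp, Long::max);
    }
}
```

Under this model, receiving -1 for every record, as reported in this thread, means the tracked value never reaches the extractor.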

On 6/26/19 7:58 AM, Jonathan Santilli wrote:
> Sure Bill, it is the same code I used when I reported the issue with
> suppress some months ago:
> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
> 
> In fact, I reported at that time that, after restarting the app, suppress
> was sending already-processed records downstream again.
> Now, with version 2.2.1+, after restarting the app the
> aggregation/suppress (I do not know exactly which) is missing some records
> that should be aggregated, even though they are in the input topic.
> 
> Kafka Version 2.3
> 
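> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.streams.processor.TimestampExtractor;
>
> public class OwnTimeExtractor implements TimestampExtractor {
>
>     @Override
>     public long extract(final ConsumerRecord<Object, Object> record,
>                         final long previousTimestamp) {
>         // previousTimestamp is always == -1
>         // (actual extraction logic omitted in the original message;
>         // returning the record's own timestamp as a placeholder)
>         return record.timestamp();
>     }
> }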
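An extractor body can avoid depending on the second argument being non-negative. The sketch below shows that fallback logic as a hypothetical, stdlib-only helper (TimestampFallback is not part of the Kafka API). It prefers the record's own timestamp, then uses the previous/partition timestamp if that is known, and only then falls back to wall-clock time:

```java
// Hypothetical helper sketching fallback logic that a TimestampExtractor#extract
// implementation could use; not part of the Kafka API.
final class TimestampFallback {
    private TimestampFallback() {}

    /**
     * @param recordTimestamp   timestamp taken from the record (or its payload); negative means invalid
     * @param previousTimestamp second argument passed to extract(); -1 means unknown
     * @param wallClockMs       current wall-clock time, passed in so the logic stays testable
     */
    static long choose(long recordTimestamp, long previousTimestamp, long wallClockMs) {
        if (recordTimestamp >= 0) {
            return recordTimestamp;   // normal case: the record carries a valid timestamp
        }
        if (previousTimestamp >= 0) {
            return previousTimestamp; // reuse partition time when it is known
        }
        return wallClockMs;           // partition time unknown (-1): last-resort fallback
    }
}
```

Inside extract(...), this could be called as TimestampFallback.choose(record.timestamp(), previousTimestamp, System.currentTimeMillis()). Falling back to wall-clock time is an assumption of this sketch, and it changes the event-time semantics for the affected records.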
> 
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<..., ...> events = builder
>         .stream(inputTopicNames, Consumed.with(..., ...)
>         .withTimestampExtractor(new OwnTimeExtractor()));
> 
> events
>     .filter((k, v) -> ...)
>     .flatMapValues(v -> ...)
>     .flatMapValues(v -> ...)
>     .selectKey((k, v) -> v)
>     .groupByKey(Grouped.with(..., ...))
>     .windowedBy(
>         TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
>             .advanceBy(Duration.ofSeconds(windowSizeInSecs))
>             .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
>     .reduce((agg, newValue) -> {
>         ...
>         return agg;
>     })
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream()
>     .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
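When reasoning about which records a windowed aggregation like the one above keeps, a simplified model of tumbling-window assignment (advanceBy equal to the window size) and of the grace-period check may help. A record is accepted only while its window end is later than the observed stream time minus the grace period. The sketch below works under those assumptions. Its names are hypothetical, and it is not the Kafka Streams implementation:

```java
// Hypothetical, simplified model of tumbling-window assignment and the grace-period
// check of a windowed aggregation; not Kafka Streams code. Assumes non-negative timestamps.
final class TumblingWindowCheck {
    private TumblingWindowCheck() {}

    // Start of the tumbling window containing the timestamp (windows aligned to epoch 0).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // A record is still accepted while its window end is after (stream time - grace).
    static boolean accepted(long timestampMs, long windowSizeMs, long graceMs, long streamTimeMs) {
        long windowEnd = windowStart(timestampMs, windowSizeMs) + windowSizeMs;
        return windowEnd > streamTimeMs - graceMs;
    }
}
```

With a 10-second window and a 5-second grace period, a record at 12,000 ms falls into the window [10,000, 20,000) and is accepted while stream time is below 25,000 ms.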
> 
> 
> 
> On Wed, Jun 26, 2019 at 3:40 PM Bill Bejeck <bill@confluent.io> wrote:
> 
>> Thanks for the reply Jonathan.
>>
>> Are you in a position to share your code so I can try to reproduce on my
>> end?
>>
>> -Bill
>>
>>
>> On Wed, Jun 26, 2019 at 10:23 AM Jonathan Santilli <
>> jonathansantilli@gmail.com> wrote:
>>
>>> Hello Bill,
>>>
>>> I am implementing the TimestampExtractor interface and then using it when
>>> consuming, like:
>>>
>>> final KStream<..., ...> events = builder.stream(inputTopicList,
>>>         Consumed.with(keySerde, valueSerde)
>>>                 .withTimestampExtractor(new OwnTimeExtractor(...)));
>>>
>>> I am not setting the default.timestamp.extractor config value.
>>>
>>> Cheers!
>>> --
>>> Jonathan
>>>
>>>
>>> On Wed, Jun 26, 2019 at 3:16 PM Bill Bejeck <bill@confluent.io> wrote:
>>>
>>>> Hi Jonathan,
>>>>
>>>> Thanks for reporting this. Which timestamp extractor are you using in
>>>> the configs?
>>>>
>>>> Thanks,
>>>> Bill
>>>>
>>>> On Wed, Jun 26, 2019 at 9:14 AM Jonathan Santilli <
>>>> jonathansantilli@gmail.com> wrote:
>>>>
>>>>> Hello, hope you all are doing well,
>>>>>
>>>>> I am testing the new version 2.3, specifically Kafka Streams. I have
>>>>> noticed that, in implementations of the extract method of the interface
>>>>> org.apache.kafka.streams.processor.TimestampExtractor
>>>>>
>>>>> public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp)
>>>>>
>>>>>
>>>>> the previousTimestamp argument is now always -1.
>>>>>
>>>>>
>>>>> The previous version, 2.2.1, was passing the correct value for the
>>>>> record's partition.
>>>>>
>>>>> I am aware the interface is marked as @InterfaceStability.Evolving and
>>>>> that we should not rely on its stability/compatibility. I am just
>>>>> wondering whether this new behavior is intentional or a bug.
>>>>>
>>>>>
>>>>> Cheers!
>>>>> --
>>>>> Santilli Jonathan
>>>>>
>>>>
>>>
>>>
>>> --
>>> Santilli Jonathan
>>>
>>
> 
> 

