kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Kafka streams dropping events in join when reading from earliest message on topic
Date Mon, 17 Jun 2019 16:47:11 GMT
> I verified keys and timestamps and they match.

Did you verify the timestamps client side, i.e., in your Streams application?

> When is the watermark for the grace period advanced? 

There is nothing like a watermark. Time is tracked on a per-record basis.
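To make "tracked on a per-record basis" concrete, here is a simplified, stdlib-only model. It is not Kafka Streams' actual implementation. It assumes that stream time is the largest record timestamp seen so far, and that a record is dropped once its timestamp is older than stream time minus (window size + grace). The class name `StreamTimeModel` is hypothetical. The 2000 ms window size and 50 ms grace mirror the `JoinWindows.of(Duration.ofMillis(1000)).grace(Duration.ofMillis(50))` window posted later in the thread. The drop rule itself is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of per-record stream time (NOT Kafka's code).
 * Assumption: a record is discarded when its timestamp is older than
 * streamTime - (windowSize + grace), where streamTime is the max timestamp seen.
 */
public class StreamTimeModel {
    private long streamTime = Long.MIN_VALUE; // no record seen yet
    private final long retentionMs;           // assumed: window size + grace

    public StreamTimeModel(long windowSizeMs, long graceMs) {
        this.retentionMs = windowSizeMs + graceMs;
    }

    /** Returns true if the record is accepted, false if it is dropped as too late. */
    public boolean process(long recordTimestamp) {
        // Stream time only moves forward: it is the largest timestamp seen so far.
        streamTime = Math.max(streamTime, recordTimestamp);
        // Under the assumed rule, a record older than the retained range is late.
        return recordTimestamp >= streamTime - retentionMs;
    }

    public long streamTime() {
        return streamTime;
    }

    public static void main(String[] args) {
        StreamTimeModel model = new StreamTimeModel(2_000, 50);
        // Timestamps in the order the task happens to see them while catching up
        // on a backlog: one input races ahead (60 000, 120 000) before an older
        // record from the other input (1 500) arrives.
        long[] timestamps = {1_000, 60_000, 120_000, 1_500, 121_000};
        List<Long> dropped = new ArrayList<>();
        for (long ts : timestamps) {
            if (!model.process(ts)) {
                dropped.add(ts);
            }
        }
        // Prints: stream time = 121000, dropped = [1500]
        System.out.println("stream time = " + model.streamTime() + ", dropped = " + dropped);
    }
}
```

In this model, a larger grace period only helps if it covers how far one input runs ahead of the other while the backlog is being read.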

> the event time is the Kafka log append time.

If it's log append time, then the broker sets the timestamp. Do you use
the embedded record timestamp for the join (default)? Or do you have an
embedded timestamp in the value and use a custom `TimestampExtractor`?

How large is your join window, what is your grace period, and what is your
store retention time?
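As a worked example of these three numbers for the window posted further down in the thread (`JoinWindows.of(Duration.ofMillis(1000)).grace(Duration.ofMillis(50))`), here is a stdlib-only sketch. `JoinWindows.of(d)` uses `d` both before and after a record's timestamp, so the window size is before + after. Treating the store retention as window size + grace is an assumption, labeled as such in the code. `JoinWindowMath` is a hypothetical name.

```java
import java.time.Duration;

/** Hypothetical helper: arithmetic for JoinWindows.of(1000 ms).grace(50 ms). */
public class JoinWindowMath {
    // JoinWindows.of(d) applies d both before and after the record timestamp.
    public static final long BEFORE_MS = Duration.ofMillis(1000).toMillis();
    public static final long AFTER_MS = Duration.ofMillis(1000).toMillis();
    public static final long GRACE_MS = Duration.ofMillis(50).toMillis();

    /** Window size: the total span covered by before + after. */
    public static long windowSizeMs() {
        return BEFORE_MS + AFTER_MS;
    }

    /** ASSUMPTION: store retention is window size plus grace period. */
    public static long assumedRetentionMs() {
        return windowSizeMs() + GRACE_MS;
    }

    /** True if rightTs lies within [leftTs - before, leftTs + after]. */
    public static boolean withinWindow(long leftTs, long rightTs) {
        return rightTs >= leftTs - BEFORE_MS && rightTs <= leftTs + AFTER_MS;
    }

    public static void main(String[] args) {
        System.out.println("window size = " + windowSizeMs() + " ms");             // 2000 ms
        System.out.println("assumed retention = " + assumedRetentionMs() + " ms"); // 2050 ms
        System.out.println(withinWindow(10_000, 10_900)); // true: 900 ms apart
        System.out.println(withinWindow(10_000, 11_200)); // false: 1200 ms apart
    }
}
```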


On 6/17/19 5:24 AM, giselle.vandongen@klarrio.com wrote:
> I verified keys and timestamps and they match. If I start the publisher and processor at the same time, the join output is entirely correct, with 6000 messages coming in and 3000 coming out.
> Putting the grace period to a higher value has no effect. 
> When is the watermark for the grace period advanced? Per commit interval? 
> I read from 5 Kafka brokers and the event time is the Kafka log append time. Could the ordering across brokers have something to do with it?
> On 2019/06/14 18:33:22, "Matthias J. Sax" <matthias@confluent.io> wrote: 
>> How do you know that the result should be 900,000 messages? Did you
>> verify that the keys match and that the timestamps are correct?
>> Did you try to remove the grace period or set a higher value? Maybe there is
>> an issue with out-of-order data?
>> -Matthias
>> On 6/14/19 5:05 AM, giselle.vandongen@klarrio.com wrote:
>>> I have two streams of data flowing into a Kafka cluster. I want to process this data with Kafka Streams. The stream producers are started at some time t.
>>> I start the Kafka Streams job 5 minutes later and read both topics from earliest (about 900,000 messages are already on each topic). The job parses the data and joins the two streams. On the intermediate topics, I see that all of the earlier 2 x 900,000 events flow through up to the join. However, the join outputs only 250,000 records instead of 900,000.
>>> After processing the burst, the code works perfectly on new incoming events.
>>> The grace period of the join is set to 50 ms, but setting it to 5 or 10 minutes makes no difference. My other settings:
>>>     auto.offset.reset = earliest
>>>     commit.interval.ms = 1000
>>>     batch.size = 204800
>>>     max.poll.records = 500000
>>> The join is on the following window:
>>> JoinWindows.of(Duration.ofMillis(1000))
>>>       .grace(Duration.ofMillis(50))
>>> As far as I understand, these joins should be done on event time. What can cause these results to be discarded?
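For reference, the settings listed in the question could be collected in a `java.util.Properties` object like this, assuming they are passed through the Streams configuration (Kafka Streams hands plain client settings such as these to its internal consumer and producer). The `application.id` and `bootstrap.servers` values are hypothetical placeholders that do not appear in the thread, and `StreamsSettings` is a hypothetical class name.

```java
import java.util.Properties;

/** Hypothetical helper: the settings from the question as a Properties object. */
public class StreamsSettings {
    public static Properties settings() {
        Properties props = new Properties();
        // Hypothetical placeholders: not given in the thread.
        props.put("application.id", "join-app");
        props.put("bootstrap.servers", "broker1:9092");
        // Settings listed in the question.
        props.put("auto.offset.reset", "earliest"); // consumer: start at earliest offset if none committed
        props.put("commit.interval.ms", "1000");    // Streams: commit processing progress every second
        props.put("batch.size", "204800");          // producer: batch size in bytes
        props.put("max.poll.records", "500000");    // consumer: max records returned per poll()
        return props;
    }

    public static void main(String[] args) {
        settings().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```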

View raw message