kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Kafka streams dropping events in join when reading from earliest message on topic
Date Fri, 21 Jun 2019 00:19:24 GMT
> The two streams were read in separately:
>  instead of together:

If you want to join two streams, reading both topics separately sounds
correct.
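
For reference, a minimal sketch of what that could look like with the Scala
API (topic names, value types, and the joiner below are placeholders, not
taken from your code):

    import java.time.Duration
    import org.apache.kafka.streams.kstream.JoinWindows
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.Serdes._
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.kstream.KStream

    val builder = new StreamsBuilder()

    // read each input topic into its own KStream
    val stream1: KStream[String, String] = builder.stream[String, String]("topic1")
    val stream2: KStream[String, String] = builder.stream[String, String]("topic2")

    // inner join on key within a 1-second window
    val joined: KStream[String, String] = stream1.join(stream2)(
      (v1, v2) => s"$v1/$v2",
      JoinWindows.of(Duration.ofMillis(1000)).grace(Duration.ofMillis(50))
    )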



> There are twenty partitions per topic. It seems as if it is not reading equally fast
> from all topic partitions.

This should not affect the correctness of the join, as long as your
grace-period/retention-time is large enough.



> If time is tracked on a per-record basis. Do you then take the max timestamp across all
> partitions and topics as the current event time?

We track the maximum timestamp per task. In your case, you get 20 tasks,
each task joining two partitions, one from each input topic. Each task
tracks time progress independently of the other tasks, and across both of
its partitions.



> If so, if you read faster from one partition than from the other when processing old
> data, what can you do to make sure this does not get discarded besides putting a very high
> grace period?

This should not happen (at least since the 2.1 release) because records are
processed in timestamp order. A task compares the timestamps of the "head"
records of both partitions and picks the one with the lower timestamp for
processing.

Prior to 2.1, it was a best-effort approach: the consumer should fetch data
"round robin" across all partitions, but this is only best-effort
synchronization. For this case, you might need to set a higher
grace-period/retention-time.
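
Conceptually (this is just an illustration of the idea, not the actual
Streams internals), the choice a task makes looks like this:

    // illustrative sketch only -- not the real implementation
    case class HeadRecord(partition: Int, timestamp: Long)

    // given the buffered "head" record of each input partition,
    // pick the one with the smallest timestamp to process next
    def pickNext(heads: Seq[HeadRecord]): Option[HeadRecord] =
      if (heads.isEmpty) None else Some(heads.minBy(_.timestamp))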

More details in the corresponding KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization

The config John mentioned is related; however, it's more relevant for a
"live" run, when you process data from the tail of a topic and producers
write data in bursts. For the reprocessing of existing data as you
described it, it should not have a huge impact (maybe on startup while
buffers are warmed up). However, even then, the grace period is the more
important configuration for an inner stream-stream join.
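
If you want to experiment with it anyway, it's a regular StreamsConfig
setting (application id, bootstrap servers, and the 500 ms value below are
just placeholders):

    import java.util.Properties
    import org.apache.kafka.streams.StreamsConfig

    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-app")        // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092")  // placeholder
    // wait up to 500 ms for data from all input partitions before
    // picking the next records to process
    props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "500")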



> My join window is one second, grace period 50ms, retention time is default.

Why not set the grace period to the retention time? There is no drawback to
a higher grace period for stream-stream joins. I think this low grace
period is the root cause of the missing data.
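
For example, you could keep the 1-second window but allow a much larger
grace period (the 24 hours below is just an example, matching the default
store retention of one day):

    JoinWindows.of(Duration.ofMillis(1000)).grace(Duration.ofHours(24))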



> I am also wondering about what to do with the commit interval? In normal cases this should
> be on 1000 ms but in the case of this initial startup burst it should output faster?

The commit interval should not affect the join result at all.




-Matthias



On 6/20/19 9:25 AM, John Roesler wrote:
> Hi!
> 
> You might also want to set MAX_TASK_IDLE_MS_CONFIG =
> "max.task.idle.ms" to a non-zero value. This will instruct Streams to
> wait the configured amount of time to buffer incoming events on all
> topics before choosing any records to process. In turn, this should
> cause records to be processed in roughly timestamp order across all
> your topics. Without it, Streams might run ahead on one of the topics
> before processing events from the others.
> 
> You _might_ want to set the idle time higher than your
> MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms", to be sure that
> you actually get a chance to poll for more records before giving up on
> the idle.
> 
> For any operator that is time-dependent, the maximum _observed_
> timestamp is considered the current stream time.
> 
> I didn't follow the question about commit interval. It's a fixed
> configuration, so you can't make it commit more frequently during the
> initial catch-up, but then again, why would you want to? It seems like
> you'd want the initial load to go as fast as possible, but committing
> more frequently will only slow it down.
> 
> I hope this helps,
> -John
> 
> On Thu, Jun 20, 2019 at 9:00 AM giselle.vandongen@klarrio.com
> <giselle.vandongen@klarrio.com> wrote:
>>
>> It seems like there were multiple issues:
>> 1. The two streams were read in separately:
>>     val stream1: KStream[String, String] = builder.stream[String, String](Set("topic1"))
>>     val stream2: KStream[String, String] = builder.stream[String, String](Set("topic2"))
>>  instead of together:
>>     val rawStreams: KStream[String, String] = builder.stream[String, String](Set("topic1", "topic2"))
>> This second option got much more output but still not complete.
>> 2.  There are twenty partitions per topic. It seems as if it is not reading equally fast
>> from all topic partitions. When printing the input per thread the timestamps do not
>> accumulate nicely across partitions. If time is tracked on a per-record basis. Do you then
>> take the max timestamp across all partitions and topics as the current event time? If so,
>> if you read faster from one partition than from the other when processing old data, what
>> can you do to make sure this does not get discarded besides putting a very high grace period?
>>
>> My join window is one second, grace period 50ms, retention time is default.
>> I use the timestamp inside the observations. But I have tried with the default
>> TimestampExtractor (log append time) as well, which still did not give all the wanted output.
>>
>> I am also wondering about what to do with the commit interval? In normal cases this should
>> be on 1000 ms but in the case of this initial startup burst it should output faster?
>>
>> On 2019/06/17 16:47:11, "Matthias J. Sax" <matthias@confluent.io> wrote:
>>>> I verified keys and timestamps and they match.
>>>
>>> Did you verify the timestamps client side, i.e., in your Streams application?
>>>
>>>> When is the watermark for the grace period advanced?
>>>
>>> There is nothing like a watermark. Time is tracked on a per-record basis.
>>>
>>>> the event time is the Kafka log append time.
>>>
>>> If it's log append time, then the broker sets the timestamp. Do you use
>>> the embedded record timestamp for the join (default)? Or do you have an
>>> embedded timestamp in the value and use a custom `TimestampExtractor`?
>>>
>>> How large is your join-window, what is your grace period and what's your
>>> store retention time?
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 6/17/19 5:24 AM, giselle.vandongen@klarrio.com wrote:
>>>> I verified keys and timestamps and they match. If I start the publisher and processor
>>>> at the same time, the join has entirely correct output with 6000 messages coming in
>>>> and 3000 coming out.
>>>> Putting the grace period to a higher value has no effect.
>>>> When is the watermark for the grace period advanced? Per commit interval?
>>>> I read from 5 Kafka brokers and the event time is the Kafka log append time.
>>>> Could the ordering across brokers have something to do with it?
>>>>
>>>> On 2019/06/14 18:33:22, "Matthias J. Sax" <matthias@confluent.io> wrote:
>>>>> How do you know that the result should be 900,000 messages? Did you
>>>>> verify that the keys match and that the timestamps are correct?
>>>>>
>>>>> Did you try to remove grace-period or set a higher value? Maybe there is
>>>>> an issue with out-of-order data?
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 6/14/19 5:05 AM, giselle.vandongen@klarrio.com wrote:
>>>>>> I have two streams of data flowing into a Kafka cluster. I want to process
>>>>>> this data with Kafka streams. The stream producers are started at some time t.
>>>>>>
>>>>>> I start up the Kafka Streams job 5 minutes later and start reading from earliest
>>>>>> from both topics (about 900 000 messages already on each topic). The job parses
>>>>>> the data and joins the two streams. On the intermediate topics, I see that all the
>>>>>> earlier 2x900000 events are flowing through until the join. However, only 250 000
>>>>>> are outputted from the join, instead of 900 000.
>>>>>>
>>>>>> After processing the burst, the code works perfect on the new incoming events.
>>>>>>
>>>>>> Grace ms of the join is put on 50 millis but putting it on 5 minutes or 10 minutes
>>>>>> makes no difference. My other settings:
>>>>>>
>>>>>>     auto.offset.reset = earliest
>>>>>>     commit.interval.ms = 1000
>>>>>>     batch.size = 204800
>>>>>>     max.poll.records = 500000
>>>>>>
>>>>>> The join is on the following window:
>>>>>>
>>>>>> JoinWindows.of(Duration.ofMillis(1000))
>>>>>>       .grace(Duration.ofMillis(50))
>>>>>>
>>>>>> As far as I understand these joins should be done on event time. What can cause
>>>>>> these results to be discarded?
>>>>>>
>>>>>
>>>>>
>>>
>>>

