kafka-users mailing list archives

From Javier Holguera <javier.holgu...@zopa.com>
Subject RE: Kafka Streams - max.poll.interval.ms defaults to Integer.MAX_VALUE
Date Mon, 01 Jan 2018 12:49:48 GMT
Hi Matthias,

Thanks for clarifying the last point. I think I got a full picture of the situation now.

I agree that the most likely scenario is one where the third party fails to serve all requests,
so other instances of the app would be equally affected. The scenario that I have in mind
is mostly one where one instance is slowed down while the others aren't. I imagine that it
could happen if whatever partition strategy we have chosen for the topic ends up triggering
a slow path on the third party. That instance would be repeatedly slow while the others would
hit the happy path.

I agree that calling a third party from a Streams app is not ideal. However, in this case
we can't pre-load the information unfortunately. It's more like a lookup that we have to do
against the third party and we don't know in advance the query criteria.

Thanks for taking the time to explain the whole flow.

-----Original Message-----
From: Matthias J. Sax [mailto:matthias@confluent.io] 
Sent: Thursday, December 28, 2017 6:58 PM
To: users@kafka.apache.org
Subject: Re: Kafka Streams - max.poll.interval.ms defaults to Integer.MAX_VALUE

> I imagine that it means other consumers would not consume either until the rebalance
is completed successfully. Is that correct?

Yes.

>> If it is, then we will assign explicit, narrower values to max.poll.interval.ms to
avoid halting the stream in that scenario.

You can do that. After the 500 seconds have passed and the straggling instance calls `poll()` again,
it will trigger a new rebalance and rejoin the group.
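The worst case discussed here is simple multiplication: the number of records handed out by one `poll()` times the per-record latency of the external call. Below is a minimal plain-Java sketch using the figures from Javier's message quoted further down (20 records per poll, 5 s per call, 25 s when a call times out). The class name and the 3-minute interval are hypothetical, and the batch size stands in for whatever `max.poll.records` happens to be configured to.

```java
import java.time.Duration;

// Hypothetical helper: estimates how long one poll() loop iteration can take when
// every record triggers a blocking call to an external service.
final class WorstCaseStall {

    // Worst case: every record in the batch waits the full per-record latency.
    static Duration worstCase(int recordsPerPoll, Duration perRecordLatency) {
        return perRecordLatency.multipliedBy(recordsPerPoll);
    }

    // True if one batch could take longer than max.poll.interval.ms, in which case
    // the consumer drops out of the group before it gets to call poll() again.
    static boolean exceedsPollInterval(int recordsPerPoll, Duration perRecordLatency,
                                       Duration maxPollInterval) {
        return worstCase(recordsPerPoll, perRecordLatency).compareTo(maxPollInterval) > 0;
    }

    public static void main(String[] args) {
        int recordsPerPoll = 20;                           // batch size from the thread
        Duration normal = Duration.ofSeconds(5);           // usual third-party latency
        Duration degraded = Duration.ofSeconds(25);        // per-call client timeout
        Duration maxPollInterval = Duration.ofMinutes(3);  // hypothetical narrower setting

        System.out.println("normal:   " + worstCase(recordsPerPoll, normal).getSeconds() + " s");   // 100 s
        System.out.println("degraded: " + worstCase(recordsPerPoll, degraded).getSeconds() + " s"); // 500 s
        System.out.println("exceeds max.poll.interval.ms: "
                + exceedsPollInterval(recordsPerPoll, degraded, maxPollInterval));                // true
    }
}
```

With a 3-minute interval the degraded case (500 s) exceeds the limit, so the straggler leaves the group and the others can continue; it rejoins once it finishes the batch and calls `poll()` again.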

However, I am wondering about the overall scenario: if the external third party service slows
down or is offline, would this not affect all your instances? So what do you gain?

Also note that if you "unblock" the rebalance, the partitions (and thus the records) processed by the straggler
will be reassigned to the remaining instances. I would therefore assume that those instances get "stuck"
too, as they need to call the same external service to process the data.

In general, calling an external service from within Kafka Streams is not recommended. If possible,
it is better to load the corresponding data into a topic, read it as a KTable, and do
a stream-table join. Not sure if this is feasible for your use-case though.
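To make the stream-table join idea concrete without pulling in the Kafka Streams dependency, here is a plain-Java, in-memory illustration of its semantics. It is not the Kafka Streams API. The reference data is materialized from a changelog (upserts, with `null` as a tombstone), and each stream record is enriched by a local lookup instead of a remote call. It assumes all table updates arrive before the events and ignores timestamps and partitioning; all names are hypothetical, and it needs Java 16+ for records. In a real Streams application this corresponds roughly to reading the reference topic as a `KTable` and calling `KStream#join` with it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of an inner stream-table join (not the Kafka Streams API).
final class StreamTableJoinSketch {

    record Update(String key, String value) {}   // value == null acts as a tombstone (delete)
    record Event(String key, String payload) {}

    // Local copy of the reference data, keyed like the stream records.
    private final Map<String, String> table = new HashMap<>();

    // Apply one changelog record: upsert, or delete on tombstone.
    void applyTableUpdate(Update u) {
        if (u.value() == null) {
            table.remove(u.key());
        } else {
            table.put(u.key(), u.value());
        }
    }

    // Inner join: each event is enriched with the current table value for its key;
    // events without a matching table entry are dropped.
    List<String> join(List<Event> events) {
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            String ref = table.get(e.key());
            if (ref != null) {
                out.add(e.payload() + "|" + ref);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        StreamTableJoinSketch s = new StreamTableJoinSketch();
        s.applyTableUpdate(new Update("c1", "gold"));
        s.applyTableUpdate(new Update("c2", "silver"));
        s.applyTableUpdate(new Update("c2", null)); // c2 deleted
        System.out.println(s.join(List.of(new Event("c1", "order-1"), new Event("c2", "order-2"))));
        // prints [order-1|gold]
    }
}
```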


-Matthias



On 12/28/17 7:16 AM, Javier Holguera wrote:
> Hi Matthias,
> 
> 
> Thanks for your detailed answer.
> 
> 
> I have one more question (sorry to be so annoying!). If we had a topology making calls
to a third party that takes, let's say, 5 seconds to process each request, and we pulled a batch of 20
records, the app would be unresponsive in the event of a rebalance for up to 100 seconds (in
the worst possible scenario).
> 
> 
> If the third party slows down or even goes down and we time out after 25 seconds per call,
the unresponsive period could grow to 500 seconds. I imagine that it means other consumers
would not consume either until the rebalance is completed successfully. Is that correct?
> 
> 
> If it is, then we will assign explicit, narrower values to max.poll.interval.ms to avoid
halting the stream in that scenario.
> 
> 
> Thanks.
> 
> 
> ________________________________
> From: Matthias J. Sax <matthias@confluent.io>
> Sent: 27 December 2017 19:54:38
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams - max.poll.interval.ms defaults to 
> Integer.MAX_VALUE
> 
> It would. However, this is not an issue for KafkaStreams, as we make
> sure the thread is either still alive or properly shut down. Thus, if
> an error happens and the thread really dies, KafkaStreams ensures that
> the heartbeat thread is stopped, so a rebalance would not block
> forever, as the member drops out of the group via the session timeout.
> 
> And as long as a KafkaStreams instance is restoring state, the rebalance
> should block by design.
> 
> Note that none of this holds for newer versions of KafkaStreams
> anymore.
> 
> 
> -Matthias
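The distinction drawn above, between a thread that is merely slow and one that has died, rests on the two independent liveness checks introduced by KIP-62. The sketch below is a simplified model, not the actual client or group-coordinator code: the heartbeat thread is checked against `session.timeout.ms`, and processing progress against `max.poll.interval.ms`. The class name and the 10-second session timeout are assumptions for illustration.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical, simplified model of the two KIP-62 liveness checks.
final class LivenessSketch {

    enum Status { ALIVE, SESSION_EXPIRED, POLL_INTERVAL_EXCEEDED }

    static Status check(Instant now, Instant lastHeartbeat, Instant lastPoll,
                        Duration sessionTimeout, Duration maxPollInterval) {
        // Coordinator side: no heartbeat within session.timeout.ms -> member is removed.
        if (Duration.between(lastHeartbeat, now).compareTo(sessionTimeout) > 0) {
            return Status.SESSION_EXPIRED;
        }
        // Client side: poll() not called within max.poll.interval.ms -> member leaves the group.
        if (Duration.between(lastPoll, now).compareTo(maxPollInterval) > 0) {
            return Status.POLL_INTERVAL_EXCEEDED;
        }
        return Status.ALIVE;
    }

    public static void main(String[] args) {
        Duration session = Duration.ofSeconds(10);                 // assumed session.timeout.ms
        Duration maxPoll = Duration.ofMillis(Integer.MAX_VALUE);   // Streams default in this thread
        Instant t0 = Instant.parse("2017-12-27T00:00:00Z");

        // Stream thread busy (e.g. restoring) for an hour, heartbeat thread still running.
        System.out.println(check(t0.plusSeconds(3600), t0.plusSeconds(3599), t0, session, maxPoll));
        // Stream thread died and the heartbeat thread was stopped with it.
        System.out.println(check(t0.plusSeconds(60), t0.plusSeconds(30), t0.plusSeconds(30), session, maxPoll));
    }
}
```

The first call reports `ALIVE` (an hour is far below `Integer.MAX_VALUE` ms, about 24.8 days), while the second reports `SESSION_EXPIRED`: once heartbeats stop, the session timeout removes the member even though the poll interval is effectively unbounded.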
> 
> 
> 
> On 12/27/17 6:55 AM, Javier Holguera wrote:
>> Hi Matthias,
>>
>> Thanks for your answer. It makes a lot of sense.
>>
>> Just a follow-up question. KIP-62 says: "we give the client as much as max.poll.interval.ms
to handle a batch of records, this is also the maximum time before a consumer can be expected
to rejoin the group in the worst case". Does it mean that a broker would wait Integer.MAX_VALUE milliseconds
for a client to report in the event of a rebalance? That sounds improbable, so I must be missing
something.
>>
>> Thanks.
>>
>>
>> -----Original Message-----
>> From: Matthias J. Sax [mailto:matthias@confluent.io]
>> Sent: Friday, December 22, 2017 9:13 PM
>> To: users@kafka.apache.org
>> Subject: Re: Kafka Streams - max.poll.interval.ms defaults to 
>> Integer.MAX_VALUE
>>
>> The value was changed to make Streams applications robust against long state restore
phases during rebalance.
>>
>> I.e., it is targeted at exactly "fixing" point 2. If an application needs to restore state,
this restore might take longer than max.poll.interval.ms, and thus, even
if the application is healthy, it drops out of the group. This results in rebalance
"storms". The consumer default of 5 minutes (300,000 ms) is too small for most applications, and thus we
set it to MAX_VALUE. If you have a good estimate of the maximum expected state restore time,
you can safely set the timeout to an appropriate value.
>>
>> Note that in Kafka 0.11 and 1.0, Kafka Streams state restore was greatly improved, and
reducing the timeout accordingly should not be an issue there.
>>
>>
>> -Matthias
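Following this advice, the timeout can be set explicitly in the Streams configuration; a value you supply should override the `Integer.MAX_VALUE` default that Streams applies to its consumers. A minimal sketch, assuming you have an estimate of the longest state restore: the helper name, application id, and numbers are hypothetical. Plain string keys are used so it compiles with the JDK alone; with the Kafka jars on the classpath, the `StreamsConfig` and `ConsumerConfig` constants name the same keys.

```java
import java.util.Properties;

// Hypothetical helper: builds Kafka Streams properties with a bounded max.poll.interval.ms.
final class StreamsTimeoutConfig {

    static Properties build(String appId, String bootstrapServers,
                            long expectedMaxRestoreMs, long safetyMarginMs) {
        Properties props = new Properties();
        props.put("application.id", appId);
        props.put("bootstrap.servers", bootstrapServers);
        // Upper bound on the gap between poll() calls. It must cover the longest expected
        // state restore (relevant for 0.10.x) or batch processing time, plus some headroom.
        props.put("max.poll.interval.ms", Long.toString(expectedMaxRestoreMs + safetyMarginMs));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical estimate: restores take at most 10 minutes; add 2 minutes of headroom.
        Properties p = build("my-streams-app", "localhost:9092", 600_000L, 120_000L);
        System.out.println(p.getProperty("max.poll.interval.ms")); // 720000
    }
}
```

The resulting `Properties` would then be passed to the `KafkaStreams` constructor together with the topology.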
>>
>> On 12/20/17 7:14 AM, Javier Holguera wrote:
>>> Hi,
>>>
>>> According to the documentation, "max.poll.interval.ms" defaults to Integer.MAX_VALUE
for Kafka Streams since 0.10.2.1.
>>>
>>> Considering that the "max.poll.interval.ms" is:
>>>
>>>   1.  A "processing timeout" to control an upper limit for processing a batch
of records AND
>>>   2.  The rebalance timeout that the client will communicate to the 
>>> broker, according to KIP-62
>>>
>>> How do Kafka Streams applications detect slow consumers that take too long
to process a batch of messages? What replaces the existing mechanism, where a smaller "max.poll.interval.ms"
makes the application willingly abandon the consumer group when the timeout expires?
>>>
>>> From the broker perspective, what does it mean that the application communicates
a "rebalance timeout" of Integer.MAX_VALUE? I imagine the broker will not wait that long in
a rebalance. What happens then?
>>>
>>> Thanks.
>>>
>>
> 
> 

