kafka-users mailing list archives

From "Matthias J. Sax" <mj...@apache.org>
Subject Re: KIP to Gracefully handle timeout exception on kafka streams
Date Tue, 15 Dec 2020 21:15:11 GMT
Sounds like `default.api.timeout.ms`.
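Here is a minimal sketch of raising that timeout. It assumes the application builds its Kafka Streams configuration as a plain `java.util.Properties`. The `consumer.` prefix scopes the setting to the consumers that Kafka Streams creates. The class and method names are hypothetical. The 180-second value is an illustrative choice, not a recommendation.

```java
import java.util.Properties;

public class StreamsTimeoutConfig {

    // Keys are spelled out as plain strings so the sketch compiles without
    // kafka-streams on the classpath. With the library available, the same key is
    // StreamsConfig.consumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG).
    static final String CONSUMER_API_TIMEOUT = "consumer.default.api.timeout.ms";

    // Hypothetical value: 180 s instead of the 60 s (60000 ms) seen in the log.
    static final int API_TIMEOUT_MS = 180_000;

    // Hypothetical helper: copies the base config and adds the consumer timeout.
    static Properties withConsumerApiTimeout(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Bounds blocking consumer calls such as position(), which fails with
        // the TimeoutException in the quoted stack trace during a commit.
        props.put(CONSUMER_API_TIMEOUT, Integer.toString(API_TIMEOUT_MS));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "analytics-event-normalizer"); // guessed from the thread name
        base.put("bootstrap.servers", "localhost:9092");          // placeholder
        Properties props = withConsumerApiTimeout(base);
        System.out.println(props.getProperty(CONSUMER_API_TIMEOUT));
    }
}
```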

-Matthias

On 12/11/20 8:22 AM, Pushkar Deole wrote:
> Matthias,
> 
> By the way, another one of our services recently encountered this exception.
> Can you suggest whether this can also be avoided by tuning a specific
> configuration?
> 
> {"@timestamp":"2020-11-24T13:33:38.617+00:00","@version":"1","message":"Exception
> processing processor thread -
> analytics-event-normalizer-d6aeedd6-f53b-4e20-91f6-ee4091041006-StreamThread-3
> stream - org.apache.kafka.common.errors.TimeoutException: Timeout of
> 60000ms expired before the position for partition engagement-18 could be
> determined","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-d6aeedd6-f53b-4e20-91f6-ee4091041006-StreamThread-3","level":"ERROR","level_value":40000,"stack_trace":"java.lang.IllegalStateException:
> org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired
> before the position for partition engagement-18 could be determined\n\tat
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:510)\n\tat
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478)\n\tat
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226)\n\tat
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543)\n\tat
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977)\n\tat
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)\n\tat
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)\n\tat
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\nCaused
> by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms
> expired before the position for partition engagement-18 could be
> determined\n"}
> 
> 
> 
> On Sun, Nov 22, 2020 at 12:35 AM Matthias J. Sax <mjsax@apache.org> wrote:
> 
>> KIP-572 will only ship in 2.8.0.
>>
>> For the exception you hit, it's `max.block.ms` -- you might also look
>> into `default.api.timeout.ms`.
>>
>> In general, the relevant configs are documented in the JavaDocs of the
>> corresponding client method.
>>
>>
>> -Matthias
>>
>> On 11/20/20 9:11 PM, Pushkar Deole wrote:
>>> Thanks Matthias... We are already on Kafka 2.5.0, and
>>> https://issues.apache.org/jira/browse/KAFKA-8803 mentions that these types
>>> of issues are fixed in 2.5.0.
>>>
>>> Is KIP-572 planned for 2.7.0?
>>>
>>> Also, for timeouts and retries, can you tell us which parameters we should
>>> set to higher values for now?
>>>
>>>
>>> On Sat, Nov 21, 2020 at 5:15 AM Matthias J. Sax <mjsax@apache.org> wrote:
>>>
>>>> Yes, if brokers are upgraded via rolling bounce, and the embedded
>>>> clients are configured with large enough timeouts and retries, they
>>>> should just fail over to running brokers if a single broker is bounced.
>>>>
>>>> If you get a timeout exception, then KafkaStreams dies atm -- we have
>>>> KIP-572 in-flight that will improve the situation by adding one more
>>>> retry layer within KafkaStreams itself. For now, you would need to
>>>> increase the corresponding client timeouts so that the client does not
>>>> throw a timeout exception.
>>>>
>>>> There is however https://issues.apache.org/jira/browse/KAFKA-8803 that
>>>> you could have hit, too.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 11/18/20 7:05 AM, Pushkar Deole wrote:
>>>>> Matthias,
>>>>>
>>>>> We recently ran into an issue where our Kafka brokers were upgraded (I
>>>>> guess it was a rolling update) from Aiven business plan 4 to plan 8. This
>>>>> involves changes to CPU, memory and storage for each broker.
>>>>>
>>>>> Since this should have been a rolling upgrade, we expected our services
>>>>> to survive; however, in one service Kafka Streams failed with the
>>>>> exception below.
>>>>>
>>>>> A few questions:
>>>>> 1. If a broker goes down, the Kafka Streams client should handle it
>>>>> internally and connect to an available broker, since our topics have a
>>>>> replication factor equal to the number of brokers. Is this correct?
>>>>> 2. The error below says the timeout expired while awaiting InitProducerId.
>>>>> What does this signify, and why would this timeout occur when other
>>>>> brokers are still up and running?
>>>>>
>>>>> {"@timestamp":"2020-11-16T13:42:31.110+00:00","@version":"1","message":"Unexpected
>>>>> exception in stream
>>>>> processing.","logger_name":"com.avaya.analytics.filter.ApplicationConfig","thread_name":"analytics-event-filter-StreamThread-1","level":"ERROR","level_value":40000,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
>>>>> stream-thread [analytics-event-filter-StreamThread-1] Failed to
>>>>> rebalance.\n\tat
>>>>> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:862)\n\tat
>>>>> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat
>>>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)\n\tat
>>>>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\nCaused
>>>>> by: org.apache.kafka.streams.errors.StreamsException: stream-thread
>>>>> [analytics-event-filter-StreamThread-1] task [1_0] Failed to initialize
>>>>> task 1_0 due to timeout.\n\tat
>>>>> org.apache.kafka.streams.processor.internals.StreamTask.initializeTransactions(StreamTask.java:923)\n\tat
>>>>> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:206)\n\tat
>>>>> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)\n\tat
>>>>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352)\n\tat
>>>>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310)\n\tat
>>>>> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295)\n\tat
>>>>> org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)\n\tat
>>>>> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)\n\tat
>>>>> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)\n\tat
>>>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)\n\tat
>>>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)\n\tat
>>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)\n\tat
>>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)\n\tat
>>>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:497)\n\tat
>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1274)\n\tat
>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)\n\tat
>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173)\n\tat
>>>>> brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:89)\n\tat
>>>>> brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:83)\n\tat
>>>>> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)\n\t...
>>>>> 3 common frames omitted\nCaused by:
>>>>> org.apache.kafka.common.errors.TimeoutException: Timeout expired after
>>>>> 60000milliseconds while awaiting InitProducerId\n"}
>>>>>
>>>>
>>>
>>
> 
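The quoted advice on the `InitProducerId` timeout in the oldest message points at the producer's `max.block.ms`. That setting bounds how long `initTransactions()` may block. The `StreamTask.initializeTransactions` frame in that stack trace suggests exactly-once processing is enabled. Here is a sketch that raises both client timeouts together, under the same assumptions as before: plain `Properties`, a hypothetical helper name, and an illustrative value. The `processing.guarantee` entry is inferred from the log, not confirmed by the thread.

```java
import java.util.Properties;

public class StreamsClientTimeouts {

    // Plain-string keys keep the sketch free of Kafka dependencies. With
    // kafka-streams on the classpath these equal
    // StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG) and
    // StreamsConfig.consumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG).
    static final String PRODUCER_MAX_BLOCK_MS = "producer.max.block.ms";
    static final String CONSUMER_API_TIMEOUT_MS = "consumer.default.api.timeout.ms";

    // Hypothetical helper: copies the base config and raises both blocking timeouts.
    static Properties withClientTimeouts(Properties base, int timeoutMs) {
        if (timeoutMs <= 0) {
            throw new IllegalArgumentException("timeoutMs must be positive");
        }
        Properties props = new Properties();
        props.putAll(base);
        String value = Integer.toString(timeoutMs);
        // Bounds blocking producer calls, including initTransactions(), which
        // waits for the InitProducerId response under exactly-once processing.
        props.put(PRODUCER_MAX_BLOCK_MS, value);
        // Bounds blocking consumer calls such as position() during a commit.
        props.put(CONSUMER_API_TIMEOUT_MS, value);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "analytics-event-filter"); // guessed from the thread name
        base.put("bootstrap.servers", "localhost:9092");       // placeholder
        base.put("processing.guarantee", "exactly_once");      // assumed from the log
        Properties props = withClientTimeouts(base, 180_000);
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```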
