kafka-users mailing list archives

From Pushkar Deole <pdeole2...@gmail.com>
Subject Re: KIP to Gracefully handle timeout exception on kafka streams
Date Fri, 11 Dec 2020 16:22:04 GMT
Matthias,

By the way, one more of our services recently hit the exception below.
Can you suggest whether it can also be avoided by tuning a specific
configuration?

{"@timestamp":"2020-11-24T13:33:38.617+00:00","@version":"1","message":"Exception
processing processor thread -
analytics-event-normalizer-d6aeedd6-f53b-4e20-91f6-ee4091041006-StreamThread-3
stream - org.apache.kafka.common.errors.TimeoutException: Timeout of
60000ms expired before the position for partition engagement-18 could be
determined","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-d6aeedd6-f53b-4e20-91f6-ee4091041006-StreamThread-3","level":"ERROR","level_value":40000,"stack_trace":"java.lang.IllegalStateException:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired
before the position for partition engagement-18 could be determined\n\tat
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:510)\n\tat
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478)\n\tat
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226)\n\tat
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)\n\tat
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\nCaused
by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms
expired before the position for partition engagement-18 could be
determined\n"}
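
For reference, the two settings named in the quoted reply below (`max.block.ms` and `default.api.timeout.ms`) could be raised roughly as follows. This is only a sketch under stated assumptions: the `producer.` and `consumer.` key prefixes are the ones Kafka Streams uses to route settings to its embedded clients, the helper name `withRaisedTimeouts`, the 120000 ms value, and the placeholder broker address are illustrative and not taken from this thread, and reading the "position ... could be determined" timeout as governed by `default.api.timeout.ms` is an interpretation of the reply rather than something confirmed here. Plain string keys are used so the snippet compiles without the Kafka jars.

```java
import java.util.Properties;

public class StreamsTimeoutSketch {

    // Hypothetical helper: copies the base Streams properties and raises the
    // two client timeouts mentioned in the thread to timeoutMs.
    public static Properties withRaisedTimeouts(Properties base, long timeoutMs) {
        Properties props = new Properties();
        props.putAll(base);
        // Producer side: upper bound on blocking producer calls
        // (the InitProducerId wait in the earlier trace hit 60000 ms).
        props.setProperty("producer.max.block.ms", Long.toString(timeoutMs));
        // Consumer side: default timeout for blocking consumer APIs
        // (assumed to cover the position lookup that timed out above).
        props.setProperty("consumer.default.api.timeout.ms", Long.toString(timeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        // Placeholder values, not taken from the real deployment.
        base.setProperty("application.id", "example-streams-app");
        base.setProperty("bootstrap.servers", "localhost:9092");

        Properties tuned = withRaisedTimeouts(base, 120_000L);
        tuned.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting `Properties` object would then be handed to the `KafkaStreams` constructor in place of the existing configuration. Larger timeouts only give the clients more time to ride out a broker bounce; they do not add retries inside Kafka Streams itself.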



On Sun, Nov 22, 2020 at 12:35 AM Matthias J. Sax <mjsax@apache.org> wrote:

> KIP-572 will only ship in 2.8.0.
>
> For the exception you hit, it's `max.block.ms` -- you might also look
> into `default.api.timeout.ms`.
>
> In general, the relevant configs are documented in the JavaDocs of the
> corresponding client method.
>
>
> -Matthias
>
> On 11/20/20 9:11 PM, Pushkar Deole wrote:
> > Thanks Matthias... We are already on Kafka 2.5.0, and
> > https://issues.apache.org/jira/browse/KAFKA-8803 mentions that these
> > types of issues are fixed in 2.5.0
> >
> > Is KIP-572 planned for 2.7.0?
> >
> > Also, for timeouts and retries, which parameters should we
> > configure to higher values for now?
> >
> >
> > On Sat, Nov 21, 2020 at 5:15 AM Matthias J. Sax <mjsax@apache.org> wrote:
> >
> >> Yes, if brokers are upgraded via rolling bounce, and the embedded
> >> clients are configured with large enough timeouts and retries, they
> >> should just fail over to running brokers if a single broker is bounced.
> >>
> >> If you get a timeout exception, then KafkaStreams dies at the moment -- we have
> >> KIP-572 in-flight that will improve the situation by adding one more
> >> retry layer within KafkaStreams itself. For now, you would need to
> >> increase the corresponding client timeouts so that the client does not
> >> throw a timeout exception.
> >>
> >> There is however https://issues.apache.org/jira/browse/KAFKA-8803 that
> >> you could have hit, too.
> >>
> >>
> >> -Matthias
> >>
> >> On 11/18/20 7:05 AM, Pushkar Deole wrote:
> >>> Matthias,
> >>>
> >>> We recently ran into an issue where the Kafka brokers were upgraded (I guess
> >>> it was a rolling update) from Aiven business plan 4 to plan 8. This involves
> >>> changes to CPU, memory and storage for each broker.
> >>>
> >>> Since this should be a rolling upgrade, we expected services to survive;
> >>> however, in one service we saw streams run into an error with the exception
> >>> below.
> >>> A few questions:
> >>> 1. If a broker goes down, the Kafka Streams client should handle it internally
> >>> and connect to an available broker, since we have topics with replicas equal
> >>> to the no. of brokers. Is this correct?
> >>> 2. The error below says the timeout expired while awaiting InitProducerId.
> >>> What does this signify, and why would this timeout occur when there will be
> >>> other brokers up and running?
> >>>
> >>>
> >>>
> >>> {"@timestamp":"2020-11-16T13:42:31.110+00:00","@version":"1","message":"Unexpected
> >>> exception in stream
> >>> processing.","logger_name":"com.avaya.analytics.filter.ApplicationConfig","thread_name":"analytics-event-filter-StreamThread-1","level":"ERROR","level_value":40000,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
> >>> stream-thread [analytics-event-filter-StreamThread-1] Failed to
> >>> rebalance.\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:862)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\nCaused
> >>> by: org.apache.kafka.streams.errors.StreamsException: stream-thread
> >>> [analytics-event-filter-StreamThread-1] task [1_0] Failed to initialize
> >>> task 1_0 due to timeout.\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamTask.initializeTransactions(StreamTask.java:923)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:206)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295)\n\tat
> >>> org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)\n\tat
> >>> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)\n\tat
> >>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)\n\tat
> >>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)\n\tat
> >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)\n\tat
> >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)\n\tat
> >>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:497)\n\tat
> >>> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1274)\n\tat
> >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)\n\tat
> >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173)\n\tat
> >>> brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:89)\n\tat
> >>> brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:83)\n\tat
> >>> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)\n\t...
> >>> 3 common frames omitted\nCaused by:
> >>> org.apache.kafka.common.errors.TimeoutException: Timeout expired after
> >>> 60000milliseconds while awaiting InitProducerId\n"}
> >>>
> >>
> >
>
