kafka-users mailing list archives

From Eno Thereska <eno.there...@gmail.com>
Subject Re: In kafka streams consumer seems to hang while retrieving the offsets
Date Mon, 10 Apr 2017 11:30:55 GMT
Hi Sachin,

In 0.10.2.1 we've changed the default value of max.poll.interval.ms (to avoid rebalancing
during long state recovery) as well as the default value of the streams producer retries (to
retry through a temporary broker failure). I think you are aware of these changes, but I'm
just double-checking. You don't need to wait for 0.10.2.1; you can make the changes directly
yourself:

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, ID);
...
// retry transient broker failures instead of failing the send immediately
props.put(ProducerConfig.RETRIES_CONFIG, 10);
// effectively disable the poll-interval check so a long state restore can't trigger a rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
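
For completeness, a minimal sketch of wiring these properties into a running application
(the topology and topic names below are placeholders, not from your setup):

KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");  // placeholder pass-through topology
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();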

This doesn't address the RocksDB issue, though; we're still looking into that.

Thanks
Eno

> On 9 Apr 2017, at 22:55, Sachin Mittal <sjmittal@gmail.com> wrote:
> 
> Let me try to get the debug log when this error happens.
> 
> Right now we have three instances, each with 4 threads, consuming from a
> 12-partition topic, so one thread per partition.
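> 
> (For reference, a minimal sketch of that setting, assuming the standard
> StreamsConfig key for the per-instance thread count:
> 
>     props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
> 
> Three instances x 4 threads matches the 12 partitions exactly.)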
> 
> The application is running fine, much better than before. It now usually
> runs for a week, even during peak load.
> 
> Sometimes, out of the blue, either RocksDB throws an exception with a
> single-character message (which I guess is a known issue with RocksDB,
> fixed in an upcoming release), or the producer times out while committing
> a changelog topic record. I had increased the timeout from 30 seconds to
> 180 seconds, but it still throws an exception even with that value.
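> 
> (For reference, roughly what I changed, assuming the producer's
> request.timeout.ms, whose default is 30 seconds, is the relevant setting:
> 
>     // hypothetical: raise the producer request timeout from 30s to 180s
>     props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
> )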
> 
> Not sure if these are due to a VM issue or the network.
> 
> But whenever something like this happens, the application goes into a
> rebalance and things soon take a turn for the worse. Some of the threads
> go into a deadlock with the above stack trace, and the application ends
> up in a perpetual rebalance state.
> 
> The only way to resolve this is to kill all instances with -9 and restart
> them one by one.
> 
> So as long as we have a steady state of one thread per partition,
> everything works fine. I am still working out a way to limit the
> changelog topic size through more aggressive compaction; let me see if
> that makes things better.
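> 
> One way I'm considering, via the 0.10.2 store API, is passing topic
> configs to the changelog when the store is created (store name and
> values here are placeholders, just a sketch):
> 
>     Map<String, String> changelogConfig = new HashMap<>();
>     // roll segments hourly and compact once 10% of the log is dirty
>     changelogConfig.put("segment.ms", "3600000");
>     changelogConfig.put("min.cleanable.dirty.ratio", "0.1");
> 
>     StateStoreSupplier store = Stores.create("my-store")  // placeholder name
>             .withStringKeys()
>             .withStringValues()
>             .persistent()
>             .enableLogging(changelogConfig)  // configs applied to the changelog topic
>             .build();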
> 
> I will try to get the logs when this happens next time.
> 
> Thanks
> Sachin
> 
> 
> 
> On Sun, Apr 9, 2017 at 6:05 PM, Eno Thereska <eno.thereska@gmail.com> wrote:
> 
>> Hi Sachin,
>> 
>> It's not necessarily a deadlock. Do you have any debug traces from those
>> nodes? It would also be useful to know the config (e.g., how many
>> partitions you have and how many app instances).
>> 
>> Thanks
>> Eno
>> 
>>> On 9 Apr 2017, at 04:45, Sachin Mittal <sjmittal@gmail.com> wrote:
>>> 
>>> Hi,
>>> In my streams application cluster, on one or more instances, I see some
>>> threads always waiting with the following stack.
>>> 
>>> Every time I check with jstack, I see the same trace.
>>> 
>>> Is this some kind of new deadlock that we have failed to identify?
>>> 
>>> Thanks
>>> Sachin
>>> 
>>> here is the stack trace:
>>> ------------------------------------------------------------------------
>>> "StreamThread-4" #20 prio=5 os_prio=0 tid=0x00007fb814be3000 nid=0x19bf runnable [0x00007fb7cb4f6000]
>>>    java.lang.Thread.State: RUNNABLE
>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>>>        - locked <0x0000000701c50c98> (a sun.nio.ch.Util$3)
>>>        - locked <0x0000000701c50c88> (a java.util.Collections$UnmodifiableSet)
>>>        - locked <0x0000000701c4f6a8> (a sun.nio.ch.EPollSelectorImpl)
>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>>>        at org.apache.kafka.common.network.Selector.select(Selector.java:489)
>>>        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
>>>        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>>>        - locked <0x0000000701c5da48> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
>>>        at org.apache.kafka.clients.consumer.internals.Fetcher.retrieveOffsetsByTimes(Fetcher.java:422)
>>>        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370)
>>>        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:227)
>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1592)
>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265)
>>>        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:213)
>> 

