kafka-users mailing list archives

From Sachin Mittal <sjmit...@gmail.com>
Subject Re: In kafka streams consumer seems to hang while retrieving the offsets
Date Mon, 10 Apr 2017 14:29:57 GMT
I have made these changes and also set
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG to 180000.
Now the producer sometimes fails after 3 minutes, whereas earlier it used to
fail after 30 seconds (the default value).
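For reference, a minimal sketch of that setting, following the same props
pattern used later in this thread (the application id is a hypothetical
placeholder):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");  // hypothetical id
// raise the producer request timeout from the 30 s default to 3 minutes
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);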

So I was wondering what the reason for this might be, and how high this
value should go.

Thanks
Sachin


On Mon, Apr 10, 2017 at 5:00 PM, Eno Thereska <eno.thereska@gmail.com>
wrote:

> Hi Sachin,
>
> In 0.10.2.1 we've changed the default value of max.poll.interval.ms (to
> avoid rebalancing during recovery) as well as the default value of the
> streams producer retries (to retry during a temporary broker failure). I
> think you are aware of the changes, but just double checking. You don't
> need to wait for 0.10.2.1, you can make the changes directly yourself:
>
> final Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, ID);
> ...
> props.put(ProducerConfig.RETRIES_CONFIG, 10);
> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
> Integer.toString(Integer.MAX_VALUE));
>
> This doesn't address the RocksDB issue though, still looking into that.
>
> Thanks
> Eno
>
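For completeness, a minimal sketch of how such properties would typically be
wired into a 0.10.x streams app; the topology itself is elided and the
builder here is just a placeholder:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStreamBuilder;

final KStreamBuilder builder = new KStreamBuilder();
// ... define the topology here ...
final KafkaStreams streams = new KafkaStreams(builder, props);  // props as above
streams.start();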
> > On 9 Apr 2017, at 22:55, Sachin Mittal <sjmittal@gmail.com> wrote:
> >
> > Let me try to get the debug log when this error happens.
> >
> > Right now we have three instances, each with 4 threads, consuming from a
> > 12-partition topic, so one thread per partition.
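A minimal sketch of the per-instance thread count this describes, assuming
it is set through StreamsConfig like the other overrides in this thread:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
// 3 instances x 4 threads = 12 threads in total,
// one per partition of the 12-partition input topic
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);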
> >
> > The application is running fine, much better than before. Now it usually
> > runs for a week, even during peak load.
> >
> > Sometimes, out of the blue, either RocksDB throws an exception with a
> > single-character message (which I guess is a known issue with RocksDB,
> > fixed in a later release), or the producer times out while committing a
> > changelog topic record. I had increased that timeout from 30 seconds to
> > 180 seconds, but it still throws the exception even at that value.
> >
> > Not sure whether these are due to a VM issue or the network.
> >
> > But whenever something like this happens, the application goes into a
> > rebalance and things soon take a turn for the worse. Some of the threads
> > go into a deadlock with the stack trace quoted below, and the application
> > ends up in a perpetual rebalance state.
> >
> > The only way to resolve this is to kill all the instances with -9 and
> > restart them one by one.
> >
> > So as long as we have a steady state of one thread per partition,
> > everything works fine. I am still working out a way to limit the
> > changelog topic size through more aggressive compaction; let me see if
> > that makes things better.
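One possible way to pass more aggressive compaction settings down to a
store's changelog topic is the 0.10.x Stores API sketched below; the store
name and the exact config values are hypothetical:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

// hypothetical changelog topic overrides for more aggressive compaction
final Map<String, String> changelogConfig = new HashMap<>();
changelogConfig.put("cleanup.policy", "compact");
changelogConfig.put("segment.bytes", "104857600");        // smaller 100 MB segments
changelogConfig.put("min.cleanable.dirty.ratio", "0.1");  // compact sooner

final StateStoreSupplier store = Stores.create("my-store")  // hypothetical name
        .withStringKeys()
        .withStringValues()
        .persistent()
        .enableLogging(changelogConfig)  // applied to the store's changelog topic
        .build();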
> >
> > I will try to get the logs when this happens next time.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Sun, Apr 9, 2017 at 6:05 PM, Eno Thereska <eno.thereska@gmail.com>
> wrote:
> >
> >> Hi Sachin,
> >>
> >> It's not necessarily a deadlock. Do you have any debug traces from those
> >> nodes? It would also be useful to know the config (e.g., how many
> >> partitions you have and how many app instances).
> >>
> >> Thanks
> >> Eno
> >>
> >>> On 9 Apr 2017, at 04:45, Sachin Mittal <sjmittal@gmail.com> wrote:
> >>>
> >>> Hi,
> >>> In my streams application cluster, one or more instances have some
> >>> threads that are always waiting with the following stack.
> >>>
> >>> Every time I check with jstack, I see the same trace.
> >>>
> >>> Is this some kind of new deadlock that we have failed to identify?
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>> Here is the stack trace:
> >>> --------------------------------------------------------------------
> >>> "StreamThread-4" #20 prio=5 os_prio=0 tid=0x00007fb814be3000 nid=0x19bf runnable [0x00007fb7cb4f6000]
> >>>    java.lang.Thread.State: RUNNABLE
> >>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >>>        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> >>>        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> >>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> >>>        - locked <0x0000000701c50c98> (a sun.nio.ch.Util$3)
> >>>        - locked <0x0000000701c50c88> (a java.util.Collections$UnmodifiableSet)
> >>>        - locked <0x0000000701c4f6a8> (a sun.nio.ch.EPollSelectorImpl)
> >>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> >>>        at org.apache.kafka.common.network.Selector.select(Selector.java:489)
> >>>        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> >>>        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> >>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> >>>        - locked <0x0000000701c5da48> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> >>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
> >>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
> >>>        at org.apache.kafka.clients.consumer.internals.Fetcher.retrieveOffsetsByTimes(Fetcher.java:422)
> >>>        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370)
> >>>        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:227)
> >>>        at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1592)
> >>>        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265)
> >>>        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:213)
> >>
> >>
>
>
