kafka-users mailing list archives

From Dmitriy Vsekhvalnov <dvsekhval...@gmail.com>
Subject Re: Consumers re-consuming messages again after re-balance?
Date Mon, 03 Jul 2017 16:04:36 GMT
Ouch, interesting.

Could the auto offset commit have failed by chance? Is there a way to prove it
(something to search for in the logs)?
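
A minimal sketch of the kind of log search meant here, assuming the consumer
reports a failed auto commit with a line containing both "commit" and "fail"
(the exact wording and log level are assumptions, and WARN logging for
org.apache.kafka.clients.consumer.internals would have to be enabled). The
class name CommitFailureScan and the log path argument are made up for
illustration:

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

class CommitFailureScan {
    // Returns every line that mentions both "commit" and "fail" (case-insensitive).
    // The keyword pair is an assumption about how commit failures are worded.
    static List<String> findCommitFailures(List<String> lines) {
        List<String> hits = new ArrayList<>();
        for (String line : lines) {
            String lower = line.toLowerCase(Locale.ROOT);
            if (lower.contains("commit") && lower.contains("fail")) {
                hits.add(line);
            }
        }
        return hits;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: java CommitFailureScan.java <consumer-log-file>");
            return;
        }
        // Read the whole log file and print only the suspicious lines.
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        for (String hit : findCommitFailures(lines)) {
            System.out.println(hit);
        }
    }
}
```

An empty result would not prove the commits succeeded; it only shows that
nothing matching those keywords was logged at the configured level.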

On Mon, Jul 3, 2017 at 6:29 PM, Tom Bentley <t.j.bentley@gmail.com> wrote:

> Hi Dmitriy,
>
> FTR, https://issues.apache.org/jira/browse/KAFKA-3806 is the issue Damian
> is referring to, but it doesn't quite fit what you describe because you
> said your consumer was configured with enable.auto.commit = true, which
> should keep committing even if there are no messages being consumed.
>
> On 3 July 2017 at 16:25, Damian Guy <damian.guy@gmail.com> wrote:
>
> > Hi Dmitriy,
> >
> > It is possibly related to the broker setting `offsets.retention.minutes` -
> > this defaults to 24 hours. If an offset hasn't been updated within that
> > time it will be removed. So if your env was sitting idle for longer than
> > this period, then rebalanced, you will likely start consuming the messages
> > from the earliest offset again. I'd recommend setting this higher than the
> > default of 24 hours.
> >
> > Thanks,
> > Damian
> >
> > On Mon, 3 Jul 2017 at 15:56 Dmitriy Vsekhvalnov <dvsekhvalnov@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > > looking for some explanations. We are running 2 instances of a consumer
> > > > (same consumer group) and getting slightly weird behavior after 3 days of
> > > > inactivity.
> > >
> > > Env:
> > >
> > > kafka broker 0.10.2.1
> > > consumer java 0.10.2.1 + spring-kafka + enable.auto.commit = true (all
> > > default settings).
> > >
> > > Scenario:
> > >
> > > 1. running volume testing
> > > > 2. once all messages are processed by the consumers we keep the env idle
> > > >    (no new messages) for several days
> > > > 3. after ~3 days one of the consumers starts to receive messages again.
> > >
> > > Reproduced twice so far.
> > >
> > > I understand it's probably due to re-balance, but
> > >  - why after 3 days?
> > > >  - and how can we avoid it as much as we can (I know it is
> > > >    "at-least-once")? (manual offset commits?)
> > >
> > > > Below are the relevant log lines from both consumer hosts.
> > >
> > > Consumer host #1
> > > [2017-07-02 08:20:05,445] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> > > [partitions assigned:[test.topic-1, test.topic-2, test.topic-0]]
> > > [2017-07-02 08:20:05,432] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> > [Setting
> > > newly assigned partitions [test.topic-1, test.topic-2, test.topic-0]
> for
> > > group test-consumer]
> > > [2017-07-02 08:20:05,432] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > > [Successfully joined group test-consumer with generation 43]
> > > [2017-07-02 08:20:05,396] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > > [(Re-)joining group test-consumer]
> > > [2017-07-02 08:20:05,396] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> > > [partitions revoked:[test.topic-1, test.topic-0]]
> > > [2017-07-02 08:20:05,127] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> > > [Revoking previously assigned partitions [test.topic-1, test.topic-0]
> for
> > > group test-consumer]
> > > [2017-06-29 07:20:37,172] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> > > [partitions assigned:[test.topic-1, test.topic-0]]
> > > [2017-06-29 07:20:37,164] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> > [Setting
> > > newly assigned partitions [test.topic-1, test.topic-0] for group
> > > test-consumer]
> > > [2017-06-29 07:20:37,164] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > > [Successfully joined group test-consumer with generation 42]
> > > [2017-06-29 07:20:34,822] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > > [(Re-)joining group test-consumer]
> > >
> > >
> > >
> > > Consumer host #2
> > > [2017-07-02 08:35:16,844] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > > [Discovered coordinator 172.18.3.132:9092 (id: 2147482638
> > > <(214)%20748-2638> rack: null) for
> > > group test-consumer.]
> > > [2017-07-02 08:22:25,526] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > [Marking
> > > the coordinator 172.18.3.132:9092 (id: 2147482638 <(214)%20748-2638>
> > > rack: null) dead for
> > > group test-consumer]
> > > [2017-06-29 07:20:37,168] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> > >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> > > [partitions assigned:[test.topic-2]]
> > > [2017-06-29 07:20:37,163] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> > >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> > [Setting
> > > newly assigned partitions [test.topic-2] for group test-consumer]
> > > [2017-06-29 07:20:37,163] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > > [Successfully joined group test-consumer with generation 42]
> > > [2017-06-29 07:20:37,156] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > > [(Re-)joining group test-consumer]
> > >
> > > Thanks all.
> > >
> >
>
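
For reference, the retention explanation quoted above can be checked against
the timestamps in the host #1 log with a bit of arithmetic. This is only a
rough sketch: it assumes no offset commit happened between the two rebalances
(which is exactly the open question in this thread) and uses the 24-hour
default mentioned for `offsets.retention.minutes`. The class and method names
are made up for illustration:

```java
import java.time.Duration;
import java.time.LocalDateTime;

class OffsetRetentionCheck {
    // True if more time passed between the last commit and the rebalance
    // than the broker keeps committed offsets for.
    static boolean mayHaveExpired(LocalDateTime lastCommit,
                                  LocalDateTime rebalance,
                                  Duration retention) {
        return Duration.between(lastCommit, rebalance).compareTo(retention) > 0;
    }

    public static void main(String[] args) {
        // Timestamps taken from the host #1 log: partitions assigned in
        // generation 42, then revoked again at the next rebalance.
        LocalDateTime lastAssignment = LocalDateTime.of(2017, 6, 29, 7, 20, 37);
        LocalDateTime nextRebalance = LocalDateTime.of(2017, 7, 2, 8, 20, 5);

        // Default retention of 24 hours, expressed in minutes like the broker setting.
        Duration retention = Duration.ofMinutes(24 * 60);

        Duration idle = Duration.between(lastAssignment, nextRebalance);
        System.out.println("idle hours: " + idle.toHours());
        System.out.println("longer than retention: "
                + mayHaveExpired(lastAssignment, nextRebalance, retention));
    }
}
```

The gap between the two rebalances is roughly three days, so it exceeds a
24-hour retention by a wide margin; whether the offsets were really left
uncommitted for that whole period is what still needs to be confirmed.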
