kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tom Bentley <t.j.bent...@gmail.com>
Subject Re: Consumers re-consuming messages again after re-balance?
Date Mon, 03 Jul 2017 15:29:58 GMT
Hi Dmitriy,

FTR, https://issues.apache.org/jira/browse/KAFKA-3806 is the issue Damian
is referring to, but it doesn't quite fit what you describe because you
said your consumer was configured with enable.auto.commit = true, which
should keep committing even if there are no messages being consumed.

On 3 July 2017 at 16:25, Damian Guy <damian.guy@gmail.com> wrote:

> Hi Dmitriy,
>
> It is possibly related to the broker setting `offsets.retention.minutes` -
> this defaults to 24 hours. If an offset hasn't been updated within that
> time it will be removed. So if your env was sitting idle for longer than
> this period, then rebalanced, you will likely start consuming the messages
> from the earliest offset again. I'd recommend setting this higher than the
> default of 24 hours.
>
> Thanks,
> Damian
>
> On Mon, 3 Jul 2017 at 15:56 Dmitriy Vsekhvalnov <dvsekhvalnov@gmail.com>
> wrote:
>
> > Hi all,
> >
> > looking for some explanations. We running 2 instances of consumer (same
> > consumer group) and getting little bit weird behavior  after 3 days of
> > inactivity.
> >
> > Env:
> >
> > kafka broker 0.10.2.1
> > consumer java 0.10.2.1 + spring-kafka + enable.auto.commit = true (all
> > default settings).
> >
> > Scenario:
> >
> > 1. running volume testing
> > 2. once all messages are processed by consumers we keep env idle (no new
> > messages) for several days
> > 3. after ~3 days one of consumer start to receive messages again.
> >
> > Reproduced twice so far.
> >
> > I understand it's probably due to re-balance, but
> >  - why after 3 days?
> >  - and how can we avoid it as much as we can (i know it is
> "at-least-once")
> > ? (manual offset commits?)
> >
> > Below is relevant lines of log from both consumer hosts.
> >
> > Consumer host #1
> > [2017-07-02 08:20:05,445] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> > [partitions assigned:[test.topic-1, test.topic-2, test.topic-0]]
> > [2017-07-02 08:20:05,432] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> [Setting
> > newly assigned partitions [test.topic-1, test.topic-2, test.topic-0] for
> > group test-consumer]
> > [2017-07-02 08:20:05,432] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > [Successfully joined group test-consumer with generation 43]
> > [2017-07-02 08:20:05,396] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > [(Re-)joining group test-consumer]
> > [2017-07-02 08:20:05,396] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> > [partitions revoked:[test.topic-1, test.topic-0]]
> > [2017-07-02 08:20:05,127] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> > [Revoking previously assigned partitions [test.topic-1, test.topic-0] for
> > group test-consumer]
> > [2017-06-29 07:20:37,172] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> > [partitions assigned:[test.topic-1, test.topic-0]]
> > [2017-06-29 07:20:37,164] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> [Setting
> > newly assigned partitions [test.topic-1, test.topic-0] for group
> > test-consumer]
> > [2017-06-29 07:20:37,164] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > [Successfully joined group test-consumer with generation 42]
> > [2017-06-29 07:20:34,822] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > [(Re-)joining group test-consumer]
> >
> >
> >
> > Consumer host #2
> > [2017-07-02 08:35:16,844] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > [Discovered coordinator 172.18.3.132:9092 (id: 2147482638
> > <(214)%20748-2638> rack: null) for
> > group test-consumer.]
> > [2017-07-02 08:22:25,526] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> [Marking
> > the coordinator 172.18.3.132:9092 (id: 2147482638 <(214)%20748-2638>
> > rack: null) dead for
> > group test-consumer]
> > [2017-06-29 07:20:37,168] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> > [partitions assigned:[test.topic-2]]
> > [2017-06-29 07:20:37,163] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> [Setting
> > newly assigned partitions [test.topic-2] for group test-consumer]
> > [2017-06-29 07:20:37,163] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > [Successfully joined group test-consumer with generation 42]
> > [2017-06-29 07:20:37,156] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > [(Re-)joining group test-consumer]
> >
> > Thanks all.
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message