kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dmitriy Vsekhvalnov <dvsekhval...@gmail.com>
Subject Re: Consumers re-consuming messages again after re-balance?
Date Tue, 04 Jul 2017 17:05:37 GMT
Thanks guys,

was exactly `offsets.retention.minutes`.

Figured out that `enable.auto.commit` was set to false in reality,
somewhere deep in spring properties and that's what have been causing
offsets removal when idle.



On Mon, Jul 3, 2017 at 7:04 PM, Dmitriy Vsekhvalnov <dvsekhvalnov@gmail.com>
wrote:

> Ouch, interesting.
>
> If by chance auto offset commit failed? Is there is way to prove it
> (something to search in a logs)?
>
> On Mon, Jul 3, 2017 at 6:29 PM, Tom Bentley <t.j.bentley@gmail.com> wrote:
>
>> Hi Dmitriy,
>>
>> FTR, https://issues.apache.org/jira/browse/KAFKA-3806 is the issue Damian
>> is referring to, but it doesn't quite fit what you describe because you
>> said your consumer was configured with enable.auto.commit = true, which
>> should keep committing even if there are no messages being consumed.
>>
>> On 3 July 2017 at 16:25, Damian Guy <damian.guy@gmail.com> wrote:
>>
>> > Hi Dmitriy,
>> >
>> > It is possibly related to the broker setting
>> `offsets.retention.minutes` -
>> > this defaults to 24 hours. If an offset hasn't been updated within that
>> > time it will be removed. So if your env was sitting idle for longer than
>> > this period, then rebalanced, you will likely start consuming the
>> messages
>> > from the earliest offset again. I'd recommend setting this higher than
>> the
>> > default of 24 hours.
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Mon, 3 Jul 2017 at 15:56 Dmitriy Vsekhvalnov <dvsekhvalnov@gmail.com
>> >
>> > wrote:
>> >
>> > > Hi all,
>> > >
>> > > looking for some explanations. We running 2 instances of consumer
>> (same
>> > > consumer group) and getting little bit weird behavior  after 3 days of
>> > > inactivity.
>> > >
>> > > Env:
>> > >
>> > > kafka broker 0.10.2.1
>> > > consumer java 0.10.2.1 + spring-kafka + enable.auto.commit = true (all
>> > > default settings).
>> > >
>> > > Scenario:
>> > >
>> > > 1. running volume testing
>> > > 2. once all messages are processed by consumers we keep env idle (no
>> new
>> > > messages) for several days
>> > > 3. after ~3 days one of consumer start to receive messages again.
>> > >
>> > > Reproduced twice so far.
>> > >
>> > > I understand it's probably due to re-balance, but
>> > >  - why after 3 days?
>> > >  - and how can we avoid it as much as we can (i know it is
>> > "at-least-once")
>> > > ? (manual offset commits?)
>> > >
>> > > Below is relevant lines of log from both consumer hosts.
>> > >
>> > > Consumer host #1
>> > > [2017-07-02 08:20:05,445] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
>> > > [partitions assigned:[test.topic-1, test.topic-2, test.topic-0]]
>> > > [2017-07-02 08:20:05,432] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
>> > [Setting
>> > > newly assigned partitions [test.topic-1, test.topic-2, test.topic-0]
>> for
>> > > group test-consumer]
>> > > [2017-07-02 08:20:05,432] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
>> > > [Successfully joined group test-consumer with generation 43]
>> > > [2017-07-02 08:20:05,396] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
>> > > [(Re-)joining group test-consumer]
>> > > [2017-07-02 08:20:05,396] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
>> > > [partitions revoked:[test.topic-1, test.topic-0]]
>> > > [2017-07-02 08:20:05,127] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
>> > > [Revoking previously assigned partitions [test.topic-1, test.topic-0]
>> for
>> > > group test-consumer]
>> > > [2017-06-29 07:20:37,172] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
>> > > [partitions assigned:[test.topic-1, test.topic-0]]
>> > > [2017-06-29 07:20:37,164] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
>> > [Setting
>> > > newly assigned partitions [test.topic-1, test.topic-0] for group
>> > > test-consumer]
>> > > [2017-06-29 07:20:37,164] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
>> > > [Successfully joined group test-consumer with generation 42]
>> > > [2017-06-29 07:20:34,822] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
>> > > [(Re-)joining group test-consumer]
>> > >
>> > >
>> > >
>> > > Consumer host #2
>> > > [2017-07-02 08:35:16,844] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
>> > > [Discovered coordinator 172.18.3.132:9092 (id: 2147482638
>> > > <(214)%20748-2638> rack: null) for
>> > > group test-consumer.]
>> > > [2017-07-02 08:22:25,526] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
>> > [Marking
>> > > the coordinator 172.18.3.132:9092 (id: 2147482638 <(214)%20748-2638>
>> > > rack: null) dead for
>> > > group test-consumer]
>> > > [2017-06-29 07:20:37,168] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>> > >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
>> > > [partitions assigned:[test.topic-2]]
>> > > [2017-06-29 07:20:37,163] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>> > >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
>> > [Setting
>> > > newly assigned partitions [test.topic-2] for group test-consumer]
>> > > [2017-06-29 07:20:37,163] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
>> > > [Successfully joined group test-consumer with generation 42]
>> > > [2017-06-29 07:20:37,156] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
>> > > [(Re-)joining group test-consumer]
>> > >
>> > > Thanks all.
>> > >
>> >
>>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message