kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Damian Guy <damian....@gmail.com>
Subject Re: Consumers re-consuming messages again after re-balance?
Date Mon, 03 Jul 2017 15:25:08 GMT
Hi Dmitriy,

It is possibly related to the broker setting `offsets.retention.minutes` -
this defaults to 24 hours. If an offset hasn't been updated within that
time it will be removed. So if your env was sitting idle for longer than
this period, then rebalanced, you will likely start consuming the messages
from the earliest offset again. I'd recommend setting this higher than the
default of 24 hours.

Thanks,
Damian

On Mon, 3 Jul 2017 at 15:56 Dmitriy Vsekhvalnov <dvsekhvalnov@gmail.com>
wrote:

> Hi all,
>
> looking for some explanations. We running 2 instances of consumer (same
> consumer group) and getting little bit weird behavior  after 3 days of
> inactivity.
>
> Env:
>
> kafka broker 0.10.2.1
> consumer java 0.10.2.1 + spring-kafka + enable.auto.commit = true (all
> default settings).
>
> Scenario:
>
> 1. running volume testing
> 2. once all messages are processed by consumers we keep env idle (no new
> messages) for several days
> 3. after ~3 days one of consumer start to receive messages again.
>
> Reproduced twice so far.
>
> I understand it's probably due to re-balance, but
>  - why after 3 days?
>  - and how can we avoid it as much as we can (i know it is "at-least-once")
> ? (manual offset commits?)
>
> Below is relevant lines of log from both consumer hosts.
>
> Consumer host #1
> [2017-07-02 08:20:05,445] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> [partitions assigned:[test.topic-1, test.topic-2, test.topic-0]]
> [2017-07-02 08:20:05,432] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Setting
> newly assigned partitions [test.topic-1, test.topic-2, test.topic-0] for
> group test-consumer]
> [2017-07-02 08:20:05,432] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> [Successfully joined group test-consumer with generation 43]
> [2017-07-02 08:20:05,396] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> [(Re-)joining group test-consumer]
> [2017-07-02 08:20:05,396] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> [partitions revoked:[test.topic-1, test.topic-0]]
> [2017-07-02 08:20:05,127] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> [Revoking previously assigned partitions [test.topic-1, test.topic-0] for
> group test-consumer]
> [2017-06-29 07:20:37,172] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> [partitions assigned:[test.topic-1, test.topic-0]]
> [2017-06-29 07:20:37,164] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Setting
> newly assigned partitions [test.topic-1, test.topic-0] for group
> test-consumer]
> [2017-06-29 07:20:37,164] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> [Successfully joined group test-consumer with generation 42]
> [2017-06-29 07:20:34,822] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> [(Re-)joining group test-consumer]
>
>
>
> Consumer host #2
> [2017-07-02 08:35:16,844] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> [Discovered coordinator 172.18.3.132:9092 (id: 2147482638
> <(214)%20748-2638> rack: null) for
> group test-consumer.]
> [2017-07-02 08:22:25,526] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Marking
> the coordinator 172.18.3.132:9092 (id: 2147482638 <(214)%20748-2638>
> rack: null) dead for
> group test-consumer]
> [2017-06-29 07:20:37,168] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> [partitions assigned:[test.topic-2]]
> [2017-06-29 07:20:37,163] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Setting
> newly assigned partitions [test.topic-2] for group test-consumer]
> [2017-06-29 07:20:37,163] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> [Successfully joined group test-consumer with generation 42]
> [2017-06-29 07:20:37,156] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> [(Re-)joining group test-consumer]
>
> Thanks all.
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message