kafka-users mailing list archives

From Dmitriy Vsekhvalnov <dvsekhval...@gmail.com>
Subject Consumers re-consuming messages again after re-balance?
Date Mon, 03 Jul 2017 14:56:09 GMT
Hi all,

I'm looking for an explanation. We are running 2 instances of a consumer
(same consumer group) and are seeing somewhat odd behavior after 3 days of
inactivity.

Env:

Kafka broker 0.10.2.1
Java consumer 0.10.2.1 + spring-kafka, with enable.auto.commit = true (all
default settings).

Scenario:

1. We run volume testing.
2. Once all messages have been processed by the consumers, we keep the env
idle (no new messages) for several days.
3. After ~3 days one of the consumers starts to receive messages again.

Reproduced twice so far.

I understand it's probably due to a re-balance, but
 - why after 3 days?
 - how can we avoid it as much as possible (I know delivery is
   "at-least-once")? Manual offset commits?
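
To make the "manual offset commits" option concrete, here is a minimal sketch of the consumer settings it would involve. It only builds the configuration and does not start a consumer. The class name, the String deserializers and the use of the coordinator address from the host #2 log as bootstrap server are assumptions for illustration, and whether this change alone would stop the re-consumption is not established here.

```java
import java.util.Properties;

// Hypothetical helper: consumer settings with auto-commit switched off.
public class ManualCommitConfig {

    static Properties consumerProps() {
        Properties props = new Properties();
        // Broker address taken from the coordinator line in the host #2 log.
        props.setProperty("bootstrap.servers", "172.18.3.132:9092");
        // Consumer group name as it appears in the logs.
        props.setProperty("group.id", "test-consumer");
        // The setup described above uses "true"; "false" hands control of
        // offset commits to the application (or to the framework around it).
        props.setProperty("enable.auto.commit", "false");
        // Assumption: String keys and values; the real payload type is not stated.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps();
        System.out.println("enable.auto.commit = "
                + props.getProperty("enable.auto.commit"));
    }
}
```

With these properties the offsets would have to be committed explicitly after processing, either by the application or by the listener container.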

Below are the relevant log lines from both consumer hosts (newest entries
first).

Consumer host #1
[2017-07-02 08:20:05,445] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.springframework.kafka.listener.KafkaMessageListenerContainer]
[partitions assigned:[test.topic-1, test.topic-2, test.topic-0]]
[2017-07-02 08:20:05,432] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Setting
newly assigned partitions [test.topic-1, test.topic-2, test.topic-0] for
group test-consumer]
[2017-07-02 08:20:05,432] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
[Successfully joined group test-consumer with generation 43]
[2017-07-02 08:20:05,396] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
[(Re-)joining group test-consumer]
[2017-07-02 08:20:05,396] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.springframework.kafka.listener.KafkaMessageListenerContainer]
[partitions revoked:[test.topic-1, test.topic-0]]
[2017-07-02 08:20:05,127] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
[Revoking previously assigned partitions [test.topic-1, test.topic-0] for
group test-consumer]
[2017-06-29 07:20:37,172] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.springframework.kafka.listener.KafkaMessageListenerContainer]
[partitions assigned:[test.topic-1, test.topic-0]]
[2017-06-29 07:20:37,164] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Setting
newly assigned partitions [test.topic-1, test.topic-0] for group
test-consumer]
[2017-06-29 07:20:37,164] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
[Successfully joined group test-consumer with generation 42]
[2017-06-29 07:20:34,822] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
[(Re-)joining group test-consumer]



Consumer host #2
[2017-07-02 08:35:16,844] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
[Discovered coordinator 172.18.3.132:9092 (id: 2147482638 rack: null) for
group test-consumer.]
[2017-07-02 08:22:25,526] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Marking
the coordinator 172.18.3.132:9092 (id: 2147482638 rack: null) dead for
group test-consumer]
[2017-06-29 07:20:37,168] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
 [org.springframework.kafka.listener.KafkaMessageListenerContainer]
[partitions assigned:[test.topic-2]]
[2017-06-29 07:20:37,163] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Setting
newly assigned partitions [test.topic-2] for group test-consumer]
[2017-06-29 07:20:37,163] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
[Successfully joined group test-consumer with generation 42]
[2017-06-29 07:20:37,156] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
[(Re-)joining group test-consumer]
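
For reference, the gap between the two re-balances can be read off the timestamps above. The sketch below computes it from the host #1 lines "partitions assigned" (generation 42) and "Revoking previously assigned partitions"; the class and method names are made up for illustration, and the timestamps are treated as local times without a time zone.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: time elapsed between two log timestamps.
public class RebalanceGap {

    // Matches the timestamp layout used in the logs, e.g. 2017-07-02 08:20:05,127
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    static Duration gap(String earlier, String later) {
        return Duration.between(
                LocalDateTime.parse(earlier, LOG_TS),
                LocalDateTime.parse(later, LOG_TS));
    }

    public static void main(String[] args) {
        // Generation 42 assignment vs. the revocation that precedes generation 43.
        Duration d = gap("2017-06-29 07:20:37,164", "2017-07-02 08:20:05,127");
        System.out.println(d.toDays() + " days, "
                + (d.toHours() % 24) + " h, "
                + (d.toMinutes() % 60) + " min");
    }
}
```

So the second re-balance on host #1 came roughly 3 days and 1 hour after the previous one.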

Thanks all.
