kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "飞翔的加菲猫" <526564...@qq.com>
Subject New increased partitions could not be rebalance, until stop all consumers and start them
Date Fri, 12 Oct 2018 08:42:25 GMT
Hi Kafka team, 


I meet a strange thing about Kafka rebalance. If I increase partitions of a topic which subscribed
by some java consumers(in same one group), there is no rebalance occur. Furthermore, if I
start a new consumer (or stop one) to cause a rebalance, the increased partitions could not
be assigned, until I stop all consumers and start them. Is that normal?


Thanks,
Ruiping Li


--------------------------------------------------------------------------------
Below is my test: 
1. Start Kafka, ZK. Create a normal topic(test-topic) with 1 partitions 
./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic test-topic --partitions
1 --replication-factor 1 --config retention.ms=604800000 
2. Start 2 java consumers (C1, C2), subscribe test-topic 
3. Increase 2 partitions of test-topic  
rpli@rpli-mac:~/Softwares/Tools/kafka_2.11-1.0.0$ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181
--alter --topic test-topic --partitions 3 
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering
of the messages will be affected 
Adding partitions succeeded! 

Increasing succeeded: 
rpli@rpli-mac:~/Softwares/Tools/kafka_2.11-1.0.0$ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181
--describe --topic test-topic 
Topic:test-topic    PartitionCount:3    ReplicationFactor:1    Configs:retention.ms=604800000

    Topic: test-topic    Partition: 0    Leader: 0    Replicas: 0    Isr: 0 
    Topic: test-topic    Partition: 1    Leader: 0    Replicas: 0    Isr: 0 
    Topic: test-topic    Partition: 2    Leader: 0    Replicas: 0    Isr: 0 

There is no rebalance occur in C1, C2. 
4. Start a new consumer C3 to subscribed test-topic. Rebalance occur, but only partition test-topic-0
involved in reassigned, no test-topic-1 and test-topic-2.  
5. I try to stop C2, C3, and test-topic-1 and test-topic-2 still not involved. 
6. Stop all running consumers, and then start them. All test-topic-0,1,2 assigned normally.



Environment
kafka & java api version: kafka_2.12-2.0.0 (I also tried kafka_2.11-1.0.0 and kafka_2.10-0.10.2.1,
same result) 
zookeeper: 3.4.13 
consumer code: 
// consumer
public class KafkaConsumerThread extends Thread { 
    // consumer settings 
    public static org.apache.kafka.clients.consumer.KafkaConsumer<String, String> createNativeConsumer(String
groupName, String kafkaBootstrap) { 
        Properties props = new Properties(); 
        props.put("bootstrap.servers", kafkaBootstrap); 
        props.put("group.id", groupName); 
        props.put("auto.offset.reset", "earliest"); 
        props.put("enable.auto.commit", true); 
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");



        return new KafkaConsumer<String, String>(props); 
    } 


    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerThread.class);

    private boolean stop = false; 
    private KafkaConsumer<String, String> consumer; 
    private String topicName; 
    private ConsumerRebalanceListener consumerRebalanceListener; 
    private AtomicLong receivedRecordNumber = new AtomicLong(0); 


    public KafkaConsumerThread(String topicName, String groupName, ConsumerRebalanceListener
consumerRebalanceListener, String kafkaBootstrap) { 
        this.consumer = createNativeConsumer(groupName, kafkaBootstrap); 
        this.topicName = topicName; 
        this.consumerRebalanceListener = consumerRebalanceListener; 
    } 


    @Override 
    public void run() { 
        log.info("Start consumer .."); 
        consumer.subscribe(Collections.singleton(topicName), consumerRebalanceListener); 
        while (!stop) { 
            try { 
                ConsumerRecords<String, String> records = consumer.poll(100); 
                receivedRecordNumber.addAndGet(records.count()); 
                Iterator<ConsumerRecord<String, String>> iterator = records.iterator();

                while (iterator.hasNext()) { 
                    ConsumerRecord<String, String> record = iterator.next(); 
                    log.info("Receive [key:{}][value:{}]", record.key(), record.value());

                } 
            } catch (TimeoutException e) { 
                log.info("no data"); 
            } 
        } 
        consumer.close(); 
    } 


    public void stopConsumer() { 
        this.stop = true; 
    } 
}
Mime
  • Unnamed multipart/alternative (inline, 8-Bit, 0 bytes)
View raw message