kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From 赖剑清 <laijianq...@tp-link.com.cn>
Subject RE: New increased partitions could not be rebalance, until stop all consumers and start them
Date Fri, 19 Oct 2018 02:09:21 GMT
Hi,
I don't think it's a bug.
1. The increasement of the partitions will marked the metadata.updateNeeded=true, however
this won't really trigger a update till the next metadata.expire time (the default METADATA_MAX_AGE_CONFIG
is 5*60*1000 ms).
2. Before the leader of the group update its metadata, the change of consumer number won't
cause a rebalance neither: 
	a. the leader of the group (a consumer, not broker) assign the partitions and tell the coordinator(broker)
the result. 
	b. when a new consumer join the group, it find the coordinator, join group then send a SyncGroupRequest
to the coordinator and receive the assignment.
	c. for the leader haven't update its metadata yet, the subscription of a new consumer will
not trigger a rebalance for the topic with "only one" partition.
So if you want to make you consumer group much more sensitive to the change of partition number,
you may cut down the CommonClientConfigs.METADATA_MAX_AGE_CONFIG(metadata.max.age.ms).

>-----Original Message-----
>From: 飞翔的加菲猫 [mailto:526564746@qq.com]
>Sent: Thursday, October 18, 2018 10:47 AM
>To: users <users@kafka.apache.org>
>Subject: Re: New increased partitions could not be rebalance, until stop all
>consumers and start them
>
>Sorry for bothering. I don't know whether it is a bug. Maybe something wrong
>in my test or there is explanation for it. Could any Kafka master help take a
>look? Thanks lot.
>
>
>Ruiping Li
>
>
>------------------ 原始邮件 ------------------
>发件人: "526564746"<526564746@qq.com>;
>发送时间: 2018年10月12日(星期五) 下午4:42
>收件人: "users"<users@kafka.apache.org>;
>
>主题: New increased partitions could not be rebalance, until stop all
>consumers and start them
>
>
>
> Hi Kafka team,
>
>
>I meet a strange thing about Kafka rebalance. If I increase partitions of a topic
>which subscribed by some java consumers(in same one group), there is no
>rebalance occur. Furthermore, if I start a new consumer (or stop one) to cause
>a rebalance, the increased partitions could not be assigned, until I stop all
>consumers and start them. Is that normal?
>
>
>Thanks,
>Ruiping Li
>
>
>--------------------------------------------------------------------------------
>Below is my test:
>1. Start Kafka, ZK. Create a normal topic(test-topic) with 1 partitions
>$ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic test-topic --
>partitions 1 --replication-factor 1 --config retention.ms=604800000 2. Start 2
>java consumers (C1, C2), subscribe test-topic 3. Increase 2 partitions of test-
>topic $ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic test-
>topic --partitions 3
>WARNING: If partitions are increased for a topic that has a key, the partition
>logic or ordering of the messages will be affected Adding partitions succeeded!
>
>Increasing succeeded:
>$ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test-topic
>Topic:test-topic    PartitionCount:3    ReplicationFactor:1
>Configs:retention.ms=604800000
>    Topic: test-topic    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
>    Topic: test-topic    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
>    Topic: test-topic    Partition: 2    Leader: 0    Replicas: 0    Isr: 0
>
>There is no rebalance occur in C1, C2.
>4. Start a new consumer C3 to subscribed test-topic. Rebalance occur, but only
>partition test-topic-0 involved in reassigned, no test-topic-1 and test-topic-2.
>5. I try to stop C2, C3, and test-topic-1 and test-topic-2 still not be assigned.
>6. Stop all running consumers, and then start them. All test-topic-0,1,2
>assigned normally.
>
>
>Environment
>kafka & java api version: kafka_2.12-2.0.0 (I also tried kafka_2.11-1.0.0 and
>kafka_2.10-0.10.2.1, same result)
>zookeeper: 3.4.13
>consumer code:
>// consumer
>public class KafkaConsumerThread extends Thread {
>    // consumer settings
>    public static org.apache.kafka.clients.consumer.KafkaConsumer<String,
>String> createNativeConsumer(String groupName, String kafkaBootstrap) {
>        Properties props = new Properties();
>        props.put("bootstrap.servers", kafkaBootstrap);
>        props.put("group.id", groupName);
>        props.put("auto.offset.reset", "earliest");
>        props.put("enable.auto.commit", true);
>
>props.put("key.deserializer","org.apache.kafka.common.serialization.StringD
>eserializer");
>
>props.put("value.deserializer","org.apache.kafka.common.serialization.String
>Deserializer");
>
>
>        return new KafkaConsumer<String, String>(props);
>    }
>
>
>    private static final Logger log =
>LoggerFactory.getLogger(KafkaConsumerThread.class);
>    private boolean stop = false;
>    private KafkaConsumer<String, String> consumer;
>    private String topicName;
>    private ConsumerRebalanceListener consumerRebalanceListener;
>    private AtomicLong receivedRecordNumber = new AtomicLong(0);
>
>
>    public KafkaConsumerThread(String topicName, String groupName,
>ConsumerRebalanceListener consumerRebalanceListener, String
>kafkaBootstrap) {
>        this.consumer = createNativeConsumer(groupName, kafkaBootstrap);
>        this.topicName = topicName;
>        this.consumerRebalanceListener = consumerRebalanceListener;
>    }
>
>
>    @Override
>    public void run() {
>        log.info("Start consumer ..");
>        consumer.subscribe(Collections.singleton(topicName),
>consumerRebalanceListener);
>        while (!stop) {
>            try {
>                ConsumerRecords<String, String> records = consumer.poll(100);
>                receivedRecordNumber.addAndGet(records.count());
>                Iterator<ConsumerRecord<String, String>> iterator =
>records.iterator();
>                while (iterator.hasNext()) {
>                    ConsumerRecord<String, String> record = iterator.next();
>                    log.info("Receive [key:{}][value:{}]", record.key(), record.value());
>                }
>            } catch (TimeoutException e) {
>                log.info("no data");
>            }
>        }
>        consumer.close();
>    }
>
>
>    public void stopConsumer() {
>        this.stop = true;
>    }
>}
Mime
View raw message