kafka-users mailing list archives

From "Ramanan, Buvana (Nokia - US/Murray Hill)" <buvana.rama...@nokia-bell-labs.com>
Subject Replica selection at the time of topic creation
Date Thu, 06 Apr 2017 18:08:01 GMT

I tried to understand the algorithm used to choose the leader of a topic/partition at creation
time, in rack-unaware mode.

I went through TopicCommand.scala and AdminUtils.scala, especially the assignReplicasToBrokersRackUnaware()
function (pasted below for convenience), and concluded that the leader for a topic is chosen
randomly, the first follower is chosen by applying an initial random shift from the leader,
and the other followers are chosen relative to the first follower by incrementing the first
follower's id.
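Reading the pasted code, the placement for a new topic (partitions numbered from 0) can be written per partition. Here n is the number of brokers, s is startIndex, k_0 is the initial nextReplicaShift (both drawn uniformly from {0, ..., n-1} unless fixedStartIndex >= 0), R is the replication factor, and all results are positions in brokerList rather than broker ids:

$$
\begin{aligned}
\ell_p &= (p + s) \bmod n && \text{leader of partition } p \\
k_p &= k_0 + \lfloor p / n \rfloor && \text{shift, bumped after each full pass} \\
r_{p,j} &= \bigl(\ell_p + 1 + (k_p + j) \bmod (n-1)\bigr) \bmod n, \quad j = 0, \dots, R-2 && \text{followers}
\end{aligned}
$$

So only the first partition's leader is random and later leaders follow round-robin; the followers take consecutive offsets after the leader, wrapping within 1..n-1 so that none of them lands on the leader.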

Can someone please confirm that my understanding is correct?

I note that when creating one topic with N partitions, the assignment algorithm arrives at a
balanced distribution of partitions across the brokers. By contrast, when creating N topics
with 1 partition each, the resulting replica assignment may NOT be balanced across the
brokers.
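The difference comes from where the randomness sits: in the pasted function only the first partition's leader depends on the random startIndex, and later partitions of the same topic take the next broker index round-robin. Creating N single-partition topics draws a fresh startIndex per topic, so each leader is an independent uniform pick. The Scala sketch below tallies leaders per broker for both cases; it models only the leader choice, and the helper names, seed, and broker count are illustrative assumptions, not Kafka code.

```scala
import scala.util.Random

// Leader index of partition p, as in the pasted code: (p + startIndex) % nBrokers.
def leaderIndex(p: Int, startIndex: Int, nBrokers: Int): Int =
  (p + startIndex) % nBrokers

// One topic with nPartitions partitions: a single random startIndex, then round-robin.
def leadersOneTopic(nPartitions: Int, nBrokers: Int, rand: Random): Seq[Int] = {
  val startIndex = rand.nextInt(nBrokers)
  (0 until nPartitions).map(p => leaderIndex(p, startIndex, nBrokers))
}

// nTopics topics with one partition each: every topic draws its own startIndex.
def leadersManyTopics(nTopics: Int, nBrokers: Int, rand: Random): Seq[Int] =
  (0 until nTopics).map(_ => leaderIndex(0, rand.nextInt(nBrokers), nBrokers))

// Number of partitions led by each broker index.
def leaderCounts(leaders: Seq[Int], nBrokers: Int): Seq[Int] =
  (0 until nBrokers).map(b => leaders.count(_ == b))

val nBrokers = 5
val rand = new Random(42) // fixed seed so runs are repeatable
val oneTopic = leaderCounts(leadersOneTopic(10, nBrokers, rand), nBrokers)
val manyTopics = leaderCounts(leadersManyTopics(10, nBrokers, rand), nBrokers)
println(s"one topic, 10 partitions: $oneTopic")     // every broker leads exactly 2
println(s"10 single-partition topics: $manyTopics") // depends on the draws; can be uneven
```

Ten consecutive partitions over five brokers always cover each broker twice, whatever startIndex is drawn; the single-partition topics have no such guarantee.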

The next part of my question: are there plans to add an algorithm that does the leader and
follower assignment based on the prevailing load of the brokers? Suppose I add a new broker
to the cluster and start creating topics; I would expect the new broker to become the leader
for the newly created topics. However, in my experiments this is not the case: there is no
bias towards the new broker as the leader.

Is there any way to introduce a bias towards the new broker as the leader for the next several
topics created?
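The rack-unaware assignment pasted below never looks at broker load. Purely to illustrate what a load-biased variant could look like, this Scala sketch picks the broker that currently leads the fewest partitions as leader and fills the followers with the next least-loaded brokers. leastLoadedAssignment and assignTopics are hypothetical helpers, not Kafka APIs; the load map is assumed to be gathered separately, and only leader counts are tracked, so followers are not balanced.

```scala
// Hypothetical: pick replicas for one new partition by current leader count.
// leaderLoad maps broker id -> number of partitions it currently leads.
// Ties are broken by broker id so the result is deterministic.
def leastLoadedAssignment(leaderLoad: Map[Int, Int], replicationFactor: Int): Seq[Int] = {
  require(replicationFactor <= leaderLoad.size, "not enough brokers")
  leaderLoad.toSeq
    .sortBy { case (broker, load) => (load, broker) }
    .take(replicationFactor)
    .map(_._1)
}

// Assign several single-partition topics, bumping the leader's load after each one,
// so a new broker is favoured only until it catches up with the others.
def assignTopics(initialLoad: Map[Int, Int], nTopics: Int, replicationFactor: Int): Seq[Seq[Int]] = {
  var load = initialLoad
  (0 until nTopics).map { _ =>
    val replicas = leastLoadedAssignment(load, replicationFactor)
    load = load.updated(replicas.head, load(replicas.head) + 1)
    replicas
  }
}

// Brokers 1-3 already lead 3 partitions each; broker 4 was just added.
val existing = Map(1 -> 3, 2 -> 3, 3 -> 3, 4 -> 0)
assignTopics(existing, 4, 2).foreach(r => println(r.mkString(",")))
```

With this input the first three topics go to broker 4 as leader and the fourth falls back to broker 1 once all brokers lead three partitions.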

I understand that one can (a) specify the replica list for each topic during topic creation,
(b) use kafka-reassign-partitions.sh to reassign partitions, and (c) set auto.leader.rebalance.enable=true.
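For options (a) and (b), the first broker in a partition's replica list is its preferred leader, so listing the new broker first is what biases leadership towards it. Option (c) then lets the controller periodically move leadership back to preferred replicas; on its own it cannot make the new broker a leader unless it is already first in some replica list. The Scala sketch below renders one assignment in both input formats, the kafka-topics.sh --replica-assignment argument and the kafka-reassign-partitions.sh JSON file. The topic name and broker ids are made-up examples, and the JSON builder assumes topic names need no escaping.

```scala
// Each inner Seq is one partition's replica list; its head is the preferred leader.
type Assignment = Seq[Seq[Int]]

// kafka-topics.sh --replica-assignment format:
// partitions separated by ',', replicas within a partition by ':'.
def toReplicaAssignmentArg(assignment: Assignment): String =
  assignment.map(_.mkString(":")).mkString(",")

// JSON accepted by kafka-reassign-partitions.sh --reassignment-json-file.
def toReassignmentJson(topic: String, assignment: Assignment): String = {
  val partitions = assignment.zipWithIndex.map { case (replicas, partition) =>
    s"""{"topic":"$topic","partition":$partition,"replicas":[${replicas.mkString(",")}]}"""
  }
  s"""{"version":1,"partitions":[${partitions.mkString(",")}]}"""
}

// Broker 4 is the newly added broker and is the preferred leader of both partitions.
val assignment: Assignment = Seq(Seq(4, 1), Seq(4, 2))
println(toReplicaAssignmentArg(assignment)) // 4:1,4:2
println(toReassignmentJson("my-topic", assignment))
```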


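import scala.collection.{Map, Seq, mutable}
import scala.util.Random

object AdminUtils { // excerpt: only the members this function needs
  val rand = new Random

  private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                                 replicationFactor: Int,
                                                 brokerList: Seq[Int],
                                                 fixedStartIndex: Int,
                                                 startPartitionId: Int): Map[Int, Seq[Int]] = {
    val ret = mutable.Map[Int, Seq[Int]]()
    val brokerArray = brokerList.toArray
    // Index of the first partition's leader (random unless fixedStartIndex >= 0)
    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
    var currentPartitionId = math.max(0, startPartitionId)
    // Controls how far the first follower sits from the leader (random unless fixed)
    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
    for (_ <- 0 until nPartitions) {
      // After each full pass over the brokers, move the followers one step further out
      if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
        nextReplicaShift += 1
      // Leaders are assigned round-robin starting from startIndex
      val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
      val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
      for (j <- 0 until replicationFactor - 1)
        replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
      ret.put(currentPartitionId, replicaBuffer)
      currentPartitionId += 1
    }
    ret
  }

  // Follower j sits 1 + (shift + j) % (n - 1) positions after the leader, never on the leader
  private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }
}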
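To see the pattern concretely, the Scala sketch below re-implements the same arithmetic with fixedStartIndex = 0 (so startIndex and the initial nextReplicaShift are both 0) and startPartitionId = 0, for 5 brokers with ids 0-4, 10 partitions, and replication factor 3. It is a stand-alone illustration that does not call Kafka; followerIndex and assign are hypothetical names.

```scala
// Same formula as the replicaIndex helper: offset 1 + (shift + j) % (n - 1) from the leader.
def followerIndex(first: Int, shift: Int, j: Int, n: Int): Int =
  (first + 1 + (shift + j) % (n - 1)) % n

// Rack-unaware assignment with fixedStartIndex = 0 and partitions numbered from 0.
def assign(nPartitions: Int, replicationFactor: Int, brokers: IndexedSeq[Int]): Seq[Seq[Int]] = {
  val n = brokers.length
  var shift = 0 // nextReplicaShift, starting at fixedStartIndex = 0
  (0 until nPartitions).map { p =>
    if (p > 0 && p % n == 0) shift += 1 // bump after each full pass over the brokers
    val first = p % n                   // (p + startIndex) % n with startIndex = 0
    val indexes = first +: (0 until replicationFactor - 1).map(j => followerIndex(first, shift, j, n))
    indexes.map(i => brokers(i))        // translate positions into broker ids
  }
}

val brokers = IndexedSeq(0, 1, 2, 3, 4)
assign(10, 3, brokers).zipWithIndex.foreach { case (replicas, p) =>
  println(s"p$p -> ${replicas.mkString("[", ",", "]")}")
}
```

Leaders cycle through brokers 0-4 twice. Partitions p0-p4 get followers at offsets 1 and 2 from the leader (p0 -> [0,1,2]); after the first full pass the shift grows, so p5-p9 get offsets 2 and 3 (p5 -> [0,2,3]), which spreads the follower pairs differently in the second round.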
