Hello,
I tried to understand the algorithm used in choosing a leader of a topic / partition at the
time of creation  rack unaware mode.
I went thru TopicCommand.scala and AdminUtils.scala, especially assignReplicasToBrokersRackUnaware()
function (pasted below for convenience) and figured that the leader for a topic is chosen
randomly and the first follower is chosen based on an initial random shift from the leader
and the other followers are chosen based off of the first follower by incrementing the first
follower id.
Can someone please confirm that my understanding is correct?
I note that for the creation a topic with N number of partitions, the assignment algorithm
will arrive at a balanced distribution of the number of partitions wrt the replicas. Whereas
for the creation of N topics with 1 partition each, the resulting replica assignment may NOT
result in a balanced distribution of the number of topics wrt the replicas.
The next part of my question is  are there plans to include an algorithm that does the leader
& follower assignment based the prevailing load of the brokers? Suppose I add a new broker
to the cluster and start creating topics, I would expect the new broker as the leader for
the newly created topics. However, in my experiments this does not turn out to be the case
 there is no bias towards the new broker as the leader.
Are there any ways to introduce a bias towards choosing the new broker as the leader for the
creation of next several topics?
I understand that one can (a) specify the replicas list for each topic during topic creation
(b) use kafkareassignpartitions.sh to do a reassignment of partitions and (c) set auto.leader.rebalance.enable=true.
Thanks,
Buvana
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
replicationFactor: Int,
brokerList: Seq[Int],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]]
= {
val ret = mutable.Map[Int, Seq[Int]]()
val brokerArray = brokerList.toArray
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
var currentPartitionId = math.max(0, startPartitionId)
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
for (_ < 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length ==
0))
nextReplicaShift += 1
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j < 0 until replicationFactor  1)
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j,
brokerArray.length))
ret.put(currentPartitionId, replicaBuffer)
currentPartitionId += 1
}
ret
}
