blob: facc7458333dccacc6a1a9ea7483cfdf635c46cc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import scala.collection.{Map, Seq, mutable}
import org.junit.Assert._
trait RackAwareTest {
def checkReplicaDistribution(assignment: Map[Int, Seq[Int]],
brokerRackMapping: Map[Int, String],
numBrokers: Int,
numPartitions: Int,
replicationFactor: Int,
verifyRackAware: Boolean = true,
verifyLeaderDistribution: Boolean = true,
verifyReplicasDistribution: Boolean = true) {
// always verify that no broker will be assigned for more than one replica
for ((_, brokerList) <- assignment) {
assertEquals("More than one replica is assigned to same broker for the same partition", brokerList.toSet.size, brokerList.size)
}
val distribution = getReplicaDistribution(assignment, brokerRackMapping)
if (verifyRackAware) {
val partitionRackMap = distribution.partitionRacks
assertEquals("More than one replica of the same partition is assigned to the same rack",
List.fill(numPartitions)(replicationFactor), partitionRackMap.values.toList.map(_.distinct.size))
}
if (verifyLeaderDistribution) {
val leaderCount = distribution.brokerLeaderCount
val leaderCountPerBroker = numPartitions / numBrokers
assertEquals("Preferred leader count is not even for brokers", List.fill(numBrokers)(leaderCountPerBroker), leaderCount.values.toList)
}
if (verifyReplicasDistribution) {
val replicasCount = distribution.brokerReplicasCount
val numReplicasPerBroker = numPartitions * replicationFactor / numBrokers
assertEquals("Replica count is not even for broker", List.fill(numBrokers)(numReplicasPerBroker), replicasCount.values.toList)
}
}
def getReplicaDistribution(assignment: Map[Int, Seq[Int]], brokerRackMapping: Map[Int, String]): ReplicaDistributions = {
val leaderCount = mutable.Map[Int, Int]()
val partitionCount = mutable.Map[Int, Int]()
val partitionRackMap = mutable.Map[Int, List[String]]()
assignment.foreach { case (partitionId, replicaList) =>
val leader = replicaList.head
leaderCount(leader) = leaderCount.getOrElse(leader, 0) + 1
for (brokerId <- replicaList) {
partitionCount(brokerId) = partitionCount.getOrElse(brokerId, 0) + 1
val rack = brokerRackMapping.getOrElse(brokerId, sys.error(s"No mapping found for $brokerId in `brokerRackMapping`"))
partitionRackMap(partitionId) = rack :: partitionRackMap.getOrElse(partitionId, List())
}
}
ReplicaDistributions(partitionRackMap, leaderCount, partitionCount)
}
def toBrokerMetadata(rackMap: Map[Int, String], brokersWithoutRack: Seq[Int] = Seq.empty): Seq[BrokerMetadata] =
rackMap.toSeq.map { case (brokerId, rack) =>
BrokerMetadata(brokerId, Some(rack))
} ++ brokersWithoutRack.map { brokerId =>
BrokerMetadata(brokerId, None)
}.sortBy(_.id)
}
case class ReplicaDistributions(partitionRacks: Map[Int, Seq[String]], brokerLeaderCount: Map[Int, Int], brokerReplicasCount: Map[Int, Int])