package kafka.server
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.{Seq, Set, mutable}
import scala.collection.JavaConverters._
import kafka.cluster.{Broker, EndPoint}
import kafka.api._
import kafka.common.{BrokerEndPointNotAvailableException, Topic, TopicAndPartition}
import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch}
import kafka.utils.CoreUtils._
import kafka.utils.Logging
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{MetadataResponse, PartitionState, UpdateMetadataRequest}
/**
 * A cache for the state (e.g., current leader) of each partition. This cache is updated through
 * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
 */
private[server] class MetadataCache(brokerId: Int) extends Logging {
// The controller's state-change logger; used for the trace messages emitted while applying updates.
private val stateChangeLogger = KafkaController.stateChangeLogger
// topic name -> (partition id -> cached state for that partition)
private val cache = mutable.Map[String, mutable.Map[Int, PartitionStateInfo]]()
// Controller id taken from the most recent UpdateMetadataRequest; None until an update arrives or when
// the id in the request is negative.
private var controllerId: Option[Int] = None
// broker id -> Broker
private val aliveBrokers = mutable.Map[Int, Broker]()
// broker id -> (listener name -> Node); consulted when resolving endpoints for metadata responses
private val aliveNodes = mutable.Map[Int, collection.Map[ListenerName, Node]]()
// Read/write lock that the methods of this class take around access to the maps above.
private val partitionMetadataLock = new ReentrantReadWriteLock()
// Log identifier carrying this broker's id.
this.logIdent = s"[Kafka Metadata Cache on broker $brokerId] "
// This method is the main hotspot when it comes to the performance of metadata requests,
// we should be careful about adding additional logic here.
// filterUnavailableEndpoints exists to support v0 MetadataResponses
// Resolves every broker id in `brokers` to its Node for `listenerName` and collects the nodes in `result`.
// An id without an alive endpoint is skipped when `filterUnavailableEndpoints` is true; otherwise it is
// represented by a placeholder Node that has an empty host and port -1.
private def getEndpoints(brokers: Iterable[Int], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = {
// The argument is only an initial capacity; the buffer grows if more nodes are appended.
val result = new mutable.ArrayBuffer[Node](math.min(aliveBrokers.size, brokers.size))
brokers.foreach { brokerId =>
val endpoint = getAliveEndpoint(brokerId, listenerName) match {
// Unknown broker: either emit the placeholder node or drop the id, depending on the flag.
case None => if (!filterUnavailableEndpoints) Some(new Node(brokerId, "", -1)) else None
case Some(node) => Some(node)
endpoint.foreach(result +=)
// Looks up the Node that `brokerId` exposes for `listenerName`.
// Yields None when `brokerId` has no entry in `aliveNodes`.
// NOTE(review): the exception below is presumably raised when the broker is known but its node map has
// no entry for `listenerName` -- confirm against the lookup inside the closure.
private def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): Option[Node] =
aliveNodes.get(brokerId).map { nodeMap =>
throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not have listener with name `$listenerName`"))
// errorUnavailableEndpoints exists to support v0 MetadataResponses
// Builds the per-partition metadata for `topic` from the cached partition state.
// Yields None when `topic` has no entry in `cache`. For every cached partition the result carries:
//  - LEADER_NOT_AVAILABLE when the leader has no alive endpoint for `listenerName`,
//  - REPLICA_NOT_AVAILABLE when some replica or in-sync replica could not be resolved,
//  - NONE otherwise.
private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
cache.get(topic).map { partitions => { case (partitionId, partitionState) =>
// Used only in the debug messages below.
val topicPartition = TopicAndPartition(topic, partitionId)
val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
val maybeLeader = getAliveEndpoint(leaderAndIsr.leader, listenerName)
val replicas = partitionState.allReplicas
// With errorUnavailableEndpoints set, unresolved replicas are dropped here, which is what makes the
// size comparisons further down detect them; without it getEndpoints substitutes placeholder nodes.
val replicaInfo = getEndpoints(replicas, listenerName, errorUnavailableEndpoints)
maybeLeader match {
case None =>
debug(s"Error while fetching metadata for $topicPartition: leader not available")
// No leader: report the replicas that were resolved, with no leader node and an empty ISR list.
new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, partitionId, Node.noNode(),
replicaInfo.asJava, java.util.Collections.emptyList())
case Some(leader) =>
val isr = leaderAndIsr.isr
val isrInfo = getEndpoints(isr, listenerName, errorUnavailableEndpoints)
// Fewer resolved nodes than replica ids means at least one replica was filtered out as unavailable.
if (replicaInfo.size < replicas.size) {
debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
s"following brokers ${replicas.filterNot(",")}")
new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
replicaInfo.asJava, isrInfo.asJava)
// Same check for the in-sync replica list.
} else if (isrInfo.size < isr.size) {
debug(s"Error while fetching metadata for $topicPartition: in sync replica information not available for " +
s"following brokers ${isr.filterNot(",")}")
new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
replicaInfo.asJava, isrInfo.asJava)
} else {
new MetadataResponse.PartitionMetadata(Errors.NONE, partitionId, leader, replicaInfo.asJava,
// errorUnavailableEndpoints exists to support v0 MetadataResponses
// Returns one TopicMetadata entry, with error code NONE, for each requested topic that is present in the
// cache; topics for which getPartitionMetadata yields None contribute nothing to the result.
// The whole lookup runs under the read lock.
def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
inReadLock(partitionMetadataLock) {
topics.toSeq.flatMap { topic =>
getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints).map { partitionMetadata =>
// The third argument flags internal topics, as decided by Topic.isInternal.
new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava)
// Returns a set of topic names computed while holding the read lock.
// NOTE(review): presumably the names of all topics currently in `cache` -- confirm against the body.
def getAllTopics(): Set[String] = {
inReadLock(partitionMetadataLock) {
// Returns the subset of `topics` that has no entry in the cache, evaluated under the read lock.
def getNonExistingTopics(topics: Set[String]): Set[String] = {
inReadLock(partitionMetadataLock) {
topics -- cache.keySet
// Returns a sequence of brokers computed while holding the read lock.
// NOTE(review): presumably the current values of `aliveBrokers` -- confirm against the body.
def getAliveBrokers: Seq[Broker] = {
inReadLock(partitionMetadataLock) {
// Stores `stateInfo` as the cached state of (`topic`, `partitionId`), replacing any previous entry.
// Runs under the write lock.
private def addOrUpdatePartitionInfo(topic: String,
partitionId: Int,
stateInfo: PartitionStateInfo) {
inWriteLock(partitionMetadataLock) {
// Create the per-topic map the first time a partition of this topic is seen.
val infos = cache.getOrElseUpdate(topic, mutable.Map())
infos(partitionId) = stateInfo
// Returns an optional PartitionStateInfo computed while holding the read lock.
// NOTE(review): presumably the cached state of (`topic`, `partitionId`), or None when it is not
// cached -- confirm against the body.
def getPartitionInfo(topic: String, partitionId: Int): Option[PartitionStateInfo] = {
inReadLock(partitionMetadataLock) {
// Returns the controller id recorded by the most recent updateCache call, or None when no update has been
// applied yet or the id in that update was negative.
// `controllerId` is a plain var that updateCache assigns while holding the write lock, so it is read under
// the read lock here; an unlocked read gives no guarantee that another thread sees the latest value.
def getControllerId: Option[Int] = inReadLock(partitionMetadataLock) { controllerId }
// This method returns the deleted TopicPartitions received from UpdateMetadataRequest
// Applies an UpdateMetadataRequest to the cache while holding the write lock: records the controller id,
// registers the live brokers and their per-listener nodes, then adds, updates or removes the cached
// state of every partition carried by the request.
// `correlationId` is used only in the state-change trace messages.
def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
inWriteLock(partitionMetadataLock) {
// A negative controller id in the request is stored as None.
controllerId = updateMetadataRequest.controllerId match {
case id if id < 0 => None
case id => Some(id)
updateMetadataRequest.liveBrokers.asScala.foreach { broker =>
// `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which
// is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could
// move to `AnyRefMap`, which has comparable performance.
val nodes = new java.util.HashMap[ListenerName, Node]
val endPoints = new mutable.ArrayBuffer[EndPoint]
// Each endpoint is recorded twice: as an EndPoint for the Broker object and as a Node keyed by listener.
broker.endPoints.asScala.foreach { ep =>
endPoints += EndPoint(, ep.port, ep.listenerName, ep.securityProtocol)
nodes.put(ep.listenerName, new Node(,, ep.port))
aliveBrokers( = Broker(, endPoints, Option(broker.rack))
aliveNodes( = nodes.asScala
// Partitions removed from the cache by this request are collected here.
val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
// Shadows the class field of the same name; this is the raw id from the request, used for logging.
val controllerId = updateMetadataRequest.controllerId
val controllerEpoch = updateMetadataRequest.controllerEpoch
// A leader equal to LeaderAndIsr.LeaderDuringDelete marks the partition as being deleted.
if (info.leader == LeaderAndIsr.LeaderDuringDelete) {
removePartitionInfo(tp.topic, tp.partition)
stateChangeLogger.trace(s"Broker $brokerId deleted partition $tp from metadata cache in response to UpdateMetadata " +
s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
deletedPartitions += tp
} else {
val partitionInfo = partitionStateToPartitionStateInfo(info)
addOrUpdatePartitionInfo(tp.topic, tp.partition, partitionInfo)
stateChangeLogger.trace(s"Broker $brokerId cached leader info $partitionInfo for partition $tp in response to " +
s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
// Converts the request-level PartitionState into the PartitionStateInfo type stored in the cache.
private def partitionStateToPartitionStateInfo(partitionState: PartitionState): PartitionStateInfo = {
val leaderAndIsr = LeaderAndIsr(partitionState.leader, partitionState.leaderEpoch,, partitionState.zkVersion)
// Pair the leader/ISR data with the controller epoch carried by the partition state.
val leaderInfo = LeaderIsrAndControllerEpoch(leaderAndIsr, partitionState.controllerEpoch)
// Returns a Boolean computed while holding the read lock.
// NOTE(review): presumably whether `topic` has an entry in `cache` -- confirm against the body.
def contains(topic: String): Boolean = {
inReadLock(partitionMetadataLock) {
// Removes cached state for (`topic`, `partitionId`). No lock is taken in the lines shown here; the call
// in updateCache is made while the write lock is held.
// NOTE(review): the Boolean result presumably reports whether an entry was removed -- confirm.
private def removePartitionInfo(topic: String, partitionId: Int): Boolean = {
cache.get(topic).map { infos =>
// Drop the topic entry once its partition map is empty.
if (infos.isEmpty) cache.remove(topic)