blob: 017faeabaa617df3477f468fbe9d3af684809ac1 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.server
import java.util
import util.Arrays.asList
import kafka.common.BrokerEndPointNotAvailableException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.apache.kafka.common.requests.UpdateMetadataRequest
import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint, PartitionState}
import org.junit.Test
import org.junit.Assert._
import scala.collection.JavaConverters._
class MetadataCacheTest {
// Collects the given varargs into a freshly allocated java.util.HashSet;
// repeated elements collapse to a single entry.
private def asSet[T](elems: T*): util.Set[T] = {
  val javaSet = new util.HashSet[T]()
  elems.foreach(elem => javaSet.add(elem))
  javaSet
}
// Queries a newly constructed MetadataCache for a topic; no updateCache call is made first.
// NOTE(review): in this view the method has no @Test annotation, no assertion on
// `topicMetadata` and no closing brace -- looks like lines were lost in extraction;
// confirm against the upstream file before relying on this test.
def getTopicMetadataNonExistingTopics() {
val topic = "topic"
// NOTE(review): the constructor argument 1 is presumably the local broker id -- confirm.
val cache = new MetadataCache(1)
val topicMetadata = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT)
// Feeds the cache one UpdateMetadataRequest describing brokers 0..2 and partitions 0..2 of
// a single topic, then checks the topic/partition metadata returned for both the
// PLAINTEXT and the SSL security protocol.
// NOTE(review): no @Test annotation and no closing braces are visible for this method in
// this view -- likely extraction loss; confirm against the upstream file.
def getTopicMetadata() {
val topic = "topic"
val cache = new MetadataCache(1)
val zkVersion = 3
val controllerId = 2
val controllerEpoch = 1
// Per-broker endpoints: host "foo-<brokerId>", port 9092 for PLAINTEXT and 9093 for SSL.
// NOTE(review): the declared return type is a Map, but the `Map(` ... `)` wrapper around
// the two pairs below is not visible here -- confirm against upstream.
def securityProtocolToEndPoint(brokerId: Int): Map[SecurityProtocol, EndPoint] = {
val host = s"foo-$brokerId"
SecurityProtocol.PLAINTEXT -> new EndPoint(host, 9092),
SecurityProtocol.SSL -> new EndPoint(host, 9093)
// Brokers 0, 1 and 2, each with both endpoints and rack "rack1".
val brokers = (0 to 2).map { brokerId =>
new Broker(brokerId, securityProtocolToEndPoint(brokerId).asJava, "rack1")
// PartitionState arguments are presumably (controllerEpoch, leader, leaderEpoch, isr,
// zkVersion, replicas), so partition i is led by broker i with ISR [i] and replicas {i}.
val partitionStates = Map(
new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, 0, 0, asList(0), zkVersion, asSet(0)),
new TopicPartition(topic, 1) -> new PartitionState(controllerEpoch, 1, 1, asList(1), zkVersion, asSet(1)),
new TopicPartition(topic, 2) -> new PartitionState(controllerEpoch, 2, 2, asList(2), zkVersion, asSet(2)))
val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava)
// NOTE(review): the first argument (15) is presumably a correlation id -- confirm.
cache.updateCache(15, updateMetadataRequest)
// The same expectations are checked once per security protocol.
for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, SecurityProtocol.SSL)) {
val topicMetadatas = cache.getTopicMetadata(Set(topic), securityProtocol)
assertEquals(1, topicMetadatas.size)
val topicMetadata = topicMetadatas.head
assertEquals(Errors.NONE, topicMetadata.error)
assertEquals(topic, topicMetadata.topic)
// Sort by partition id so that index i below lines up with partition i.
val partitionMetadatas = topicMetadata.partitionMetadata.asScala.sortBy(_.partition)
assertEquals(3, partitionMetadatas.size)
for (i <- 0 to 2) {
val partitionMetadata = partitionMetadatas(i)
assertEquals(Errors.NONE, partitionMetadata.error)
assertEquals(i, partitionMetadata.partition)
val leader = partitionMetadata.leader
// NOTE(review): the call below is cut off after the opening parenthesis in this view;
// its arguments (and any assertions that sat between these lines) are missing.
val endPoint = securityProtocolToEndPoint(
assertEquals(endPoint.port, leader.port)
// Registers only broker 0 but names broker 1 as the partition leader, then expects the
// partition metadata to carry LEADER_NOT_AVAILABLE while the topic-level error stays NONE.
// NOTE(review): no @Test annotation or closing brace is visible for this method in this
// view -- likely extraction loss; confirm against the upstream file.
def getTopicMetadataPartitionLeaderNotAvailable() {
val topic = "topic"
val cache = new MetadataCache(1)
val zkVersion = 3
val controllerId = 2
val controllerEpoch = 1
// The only broker in the request is id 0 (PLAINTEXT endpoint, null rack).
val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, null))
// Leader id 1 does not appear in `brokers` above.
val leader = 1
val leaderEpoch = 1
val partitionStates = Map(
new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, asList(0), zkVersion, asSet(0)))
val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava)
cache.updateCache(15, updateMetadataRequest)
val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT)
assertEquals(1, topicMetadatas.size)
val topicMetadata = topicMetadatas.head
assertEquals(Errors.NONE, topicMetadata.error)
val partitionMetadatas = topicMetadata.partitionMetadata
assertEquals(1, partitionMetadatas.size)
val partitionMetadata = partitionMetadatas.get(0)
assertEquals(0, partitionMetadata.partition)
assertEquals(Errors.LEADER_NOT_AVAILABLE, partitionMetadata.error)
// The replica list is still expected to contain the single replica, broker 0.
assertEquals(1, partitionMetadata.replicas.size)
assertEquals(0, partitionMetadata.replicas.get(0).id)
// Registers only broker 0 while the partition's replica set is {0, 1}, then expects the
// partition metadata to carry REPLICA_NOT_AVAILABLE while the topic-level error stays NONE.
// NOTE(review): no @Test annotation or closing brace is visible for this method in this
// view -- likely extraction loss; confirm against the upstream file.
def getTopicMetadataReplicaNotAvailable() {
val topic = "topic"
val cache = new MetadataCache(1)
val zkVersion = 3
val controllerId = 2
val controllerEpoch = 1
// The only broker in the request is id 0 (PLAINTEXT endpoint, null rack).
val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, null))
// replica 1 is not available: it is in `replicas` but not in `brokers`
val leader = 0
val leaderEpoch = 0
val replicas = asSet[Integer](0, 1)
val isr = asList[Integer](0)
val partitionStates = Map(
new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas))
val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava)
cache.updateCache(15, updateMetadataRequest)
val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT)
assertEquals(1, topicMetadatas.size)
val topicMetadata = topicMetadatas.head
assertEquals(Errors.NONE, topicMetadata.error)
val partitionMetadatas = topicMetadata.partitionMetadata
assertEquals(1, partitionMetadatas.size)
val partitionMetadata = partitionMetadatas.get(0)
assertEquals(0, partitionMetadata.partition)
assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadata.error)
// Registers only broker 0 while the partition's ISR is [0, 1] (replica set {0}), then
// expects the partition metadata to carry REPLICA_NOT_AVAILABLE while the topic-level
// error stays NONE.
// NOTE(review): no @Test annotation or closing brace is visible for this method in this
// view -- likely extraction loss; confirm against the upstream file.
def getTopicMetadataIsrNotAvailable() {
val topic = "topic"
val cache = new MetadataCache(1)
val zkVersion = 3
val controllerId = 2
val controllerEpoch = 1
// The only broker in the request is id 0 (PLAINTEXT endpoint, rack "rack1").
val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, "rack1"))
// ISR member 1 is not available: it is listed in `isr` but not in `brokers`
val leader = 0
val leaderEpoch = 0
val replicas = asSet[Integer](0)
val isr = asList[Integer](0, 1)
val partitionStates = Map(
new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas))
val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava)
cache.updateCache(15, updateMetadataRequest)
val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT)
assertEquals(1, topicMetadatas.size)
val topicMetadata = topicMetadatas.head
assertEquals(Errors.NONE, topicMetadata.error)
val partitionMetadatas = topicMetadata.partitionMetadata
assertEquals(1, partitionMetadatas.size)
val partitionMetadata = partitionMetadatas.get(0)
assertEquals(0, partitionMetadata.partition)
assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadata.error)
// Registers a broker that exposes only a PLAINTEXT endpoint, then requests metadata over
// SSL and expects getTopicMetadata to throw BrokerEndPointNotAvailableException.
// NOTE(review): no @Test annotation is visible, and the braces closing the try body, the
// catch block and the method are missing in this view -- likely extraction loss; confirm
// against the upstream file.
def getTopicMetadataWithNonSupportedSecurityProtocol() {
val topic = "topic"
val cache = new MetadataCache(1)
// Broker 0 has a PLAINTEXT endpoint only (rack is the empty string).
val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, ""))
val controllerEpoch = 1
val leader = 0
val leaderEpoch = 0
val replicas = asSet[Integer](0)
val isr = asList[Integer](0, 1)
val partitionStates = Map(
new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas))
val updateMetadataRequest = new UpdateMetadataRequest(2, controllerEpoch, partitionStates.asJava, brokers.asJava)
cache.updateCache(15, updateMetadataRequest)
try {
// SSL was not among the endpoints registered for broker 0 above.
val result = cache.getTopicMetadata(Set(topic), SecurityProtocol.SSL)
// Reaching this line means no exception was thrown, so the test is failed explicitly.
fail(s"Exception should be thrown by `getTopicMetadata` with non-supported SecurityProtocol, $result was returned instead")
catch {
case e: BrokerEndPointNotAvailableException => //expected
def getAliveBrokersShouldNotBeMutatedByUpdateCache() {
val topic = "topic"
val cache = new MetadataCache(1)
def updateCache(brokerIds: Set[Int]) {
val brokers = { brokerId =>
new Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, "")
val controllerEpoch = 1
val leader = 0
val leaderEpoch = 0
val replicas = asSet[Integer](0)
val isr = asList[Integer](0, 1)
val partitionStates = Map(
new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas))
val updateMetadataRequest = new UpdateMetadataRequest(2, controllerEpoch, partitionStates.asJava, brokers.asJava)
cache.updateCache(15, updateMetadataRequest)
val initialBrokerIds = (0 to 2).toSet
val aliveBrokersFromCache = cache.getAliveBrokers
// This should not change `aliveBrokersFromCache`
updateCache((0 to 3).toSet)