blob: 3d4b40c6a92510765b8906298b013e8af73548ee [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.Properties
import kafka.utils.TestUtils
import org.apache.kafka.common.internals.TopicConstants
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
import org.junit.Assert._
import org.junit.Test
import scala.collection.JavaConverters._
/**
 * Tests for the Metadata request/response pair, exercising request versions 0 and 1.
 *
 * Every broker is configured with a rack of the form `rack/<brokerId>` (see
 * [[propertyOverrides]]) so that rack information can be checked in responses.
 */
class MetadataRequestTest extends BaseRequestTest {

  /**
   * Sets the rack property of each broker to `rack/<brokerId>`, derived from the
   * broker id already present in `properties`.
   *
   * @param properties the broker properties to amend in place
   */
  override def propertyOverrides(properties: Properties): Unit = {
    properties.setProperty(KafkaConfig.RackProp, s"rack/${properties.getProperty(KafkaConfig.BrokerIdProp)}")
  }

  /**
   * Checks that the controller reported in a v1 metadata response is the active
   * controller, both initially and after the controller broker is restarted.
   */
  @Test
  def testControllerId(): Unit = {
    val controllerServer = servers.find(_.kafkaController.isActive()).get
    val controllerId = controllerServer.config.brokerId
    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(), 1)

    assertEquals("Controller id should match the active controller",
      controllerId, metadataResponse.controller.id)

    // Fail over the controller
    controllerServer.shutdown()
    controllerServer.startup()

    val controllerServer2 = servers.find(_.kafkaController.isActive()).get
    val controllerId2 = controllerServer2.config.brokerId
    assertNotEquals("Controller id should switch to a new broker", controllerId, controllerId2)

    // The response may lag behind the failover, so retry until it reports the new controller
    TestUtils.waitUntilTrue(() => {
      val metadataResponse2 = sendMetadataRequest(MetadataRequest.allTopics(), 1)
      controllerServer2.apis.brokerId == metadataResponse2.controller.id
    }, "Controller id should match the active controller after failover", 5000)
  }

  /**
   * Checks that every broker in a v1 metadata response carries the rack that was
   * configured for it.
   */
  @Test
  def testRack(): Unit = {
    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(), 1)
    // Validate rack matches what's set in propertyOverrides() above
    metadataResponse.brokers.asScala.foreach { broker =>
      assertEquals("Rack information should match config", s"rack/${broker.id}", broker.rack)
    }
  }

  /**
   * Checks that the `isInternal` flag in a v1 metadata response is set for the group
   * metadata topic and not set for an ordinary topic.
   */
  @Test
  def testIsInternal(): Unit = {
    val internalTopic = TopicConstants.GROUP_METADATA_TOPIC_NAME
    val notInternalTopic = "notInternal"
    // create the topics
    TestUtils.createTopic(zkUtils, internalTopic, 3, 2, servers)
    TestUtils.createTopic(zkUtils, notInternalTopic, 3, 2, servers)

    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(), 1)
    assertTrue("Response should have no errors", metadataResponse.errors.isEmpty)

    val topicMetadata = metadataResponse.topicMetadata.asScala
    val internalTopicMetadata = topicMetadata.find(_.topic == internalTopic).get
    val notInternalTopicMetadata = topicMetadata.find(_.topic == notInternalTopic).get

    assertTrue("internalTopic should show isInternal", internalTopicMetadata.isInternal)
    assertFalse("notInternalTopic should not show isInternal", notInternalTopicMetadata.isInternal)
  }

  /**
   * Checks that a v1 request with an empty topic list returns no topics, even though
   * topics exist.
   */
  @Test
  def testNoTopicsRequest(): Unit = {
    // create some topics
    TestUtils.createTopic(zkUtils, "t1", 3, 2, servers)
    TestUtils.createTopic(zkUtils, "t2", 3, 2, servers)

    // v0, Doesn't support a "no topics" request
    // v1, Empty list represents "no topics"
    val metadataResponse = sendMetadataRequest(new MetadataRequest(List[String]().asJava), 1)
    assertTrue("Response should have no errors", metadataResponse.errors.isEmpty)
    assertTrue("Response should have no topics", metadataResponse.topicMetadata.isEmpty)
  }

  /**
   * Checks that "all topics" can be requested in both versions: with an empty list in
   * v0 and with [[MetadataRequest.allTopics]] in v1.
   */
  @Test
  def testAllTopicsRequest(): Unit = {
    // create some topics
    TestUtils.createTopic(zkUtils, "t1", 3, 2, servers)
    TestUtils.createTopic(zkUtils, "t2", 3, 2, servers)

    // v0, Empty list represents all topics
    val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(List[String]().asJava), 0)
    assertTrue("V0 Response should have no errors", metadataResponseV0.errors.isEmpty)
    assertEquals("V0 Response should have 2 (all) topics", 2, metadataResponseV0.topicMetadata.size())

    // v1, Null represents all topics
    val metadataResponseV1 = sendMetadataRequest(MetadataRequest.allTopics(), 1)
    assertTrue("V1 Response should have no errors", metadataResponseV1.errors.isEmpty)
    assertEquals("V1 Response should have 2 (all) topics", 2, metadataResponseV1.topicMetadata.size())
  }

  /**
   * Shuts down a non-leader replica of a single-partition topic and checks how each
   * request version reports it: v0 drops the replica and flags the partition with
   * `REPLICA_NOT_AVAILABLE`, while v1 keeps the replica in the list with no error.
   * In both versions the downed broker must be absent from the brokers list.
   */
  @Test
  def testReplicaDownResponse(): Unit = {
    val replicaDownTopic = "replicaDown"
    val replicaCount = 3

    // create a topic with 3 replicas
    TestUtils.createTopic(zkUtils, replicaDownTopic, 1, replicaCount, servers)

    // Kill a replica node that is not the leader
    val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava), 1)
    val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
    val downNode = servers.find { server =>
      val serverId = server.apis.brokerId
      val leaderId = partitionMetadata.leader.id
      val replicaIds = partitionMetadata.replicas.asScala.map(_.id)
      serverId != leaderId && replicaIds.contains(serverId)
    }.get
    // Capture the id before shutdown; broker lists below hold ids, not server objects
    val downNodeId = downNode.apis.brokerId
    downNode.shutdown()

    // Wait until the v1 response lists the downed replica with an empty host and port -1
    TestUtils.waitUntilTrue(() => {
      val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava), 1)
      val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head
      val replica = metadata.replicas.asScala.find(_.id == downNodeId).get
      replica.host == "" && replica.port == -1
    }, "Replica was not found down", 5000)

    // Validate version 0 still filters unavailable replicas and contains error
    val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava), 0)
    val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq
    assertTrue("Response should have no errors", v0MetadataResponse.errors.isEmpty)
    assertFalse("The downed broker should not be in the brokers list", v0BrokerIds.contains(downNodeId))
    assertEquals("Response should have one topic", 1, v0MetadataResponse.topicMetadata.size)
    val v0PartitionMetadata = v0MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
    assertEquals("PartitionMetadata should have an error", Errors.REPLICA_NOT_AVAILABLE, v0PartitionMetadata.error)
    assertEquals(s"Response should have ${replicaCount - 1} replicas", replicaCount - 1, v0PartitionMetadata.replicas.size)

    // Validate version 1 returns unavailable replicas with no error
    val v1MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava), 1)
    val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
    assertTrue("Response should have no errors", v1MetadataResponse.errors.isEmpty)
    assertFalse("The downed broker should not be in the brokers list", v1BrokerIds.contains(downNodeId))
    assertEquals("Response should have one topic", 1, v1MetadataResponse.topicMetadata.size)
    val v1PartitionMetadata = v1MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
    assertEquals("PartitionMetadata should have no errors", Errors.NONE, v1PartitionMetadata.error)
    assertEquals(s"Response should have $replicaCount replicas", replicaCount, v1PartitionMetadata.replicas.size)
  }

  /**
   * Sends `request` as a Metadata API call at the given version and parses the reply.
   *
   * @param request the metadata request to send
   * @param version the Metadata API version used for both sending and parsing
   * @return the parsed metadata response
   */
  private def sendMetadataRequest(request: MetadataRequest, version: Short): MetadataResponse = {
    val response = send(request, ApiKeys.METADATA, version)
    MetadataResponse.parse(response, version)
  }
}