blob: 72589800e111c91ce61cd3ba774c960fedcd390d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
import scala.collection.JavaConverters._
import kafka.api.LeaderAndIsr
import org.apache.kafka.common.requests.{AbstractRequestResponse, LeaderAndIsrRequest, LeaderAndIsrResponse}
import org.junit.Assert._
import kafka.utils.{CoreUtils, TestUtils}
import kafka.cluster.Broker
import kafka.controller.{ControllerChannelManager, ControllerContext}
import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
import org.apache.kafka.common.utils.SystemTime
import org.junit.{After, Before, Test}
class LeaderElectionTest extends ZooKeeperTestHarness {
val brokerId1 = 0
val brokerId2 = 1
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
var staleControllerEpochDetected = false
@Before
override def setUp() {
super.setUp()
val configProps1 = TestUtils.createBrokerConfig(brokerId1, zkConnect, enableControlledShutdown = false)
val configProps2 = TestUtils.createBrokerConfig(brokerId2, zkConnect, enableControlledShutdown = false)
// start both servers
val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1))
val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2))
servers ++= List(server1, server2)
}
@After
override def tearDown() {
servers.foreach(_.shutdown())
servers.foreach(server => CoreUtils.delete(server.config.logDirs))
super.tearDown()
}
@Test
def testLeaderElectionAndEpoch {
// start 2 brokers
val topic = "new-topic"
val partitionId = 0
// create topic with 1 partition, 2 replicas, one on each broker
val leader1 = createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers)(0)
val leaderEpoch1 = zkUtils.getEpochForPartition(topic, partitionId)
debug("leader Epoc: " + leaderEpoch1)
debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
assertTrue("Leader should get elected", leader1.isDefined)
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
assertEquals("First epoch value should be 0", 0, leaderEpoch1)
// kill the server hosting the preferred replica
servers.last.shutdown()
// check if leader moves to the other server
val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId,
oldLeaderOpt = if(leader1.get == 0) None else leader1)
val leaderEpoch2 = zkUtils.getEpochForPartition(topic, partitionId)
debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
debug("leader Epoc: " + leaderEpoch2)
assertEquals("Leader must move to broker 0", 0, leader2.getOrElse(-1))
if(leader1.get == leader2.get)
assertEquals("Second epoch value should be " + leaderEpoch1+1, leaderEpoch1+1, leaderEpoch2)
else
assertEquals("Second epoch value should be %d".format(leaderEpoch1+1) , leaderEpoch1+1, leaderEpoch2)
servers.last.startup()
servers.head.shutdown()
Thread.sleep(zookeeper.tickTime)
val leader3 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId,
oldLeaderOpt = if(leader2.get == 1) None else leader2)
val leaderEpoch3 = zkUtils.getEpochForPartition(topic, partitionId)
debug("leader Epoc: " + leaderEpoch3)
debug("Leader is elected to be: %s".format(leader3.getOrElse(-1)))
assertEquals("Leader must return to 1", 1, leader3.getOrElse(-1))
if(leader2.get == leader3.get)
assertEquals("Second epoch value should be " + leaderEpoch2, leaderEpoch2, leaderEpoch3)
else
assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3)
}
@Test
def testLeaderElectionWithStaleControllerEpoch() {
// start 2 brokers
val topic = "new-topic"
val partitionId = 0
// create topic with 1 partition, 2 replicas, one on each broker
val leader1 = createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers)(0)
val leaderEpoch1 = zkUtils.getEpochForPartition(topic, partitionId)
debug("leader Epoc: " + leaderEpoch1)
debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
assertTrue("Leader should get elected", leader1.isDefined)
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
assertEquals("First epoch value should be 0", 0, leaderEpoch1)
// start another controller
val controllerId = 2
val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, zkConnect))
val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort()))
val nodes = brokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
val controllerContext = new ControllerContext(zkUtils, 6000)
controllerContext.liveBrokers = brokers.toSet
val metrics = new Metrics
val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, new SystemTime, metrics)
controllerChannelManager.startup()
try {
val staleControllerEpoch = 0
val partitionStates = Map(
new TopicPartition(topic, partitionId) -> new PartitionState(2, brokerId2, LeaderAndIsr.initialLeaderEpoch,
Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava, LeaderAndIsr.initialZKVersion,
Set(0, 1).map(Integer.valueOf).asJava)
)
val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, staleControllerEpoch, partitionStates.asJava,
nodes.toSet.asJava)
controllerChannelManager.sendRequest(brokerId2, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest,
staleControllerEpochCallback)
TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true,
"Controller epoch should be stale")
assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)
} finally {
controllerChannelManager.shutdown()
metrics.close()
}
}
private def staleControllerEpochCallback(response: AbstractRequestResponse): Unit = {
val leaderAndIsrResponse = response.asInstanceOf[LeaderAndIsrResponse]
staleControllerEpochDetected = Errors.forCode(leaderAndIsrResponse.errorCode) match {
case Errors.STALE_CONTROLLER_EPOCH => true
case _ => false
}
}
}