blob: 5221855603a3a35561611043e583961d497d545d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.integration
import org.junit.{Test, After, Before}
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils._
import org.junit.Assert._
import kafka.utils.{CoreUtils, TestUtils}
import kafka.server.{KafkaConfig, KafkaServer}
class RollingBounceTest extends ZooKeeperTestHarness {
val partitionId = 0
var servers: Seq[KafkaServer] = null
@Before
override def setUp() {
super.setUp()
// controlled.shutdown.enable is true by default
val configs = (0 until 4).map(i => TestUtils.createBrokerConfig(i, zkConnect))
configs(3).put("controlled.shutdown.retry.backoff.ms", "100")
// start all the servers
servers = configs.map(c => TestUtils.createServer(KafkaConfig.fromProps(c)))
}
@After
override def tearDown() {
servers.foreach(_.shutdown())
servers.foreach(server => CoreUtils.delete(server.config.logDirs))
super.tearDown()
}
@Test
def testRollingBounce {
// start all the brokers
val topic1 = "new-topic1"
val topic2 = "new-topic2"
val topic3 = "new-topic3"
val topic4 = "new-topic4"
// create topics with 1 partition, 2 replicas, one on each broker
createTopic(zkUtils, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
createTopic(zkUtils, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers)
createTopic(zkUtils, topic3, partitionReplicaAssignment = Map(0->Seq(2,3)), servers = servers)
createTopic(zkUtils, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
// Do a rolling bounce and check if leader transitions happen correctly
// Bring down the leader for the first topic
bounceServer(topic1, 0)
// Bring down the leader for the second topic
bounceServer(topic2, 1)
// Bring down the leader for the third topic
bounceServer(topic3, 2)
// Bring down the leader for the fourth topic
bounceServer(topic4, 3)
}
private def bounceServer(topic: String, startIndex: Int) {
var prevLeader = 0
if (isLeaderLocalOnBroker(topic, partitionId, servers(startIndex))) {
servers(startIndex).shutdown()
prevLeader = startIndex
}
else {
servers((startIndex + 1) % 4).shutdown()
prevLeader = (startIndex + 1) % 4
}
var newleader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
// Ensure the new leader is different from the old
assertTrue("Leader transition did not happen for " + topic, newleader.getOrElse(-1) != -1 && (newleader.getOrElse(-1) != prevLeader))
// Start the server back up again
servers(prevLeader).startup()
}
}