/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package unit.kafka.admin

import kafka.admin.ReassignPartitionsCommand
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.TestUtils._
import kafka.utils.ZkUtils._
import kafka.utils.{CoreUtils, Logging}
import kafka.zk.ZooKeeperTestHarness
import org.junit.{After, Before, Test}
import org.junit.Assert.assertEquals
import scala.collection.Seq
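
/**
 * End-to-end tests for the partition reassignment tool: each test starts real brokers against
 * the test ZooKeeper instance, moves replicas via ReassignPartitionsCommand, and verifies the
 * resulting assignment recorded in ZooKeeper.
 */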
class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
val partitionId = 0
var servers: Seq[KafkaServer] = null
val topicName = "my-topic"

@Before
override def setUp() {
super.setUp()
}
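
//Creates and starts one KafkaServer per broker id, all pointing at the test ZooKeeper instance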
def startBrokers(brokerIds: Seq[Int]) {
servers = brokerIds.map(i => createBrokerConfig(i, zkConnect))
.map(c => createServer(KafkaConfig.fromProps(c)))
}
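
//Shut the brokers down and delete their log directories so each test starts from a clean state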
@After
override def tearDown() {
servers.foreach(_.shutdown())
servers.foreach(server => CoreUtils.delete(server.config.logDirs))
super.tearDown()
}

@Test
def shouldMoveSinglePartition() {
//Given a single replica on server 100
startBrokers(Seq(100, 101))
val partition = 0
createTopic(zkUtils, topicName, Map(partition -> Seq(100)), servers = servers)
//When we move the replica on 100 to broker 101
ReassignPartitionsCommand.executeAssignment(zkUtils, s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101]}]}""")
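//executeAssignment only writes the requested plan to ZooKeeper; the controller applies it asynchronously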
waitForReassignmentToComplete()
//Then the replica should be on 101
assertEquals(Seq(101), zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)(partition))
}

@Test
def shouldExpandCluster() {
//Given partitions on 2 of 3 brokers
val brokers = Array(100, 101, 102)
startBrokers(brokers)
createTopic(zkUtils, topicName, Map(
0 -> Seq(100, 101),
1 -> Seq(100, 101),
2 -> Seq(100, 101)
), servers = servers)
//When rebalancing
val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1
ReassignPartitionsCommand.executeAssignment(zkUtils, zkUtils.formatAsReassignmentJson(newAssignment))
waitForReassignmentToComplete()
//Then the replicas should span all three brokers
val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
assertEquals(Seq(100, 101, 102), actual.values.flatten.toSeq.distinct.sorted)
}

@Test
def shouldShrinkCluster() {
//Given partitions on 3 of 3 brokers
val brokers = Array(100, 101, 102)
startBrokers(brokers)
createTopic(zkUtils, topicName, Map(
0 -> Seq(100, 101),
1 -> Seq(101, 102),
2 -> Seq(102, 100)
), servers = servers)
//When rebalancing
val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(100, 101), json(topicName), true)._1
ReassignPartitionsCommand.executeAssignment(zkUtils, zkUtils.formatAsReassignmentJson(newAssignment))
waitForReassignmentToComplete()
//Then replicas should only span the first two brokers
val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
assertEquals(Seq(100, 101), actual.values.flatten.toSeq.distinct.sorted)
}
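
//The controller removes the reassign_partitions znode once every replica move has completed,
//so waiting for the path to disappear confirms the reassignment has finished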
def waitForReassignmentToComplete() {
waitUntilTrue(() => !zkUtils.pathExists(ReassignPartitionsPath), s"Znode $ReassignPartitionsPath wasn't deleted")
}
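
//Builds the topics-to-move JSON that ReassignPartitionsCommand.generateAssignment expects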
def json(topic: String): String = {
s"""{"topics": [{"topic": "$topic"}],"version":1}"""
}
}