blob: 2ca64f2fc1143cf15354d7b7d997c8b2fa444f72 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.integration
import java.io.File
import java.util.Arrays
import kafka.common.KafkaException
import kafka.server._
import kafka.utils.{CoreUtils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.junit.{After, Before}
import scala.collection.mutable.Buffer
/**
* A test harness that brings up some number of broker nodes
*/
trait KafkaServerTestHarness extends ZooKeeperTestHarness {
var instanceConfigs: Seq[KafkaConfig] = null
var servers: Buffer[KafkaServer] = null
var brokerList: String = null
var alive: Array[Boolean] = null
val kafkaPrincipalType = KafkaPrincipal.USER_TYPE
val setClusterAcl: Option[() => Unit] = None
/**
* Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
* test and should not reuse previous configurations unless they select their ports randomly when servers are started.
*/
def generateConfigs(): Seq[KafkaConfig]
def configs: Seq[KafkaConfig] = {
if (instanceConfigs == null)
instanceConfigs = generateConfigs()
instanceConfigs
}
def serverForId(id: Int) = servers.find(s => s.config.brokerId == id)
protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
protected def trustStoreFile: Option[File] = None
@Before
override def setUp() {
super.setUp
if (configs.size <= 0)
throw new KafkaException("Must supply at least one server config.")
servers = configs.map(TestUtils.createServer(_)).toBuffer
brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol)
alive = new Array[Boolean](servers.length)
Arrays.fill(alive, true)
// We need to set a cluster ACL in some cases here
// because of the topic creation in the setup of
// IntegrationTestHarness. If we don't, then tests
// fail with a cluster action authorization exception
// when processing an update metadata request
// (controller -> broker).
//
// The following method does nothing by default, but
// if the test case requires setting up a cluster ACL,
// then it needs to be implemented.
setClusterAcl.foreach(_.apply)
}
@After
override def tearDown() {
if (servers != null) {
servers.foreach(_.shutdown())
servers.foreach(server => CoreUtils.delete(server.config.logDirs))
}
super.tearDown
}
/**
* Pick a broker at random and kill it if it isn't already dead
* Return the id of the broker killed
*/
def killRandomBroker(): Int = {
val index = TestUtils.random.nextInt(servers.length)
if(alive(index)) {
servers(index).shutdown()
servers(index).awaitShutdown()
alive(index) = false
}
index
}
/**
* Restart any dead brokers
*/
def restartDeadBrokers() {
for(i <- 0 until servers.length if !alive(i)) {
servers(i).startup()
alive(i) = true
}
}
}