blob: ae07a9fc550340286d8fd3c891984d96280f8102 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.test.harness
import java.io.File
import java.util
import java.util.Properties
import kafka.common.KafkaException
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{CoreUtils, TestUtils}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Time
import org.junit.{After, Before}
import scala.collection.mutable
/**
* Kafka server integration test harness.
* This is essentially a copy of the open-source Kafka test harness code. Java classes cannot extend a Scala trait
* directly, so the harness is provided here as an abstract class that Java test classes can extend.
*/
abstract class AbstractKafkaServerTestHarness extends AbstractZookeeperTestHarness {
  // The fields below are deliberately mutable and null-initialised: they are populated in setUp() and are
  // reachable from subclasses (including Java ones) through the generated accessors.

  /** Cached result of [[generateConfigs]]; filled lazily by [[configs]]. */
  var instanceConfigs: Seq[KafkaConfig] = null
  /** Brokers started by [[setUp]], in the same order as [[configs]]. */
  var servers: mutable.Buffer[KafkaServer] = null
  /** Broker list string computed by [[setUp]] from the started servers and [[securityProtocol]]. */
  var brokerList: String = null
  /** ZooKeeper client created at the end of [[setUp]] and closed in [[tearDown]]. */
  var kafkaZkClient: KafkaZkClient = null
  /** Admin client built on top of [[kafkaZkClient]]. */
  var adminZkClient: AdminZkClient = null
  /** `alive(i)` is true while `servers(i)` is considered running; see [[killRandomBroker]]. */
  var alive: Array[Boolean] = null
  val kafkaPrincipalType: String = KafkaPrincipal.USER_TYPE
  /** Optional hook invoked by [[setUp]] after the brokers have started; does nothing by default. */
  val setClusterAcl: Option[() => Unit] = None

  /**
   * Builds the broker configurations for this test. Subclasses may override this method to return their own set of
   * KafkaConfigs. It is invoked for every test and should not reuse previous configurations unless they select
   * their ports randomly when servers are started.
   *
   * By default it creates [[clusterSize]] broker configs and applies [[overridingProps]] to each of them.
   */
  def generateConfigs(): Seq[KafkaConfig] = {
    TestUtils.createBrokerConfigs(clusterSize, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps))
  }

  /**
   * User can override this method to return the number of brokers they want.
   * By default only one broker will be launched.
   *
   * @return the number of brokers needed in the Kafka cluster for the test.
   */
  def clusterSize: Int = 1

  /**
   * User can override this method to apply customized configurations to the brokers.
   * By default the number of partitions for automatically created topics is set to 1 and topic deletion is enabled.
   *
   * @return The configurations to be used by brokers.
   */
  def overridingProps: Properties = {
    val props = new Properties
    props.setProperty(KafkaConfig.NumPartitionsProp, 1.toString)
    props.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
    props
  }

  /**
   * Returns the broker configurations, generating them through [[generateConfigs]] on first access and caching the
   * result in [[instanceConfigs]].
   */
  def configs: Seq[KafkaConfig] = {
    if (instanceConfigs == null)
      instanceConfigs = generateConfigs()
    instanceConfigs
  }

  /**
   * Looks up a started broker by its configured broker id.
   *
   * @param id the broker id to look for.
   * @return the matching server, or None if no server in `servers` has that broker id.
   */
  def serverForId(id: Int): Option[KafkaServer] = servers.find(s => s.config.brokerId == id)

  /** @return the broker list computed in [[setUp]] (null before [[setUp]] has run). */
  def bootstrapUrl: String = brokerList

  protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT

  protected def trustStoreFile: Option[File] = None

  protected def saslProperties: Option[Properties] = None

  /**
   * Starts one broker per entry in [[configs]] and creates the ZooKeeper clients.
   *
   * Fails with a KafkaException when there are no configs, and through `TestUtils.fail` when a broker cannot be
   * created.
   */
  @Before
  override def setUp(): Unit = {
    super.setUp()
    if (configs.isEmpty)
      throw new KafkaException("Must supply at least one server config.")
    // Publish the buffer before starting any broker: if a later broker fails to start, tearDown() can still see and
    // shut down the brokers that did start instead of leaking them.
    servers = mutable.ArrayBuffer.empty[KafkaServer]
    configs.foreach { config =>
      try {
        servers += TestUtils.createServer(config)
      } catch {
        case e: Exception =>
          println("Exception in setup")
          println(e)
          TestUtils.fail(e.getMessage)
      }
    }
    brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol)
    alive = new Array[Boolean](servers.length)
    util.Arrays.fill(alive, true)
    // We need to set a cluster ACL in some cases here
    // because of the topic creation in the setup of
    // IntegrationTestHarness. If we don't, then tests
    // fail with a cluster action authorization exception
    // when processing an update metadata request
    // (controller -> broker).
    //
    // The following hook does nothing by default, but
    // if the test case requires setting up a cluster ACL,
    // then it needs to be implemented.
    setClusterAcl.foreach(_.apply())
    // NOTE(review): the literal 100 is presumably the max in-flight requests of the ZooKeeper client - confirm
    // against KafkaZkClient.apply.
    kafkaZkClient = KafkaZkClient(zkConnect, JaasUtils.isZkSecurityEnabled, zkSessionTimeout, zkConnectionTimeout, 100, Time.SYSTEM)
    adminZkClient = new AdminZkClient(kafkaZkClient)
  }

  /**
   * Shuts down all brokers, deletes their log directories, closes the ZooKeeper client and finally tears down the
   * parent harness.
   *
   * The ZooKeeper client is closed and `super.tearDown()` is invoked even when stopping a broker or deleting its
   * log directories throws; the exception is still propagated to the caller.
   */
  @After
  override def tearDown(): Unit = {
    try {
      if (servers != null) {
        servers.foreach(_.shutdown())
        servers.foreach(server => CoreUtils.delete(server.config.logDirs))
      }
    } finally {
      try {
        if (kafkaZkClient != null)
          CoreUtils.swallow(kafkaZkClient.close(), null)
      } finally {
        super.tearDown()
      }
    }
  }

  /**
   * Picks a broker at random and kills it if it isn't already dead.
   *
   * @return the index of the picked broker within `servers` (not its configured broker id); the index is returned
   *         even when that broker was already dead and nothing was shut down.
   */
  def killRandomBroker(): Int = {
    val index = TestUtils.random.nextInt(servers.length)
    if (alive(index)) {
      servers(index).shutdown()
      servers(index).awaitShutdown()
      alive(index) = false
    }
    index
  }

  /**
   * Restarts every broker that is currently marked as dead and marks it alive again.
   */
  def restartDeadBrokers(): Unit = {
    for (i <- servers.indices if !alive(i)) {
      servers(i).startup()
      alive(i) = true
    }
  }
}