kafka-1971; starting a broker with a conflicting id will delete the previous broker registration; patched by Jun Rao; reviewed by Neha Narkhede
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 4acdd70..7907987 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -39,17 +39,12 @@
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
val sessionExpireListener = new SessionExpireListener
-
+
def startup() {
zkClient.subscribeStateChanges(sessionExpireListener)
register()
}
- def shutdown() {
- zkClient.unsubscribeStateChanges(sessionExpireListener)
- ZkUtils.deregisterBrokerInZk(zkClient, brokerId)
- }
-
/**
* Register this broker as "alive" in zookeeper
*/
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1691ad7..5cd4c84 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -268,8 +268,6 @@
if (canShutdown) {
Utils.swallow(controlledShutdown())
brokerState.newState(BrokerShuttingDown)
- if(kafkaHealthcheck != null)
- Utils.swallow(kafkaHealthcheck.shutdown())
if(socketServer != null)
Utils.swallow(socketServer.shutdown())
if(requestHandlerPool != null)
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 56e3e88..a6f4d46 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -179,12 +179,6 @@
info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
}
- def deregisterBrokerInZk(zkClient: ZkClient, id: Int) {
- val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
- deletePath(zkClient, brokerIdPath)
- info("Deregistered broker %d at path %s.".format(id, brokerIdPath))
- }
-
def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
val topicDirs = new ZKGroupTopicDirs(group, topic)
topicDirs.consumerOwnerDir + "/" + partition
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index a0ed485..93af7df 100644
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -18,7 +18,6 @@
package kafka.server
import org.scalatest.junit.JUnit3Suite
-import kafka.zk
import kafka.utils.ZkUtils
import kafka.utils.Utils
import kafka.utils.TestUtils
@@ -27,28 +26,44 @@
import junit.framework.Assert._
class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
- var server : KafkaServer = null
- val brokerId = 0
- val zookeeperChroot = "/kafka-chroot-for-unittest"
- override def setUp() {
- super.setUp()
+ def testBrokerCreatesZKChroot {
+ val brokerId = 0
+ val zookeeperChroot = "/kafka-chroot-for-unittest"
val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
val zooKeeperConnect = props.get("zookeeper.connect")
props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
+ val server = TestUtils.createServer(new KafkaConfig(props))
- server = TestUtils.createServer(new KafkaConfig(props))
- }
-
- override def tearDown() {
- server.shutdown()
- Utils.rm(server.config.logDirs)
- super.tearDown()
- }
-
- def testBrokerCreatesZKChroot {
val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot)
assertTrue(pathExists)
+
+ server.shutdown()
+ Utils.rm(server.config.logDirs)
}
+ def testConflictBrokerRegistration {
+ // Try starting a broker with the a conflicting broker id.
+ // This shouldn't affect the existing broker registration.
+
+ val brokerId = 0
+ val props1 = TestUtils.createBrokerConfig(brokerId)
+ val server1 = TestUtils.createServer(new KafkaConfig(props1))
+ val brokerRegistration = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1
+
+ val props2 = TestUtils.createBrokerConfig(brokerId)
+ try {
+ TestUtils.createServer(new KafkaConfig(props2))
+ fail("Registering a broker with a conflicting id should fail")
+ } catch {
+ case e : RuntimeException =>
+ // this is expected
+ }
+
+ // broker registration shouldn't change
+ assertEquals(brokerRegistration, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1)
+
+ server1.shutdown()
+ Utils.rm(server1.config.logDirs)
+ }
}
\ No newline at end of file