kafka-1971; starting a broker with a conflicting id will delete the previous broker registration; patched by Jun Rao; reviewed by Neha Narkhede
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 4acdd70..7907987 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -39,17 +39,12 @@
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
   val sessionExpireListener = new SessionExpireListener
-  
+
   def startup() {
     zkClient.subscribeStateChanges(sessionExpireListener)
     register()
   }
 
-  def shutdown() {
-    zkClient.unsubscribeStateChanges(sessionExpireListener)
-    ZkUtils.deregisterBrokerInZk(zkClient, brokerId)
-  }
-
   /**
    * Register this broker as "alive" in zookeeper
    */
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1691ad7..5cd4c84 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -268,8 +268,6 @@
       if (canShutdown) {
         Utils.swallow(controlledShutdown())
         brokerState.newState(BrokerShuttingDown)
-        if(kafkaHealthcheck != null)
-          Utils.swallow(kafkaHealthcheck.shutdown())
         if(socketServer != null)
           Utils.swallow(socketServer.shutdown())
         if(requestHandlerPool != null)
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 56e3e88..a6f4d46 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -179,12 +179,6 @@
     info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
   }
 
-  def deregisterBrokerInZk(zkClient: ZkClient, id: Int) {
-    val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
-    deletePath(zkClient, brokerIdPath)
-    info("Deregistered broker %d at path %s.".format(id, brokerIdPath))
-  }
-
   def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
     val topicDirs = new ZKGroupTopicDirs(group, topic)
     topicDirs.consumerOwnerDir + "/" + partition
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index a0ed485..93af7df 100644
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import org.scalatest.junit.JUnit3Suite
-import kafka.zk
 import kafka.utils.ZkUtils
 import kafka.utils.Utils
 import kafka.utils.TestUtils
@@ -27,28 +26,44 @@
 import junit.framework.Assert._
 
 class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
-  var server : KafkaServer = null
-  val brokerId = 0
-  val zookeeperChroot = "/kafka-chroot-for-unittest"
 
-  override def setUp() {
-    super.setUp()
+  def testBrokerCreatesZKChroot {
+    val brokerId = 0
+    val zookeeperChroot = "/kafka-chroot-for-unittest"
     val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
     val zooKeeperConnect = props.get("zookeeper.connect")
     props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
+    val server = TestUtils.createServer(new KafkaConfig(props))
 
-    server = TestUtils.createServer(new KafkaConfig(props))
-  }
-
-  override def tearDown() {
-    server.shutdown()
-    Utils.rm(server.config.logDirs)
-    super.tearDown()
-  }
-
-  def testBrokerCreatesZKChroot {
     val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot)
     assertTrue(pathExists)
+
+    server.shutdown()
+    Utils.rm(server.config.logDirs)
   }
 
+  def testConflictBrokerRegistration {
+    // Try starting a broker with the a conflicting broker id.
+    // This shouldn't affect the existing broker registration.
+
+    val brokerId = 0
+    val props1 = TestUtils.createBrokerConfig(brokerId)
+    val server1 = TestUtils.createServer(new KafkaConfig(props1))
+    val brokerRegistration = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1
+
+    val props2 = TestUtils.createBrokerConfig(brokerId)
+    try {
+      TestUtils.createServer(new KafkaConfig(props2))
+      fail("Registering a broker with a conflicting id should fail")
+    } catch {
+      case e : RuntimeException =>
+      // this is expected
+    }
+
+    // broker registration shouldn't change
+    assertEquals(brokerRegistration, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1)
+
+    server1.shutdown()
+    Utils.rm(server1.config.logDirs)
+  }
 }
\ No newline at end of file