| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package kafka.server |
| |
| import kafka.utils.ZkUtils._ |
| import kafka.utils.CoreUtils._ |
| import kafka.utils.{Json, SystemTime, Logging, ZKCheckedEphemeral} |
| import org.I0Itec.zkclient.exception.ZkNodeExistsException |
| import org.I0Itec.zkclient.IZkDataListener |
| import kafka.controller.ControllerContext |
| import kafka.controller.KafkaController |
| import org.apache.kafka.common.security.JaasUtils |
| |
/**
 * Zookeeper based leader election built on a single ephemeral node at `electionPath`.
 *
 * The broker that manages to create the election node becomes the leader. A data listener on the same path
 * records leader changes in memory and starts another election round when the node is deleted, so if the
 * existing leader dies this class re-elects automatically and, on success, invokes the `onBecomingLeader`
 * callback.
 *
 * This module does not handle ZooKeeper session expiration. It assumes the caller has its own session
 * expiration handling and will trigger a new election when needed.
 *
 * @param controllerContext   provides the ZkUtils handle and the controller lock held during elections
 * @param electionPath        ZooKeeper path of the election node
 * @param onBecomingLeader    invoked after this broker has successfully created the election node
 * @param onResigningAsLeader invoked when this broker was the leader and no longer is
 * @param brokerId            id of the local broker
 */
class ZookeeperLeaderElector(controllerContext: ControllerContext,
                             electionPath: String,
                             onBecomingLeader: () => Unit,
                             onResigningAsLeader: () => Unit,
                             brokerId: Int)
  extends LeaderElector with Logging {
  // Id of the broker currently recorded as leader; -1 means no leader is known.
  var leaderId = -1
  // create the parent of the election path in ZK, if one does not exist
  val index = electionPath.lastIndexOf("/")
  if (index > 0)
    controllerContext.zkUtils.makeSurePersistentPathExists(electionPath.substring(0, index))
  val leaderChangeListener = new LeaderChangeListener

  /**
   * Subscribes the leader change listener on the election path and runs a first election round,
   * both while holding the controller lock.
   */
  def startup: Unit = {
    inLock(controllerContext.controllerLock) {
      controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
      elect
    }
  }

  /**
   * Reads the election path and parses the broker id stored there.
   *
   * @return the id of the current leader, or -1 if the path holds no data
   */
  private def getControllerID(): Int =
    controllerContext.zkUtils.readDataMaybeNull(electionPath)._1
      .map(controller => KafkaController.parseControllerId(controller))
      .getOrElse(-1)

  /**
   * Runs one election round: if no leader is registered, tries to create the election node for this broker
   * and, on success, invokes `onBecomingLeader`.
   *
   * @return true if this broker is the leader when the round finishes
   */
  def elect: Boolean = {
    leaderId = getControllerID()
    /*
     * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
     * it's possible that the controller has already been elected when we get here. This check will prevent the following
     * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
     */
    if (leaderId != -1) {
      debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
    } else {
      // Only build the election payload once we know a create attempt is actually needed.
      val timestamp = SystemTime.milliseconds.toString
      val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))

      try {
        val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
                                                        electString,
                                                        controllerContext.zkUtils.zkConnection.getZookeeper,
                                                        JaasUtils.isZkSecurityEnabled())
        zkCheckedEphemeral.create()
        info(brokerId.toString + " successfully elected as leader")
        leaderId = brokerId
        onBecomingLeader()
      } catch {
        case _: ZkNodeExistsException =>
          // Someone else has written the path first, so re-read it to learn who the leader is.
          leaderId = getControllerID()

          if (leaderId != -1)
            debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
          else
            warn("A leader has been elected but just resigned, this will result in another round of election")

        // Deliberately broad: any failure while creating the node or running the onBecomingLeader callback
        // makes this broker give up leadership by deleting the election path.
        case e2: Throwable =>
          error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
          resign()
      }
    }
    amILeader
  }

  /** Forgets the recorded leader. The election node and the data listener subscription are left untouched. */
  def close: Unit = {
    leaderId = -1
  }

  /** @return true if the recorded leader id is this broker's id */
  def amILeader: Boolean = leaderId == brokerId

  /**
   * Forgets the recorded leader and deletes the election path.
   *
   * @return whatever `deletePath` returns for the election path
   */
  def resign() = {
    leaderId = -1
    controllerContext.zkUtils.deletePath(electionPath)
  }

  /**
   * Listener for changes of the election node. This module has no session expiration listener of its own;
   * it assumes the caller has its own session expiration listener and handler.
   */
  class LeaderChangeListener extends IZkDataListener with Logging {
    /**
     * Called when the leader information stored in zookeeper has changed. Records the new leader in memory
     * and invokes `onResigningAsLeader` if this broker was the leader before the change and no longer is.
     * @throws Exception On any error.
     */
    @throws(classOf[Exception])
    def handleDataChange(dataPath: String, data: Object): Unit = {
      inLock(controllerContext.controllerLock) {
        val amILeaderBeforeDataChange = amILeader
        leaderId = KafkaController.parseControllerId(data.toString)
        info("New leader is %d".format(leaderId))
        // The old leader needs to resign leadership if it is no longer the leader
        if (amILeaderBeforeDataChange && !amILeader)
          onResigningAsLeader()
      }
    }

    /**
     * Called when the leader information stored in zookeeper has been deleted. If this broker was the leader,
     * `onResigningAsLeader` is invoked first; then it tries to get elected as the leader.
     * @throws Exception On any error.
     */
    @throws(classOf[Exception])
    def handleDataDeleted(dataPath: String): Unit = {
      inLock(controllerContext.controllerLock) {
        debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
          .format(brokerId, dataPath))
        if (amILeader)
          onResigningAsLeader()
        elect
      }
    }
  }
}