blob: 31e8a92cf0ce5e6b744dc96714675e687dd5c5bb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import kafka.api.LeaderAndIsr
import kafka.common.TopicAndPartition
import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
import kafka.utils.ZkUtils._
import org.apache.zookeeper.data.Stat
import scala.collection._
/**
 * Helpers for reading and writing partition leader/ISR state in ZooKeeper and for
 * publishing ISR change notifications.
 */
object ReplicationUtils extends Logging {

  /** Name prefix of the sequential znodes created by [[propagateIsrChanges]]. */
  private val IsrChangeNotificationPrefix = "isr_change_"

  /**
   * Conditionally writes `newLeaderAndIsr` to the leader-and-ISR path of the given partition.
   *
   * The write is delegated to `ZkUtils.conditionalUpdatePersistentPath`, passing
   * [[checkLeaderAndIsrZkData]] as the optional checker.
   *
   * @param zkUtils         ZooKeeper helper used to perform the update
   * @param topic           topic of the partition
   * @param partitionId     id of the partition
   * @param newLeaderAndIsr leader and ISR state to write
   * @param controllerEpoch controller epoch encoded into the written data
   * @param zkVersion       expected version handed to the conditional update
   * @return the pair returned by `conditionalUpdatePersistentPath`
   *         (presumably success flag and resulting zk version -- confirm against ZkUtils)
   */
  def updateLeaderAndIsr(zkUtils: ZkUtils, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int,
                         zkVersion: Int): (Boolean,Int) = {
    // NOTE: this message is emitted before the conditional update is attempted.
    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newLeaderAndIsr.isr.mkString(",")))
    val path = getTopicPartitionLeaderAndIsrPath(topic, partitionId)
    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
    val newLeaderData = zkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
    zkUtils.conditionalUpdatePersistentPath(path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
  }

  /**
   * Publishes a set of partitions whose ISR changed by creating a sequential persistent
   * znode under `ZkUtils.IsrChangeNotificationPath` whose payload is the JSON produced by
   * [[generateIsrChangeJson]].
   *
   * @param zkUtils      ZooKeeper helper used to create the znode
   * @param isrChangeSet partitions to include in the notification
   */
  def propagateIsrChanges(zkUtils: ZkUtils, isrChangeSet: Set[TopicAndPartition]): Unit = {
    val isrChangeNotificationPath = zkUtils.createSequentialPersistentPath(
      ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
      generateIsrChangeJson(isrChangeSet))
    debug(s"Added $isrChangeNotificationPath for $isrChangeSet")
  }

  /**
   * Checks whether the data currently stored at `path` parses to the same leader/ISR/controller
   * epoch information as `expectedLeaderAndIsrInfo`.
   *
   * Both strings are parsed with the stat read from `path`, so the comparison is made on the
   * parsed values rather than on the raw strings.
   *
   * This is a best-effort check: any `Exception` raised while reading or parsing is logged
   * and reported as a mismatch instead of being propagated.
   *
   * @param zkUtils                  ZooKeeper helper used to read `path`
   * @param path                     znode path to read
   * @param expectedLeaderAndIsrInfo serialized leader-and-ISR data expected at `path`
   * @return `(true, version of the stat read from path)` when the parsed values are equal,
   *         `(false, -1)` otherwise
   */
  def checkLeaderAndIsrZkData(zkUtils: ZkUtils, path: String, expectedLeaderAndIsrInfo: String): (Boolean,Int) = {
    try {
      val (writtenDataOpt, writtenStat) = zkUtils.readDataMaybeNull(path)
      // Parse the expected data first, as the check always did, so a malformed expectation
      // is treated as a mismatch regardless of what is stored in ZooKeeper.
      val expectedOpt = parseLeaderAndIsr(expectedLeaderAndIsrInfo, path, writtenStat)
      val writtenOpt = writtenDataOpt.flatMap(writtenData => parseLeaderAndIsr(writtenData, path, writtenStat))
      (expectedOpt, writtenOpt) match {
        case (Some(expected), Some(written)) if expected == written => (true, writtenStat.getVersion)
        case _ => (false, -1)
      }
    } catch {
      case e: Exception =>
        // Previously swallowed silently; keep the best-effort result but leave a trace.
        warn("Failed to verify leader and ISR data at path %s".format(path), e)
        (false, -1)
    }
  }

  /**
   * Reads and parses the leader/ISR/controller-epoch information of a partition.
   *
   * @param zkUtils   ZooKeeper helper used to read the partition state
   * @param topic     topic of the partition
   * @param partition id of the partition
   * @return the parsed information, or `None` when no data is read or it cannot be parsed as JSON
   */
  def getLeaderIsrAndEpochForPartition(zkUtils: ZkUtils, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch] = {
    val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
    val (leaderAndIsrOpt, stat) = zkUtils.readDataMaybeNull(leaderAndIsrPath)
    leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
  }

  /**
   * Parses serialized leader-and-ISR JSON into a [[LeaderIsrAndControllerEpoch]], using the
   * version of `stat` as the zk version of the resulting `LeaderAndIsr`.
   *
   * @param leaderAndIsrStr JSON text with `leader`, `leader_epoch`, `isr` and `controller_epoch` fields
   * @param path            znode path the data belongs to (used for logging only)
   * @param stat            stat whose version is recorded in the result
   * @return `None` when `Json.parseFull` yields nothing, otherwise the parsed value
   * @throws NoSuchElementException if a required field is missing
   * @throws ClassCastException     if a field has an unexpected type
   */
  private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat)
      : Option[LeaderIsrAndControllerEpoch] = {
    Json.parseFull(leaderAndIsrStr).map { parsed =>
      val fields = parsed.asInstanceOf[Map[String, Any]]
      // Map.apply fails with "key not found: <field>" rather than an opaque "None.get".
      val leader = fields("leader").asInstanceOf[Int]
      val epoch = fields("leader_epoch").asInstanceOf[Int]
      val isr = fields("isr").asInstanceOf[List[Int]]
      val controllerEpoch = fields("controller_epoch").asInstanceOf[Int]
      val zkPathVersion = stat.getVersion
      debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for leaderAndIsrPath %s".format(leader, epoch,
        isr.toString(), zkPathVersion, path))
      LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch)
    }
  }

  /**
   * Builds the JSON payload of an ISR change notification: the listener version plus one
   * `{topic, partition}` entry per changed partition.
   */
  private def generateIsrChangeJson(isrChanges: Set[TopicAndPartition]): String = {
    val partitions = isrChanges.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)).toArray
    Json.encode(Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitions))
  }
}