blob: eb492f00449744bc8d63f55b393e2a1659d38454 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.controller
import kafka.network.{Receive, BlockingChannel}
import kafka.utils.{Utils, Logging, ShutdownableThread}
import collection.mutable.HashMap
import kafka.cluster.Broker
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
import kafka.server.KafkaConfig
import collection.mutable
import kafka.api._
import org.apache.log4j.Logger
import scala.Some
import kafka.common.TopicAndPartition
import kafka.api.RequestOrResponse
import collection.Set
class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging {
private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
private val brokerLock = new Object
this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
controllerContext.liveBrokers.foreach(addNewBroker(_))
def startup() = {
brokerLock synchronized {
brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
}
}
def shutdown() = {
brokerLock synchronized {
brokerStateInfo.foreach(brokerState => removeExistingBroker(brokerState._1))
}
}
def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
brokerLock synchronized {
val stateInfoOpt = brokerStateInfo.get(brokerId)
stateInfoOpt match {
case Some(stateInfo) =>
stateInfo.messageQueue.put((request, callback))
case None =>
warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
}
}
}
def addBroker(broker: Broker) {
// be careful here. Maybe the startup() API has already started the request send thread
brokerLock synchronized {
if(!brokerStateInfo.contains(broker.id)) {
addNewBroker(broker)
startRequestSendThread(broker.id)
}
}
}
def removeBroker(brokerId: Int) {
brokerLock synchronized {
removeExistingBroker(brokerId)
}
}
private def addNewBroker(broker: Broker) {
val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize)
debug("Controller %d trying to connect to broker %d".format(config.brokerId,broker.id))
val channel = new BlockingChannel(broker.host, broker.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
config.controllerSocketTimeoutMs)
val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker, messageQueue, channel)
requestThread.setDaemon(false)
brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(channel, broker, messageQueue, requestThread))
}
private def removeExistingBroker(brokerId: Int) {
try {
brokerStateInfo(brokerId).channel.disconnect()
brokerStateInfo(brokerId).messageQueue.clear()
brokerStateInfo(brokerId).requestSendThread.shutdown()
brokerStateInfo.remove(brokerId)
}catch {
case e: Throwable => error("Error while removing broker by the controller", e)
}
}
private def startRequestSendThread(brokerId: Int) {
val requestThread = brokerStateInfo(brokerId).requestSendThread
if(requestThread.getState == Thread.State.NEW)
requestThread.start()
}
}
class RequestSendThread(val controllerId: Int,
val controllerContext: ControllerContext,
val toBroker: Broker,
val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
val channel: BlockingChannel)
extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) {
private val lock = new Object()
private val stateChangeLogger = KafkaController.stateChangeLogger
connectToBroker(toBroker, channel)
override def doWork(): Unit = {
val queueItem = queue.take()
val request = queueItem._1
val callback = queueItem._2
var receive: Receive = null
try {
lock synchronized {
var isSendSuccessful = false
while(isRunning.get() && !isSendSuccessful) {
// if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
// removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
try {
channel.send(request)
receive = channel.receive()
isSendSuccessful = true
} catch {
case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
"Reconnecting to broker.").format(controllerId, controllerContext.epoch,
request.toString, toBroker.toString()), e)
channel.disconnect()
connectToBroker(toBroker, channel)
isSendSuccessful = false
// backoff before retrying the connection and send
Utils.swallow(Thread.sleep(300))
}
}
var response: RequestOrResponse = null
request.requestId.get match {
case RequestKeys.LeaderAndIsrKey =>
response = LeaderAndIsrResponse.readFrom(receive.buffer)
case RequestKeys.StopReplicaKey =>
response = StopReplicaResponse.readFrom(receive.buffer)
case RequestKeys.UpdateMetadataKey =>
response = UpdateMetadataResponse.readFrom(receive.buffer)
}
stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
.format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
if(callback != null) {
callback(response)
}
}
} catch {
case e: Throwable =>
error("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
// If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
channel.disconnect()
}
}
private def connectToBroker(broker: Broker, channel: BlockingChannel) {
try {
channel.connect()
info("Controller %d connected to %s for sending state change requests".format(controllerId, broker.toString()))
} catch {
case e: Throwable => {
channel.disconnect()
error("Controller %d's connection to broker %s was unsuccessful".format(controllerId, broker.toString()), e)
}
}
}
}
class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging {
val controllerContext = controller.controllerContext
val controllerId: Int = controller.config.brokerId
val clientId: String = controller.clientId
val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[StopReplicaRequestInfo]]
val updateMetadataRequestMap = new mutable.HashMap[Int, mutable.HashMap[TopicAndPartition, PartitionStateInfo]]
private val stateChangeLogger = KafkaController.stateChangeLogger
def newBatch() {
// raise error if the previous batch is not empty
if(leaderAndIsrRequestMap.size > 0)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
"a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
if(stopReplicaRequestMap.size > 0)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
"new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
if(updateMetadataRequestMap.size > 0)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
"new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
}
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
replicas: Seq[Int], callback: (RequestOrResponse) => Unit = null) {
val topicAndPartition: TopicAndPartition = TopicAndPartition(topic, partition)
brokerIds.filter(b => b >= 0).foreach {
brokerId =>
leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo])
leaderAndIsrRequestMap(brokerId).put((topic, partition),
PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
}
addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
Set(topicAndPartition))
}
def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
callback: (RequestOrResponse, Int) => Unit = null) {
brokerIds.filter(b => b >= 0).foreach { brokerId =>
stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
val v = stopReplicaRequestMap(brokerId)
if(callback != null)
stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
deletePartition, (r: RequestOrResponse) => { callback(r, brokerId) })
else
stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
deletePartition)
}
}
/** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
callback: (RequestOrResponse) => Unit = null) {
def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) {
val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
leaderIsrAndControllerEpochOpt match {
case Some(leaderIsrAndControllerEpoch) =>
val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
val partitionStateInfo = if (beingDeleted) {
val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr)
PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas)
} else {
PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
}
brokerIds.filter(b => b >= 0).foreach { brokerId =>
updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
updateMetadataRequestMap(brokerId).put(partition, partitionStateInfo)
}
case None =>
info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
}
}
val filteredPartitions = {
val givenPartitions = if (partitions.isEmpty)
controllerContext.partitionLeadershipInfo.keySet
else
partitions
if (controller.deleteTopicManager.partitionsToBeDeleted.isEmpty)
givenPartitions
else
givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted
}
filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false))
controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = true))
}
def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) {
leaderAndIsrRequestMap.foreach { m =>
val broker = m._1
val partitionStateInfos = m._2.toMap
val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id))
val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId)
for (p <- partitionStateInfos) {
val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " +
"for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
p._2.leaderIsrAndControllerEpoch, correlationId, broker,
p._1._1, p._1._2))
}
controller.sendRequest(broker, leaderAndIsrRequest, null)
}
leaderAndIsrRequestMap.clear()
updateMetadataRequestMap.foreach { m =>
val broker = m._1
val partitionStateInfos = m._2.toMap
val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, correlationId, clientId,
partitionStateInfos, controllerContext.liveOrShuttingDownBrokers)
partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " +
"correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
correlationId, broker, p._1)))
controller.sendRequest(broker, updateMetadataRequest, null)
}
updateMetadataRequestMap.clear()
stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
debug("The stop replica request (delete = true) sent to broker %d is %s"
.format(broker, stopReplicaWithDelete.mkString(",")))
debug("The stop replica request (delete = false) sent to broker %d is %s"
.format(broker, stopReplicaWithoutDelete.mkString(",")))
replicaInfoList.foreach { r =>
val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId)
controller.sendRequest(broker, stopReplicaRequest, r.callback)
}
}
stopReplicaRequestMap.clear()
}
}
case class ControllerBrokerStateInfo(channel: BlockingChannel,
broker: Broker,
messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
requestSendThread: RequestSendThread)
case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: (RequestOrResponse) => Unit = null)
class Callbacks private (var leaderAndIsrResponseCallback:(RequestOrResponse) => Unit = null,
var updateMetadataResponseCallback:(RequestOrResponse) => Unit = null,
var stopReplicaResponseCallback:(RequestOrResponse, Int) => Unit = null)
object Callbacks {
class CallbackBuilder {
var leaderAndIsrResponseCbk:(RequestOrResponse) => Unit = null
var updateMetadataResponseCbk:(RequestOrResponse) => Unit = null
var stopReplicaResponseCbk:(RequestOrResponse, Int) => Unit = null
def leaderAndIsrCallback(cbk: (RequestOrResponse) => Unit): CallbackBuilder = {
leaderAndIsrResponseCbk = cbk
this
}
def updateMetadataCallback(cbk: (RequestOrResponse) => Unit): CallbackBuilder = {
updateMetadataResponseCbk = cbk
this
}
def stopReplicaCallback(cbk: (RequestOrResponse, Int) => Unit): CallbackBuilder = {
stopReplicaResponseCbk = cbk
this
}
def build: Callbacks = {
new Callbacks(leaderAndIsrResponseCbk, updateMetadataResponseCbk, stopReplicaResponseCbk)
}
}
}