blob: 018946ed9b4f1134d3e0f3e4aaebf845547a0b79 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.controller
import java.net.SocketTimeoutException
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import kafka.api._
import kafka.cluster.Broker
import kafka.common.{KafkaException, TopicAndPartition}
import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, TopicPartition}
import scala.collection.JavaConverters._
import scala.collection.{Set, mutable}
import scala.collection.mutable.HashMap
class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging {
protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
private val brokerLock = new Object
this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
controllerContext.liveBrokers.foreach(addNewBroker(_))
def startup() = {
brokerLock synchronized {
brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
}
}
def shutdown() = {
brokerLock synchronized {
brokerStateInfo.values.foreach(removeExistingBroker)
}
}
def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) {
brokerLock synchronized {
val stateInfoOpt = brokerStateInfo.get(brokerId)
stateInfoOpt match {
case Some(stateInfo) =>
stateInfo.messageQueue.put(QueueItem(apiKey, apiVersion, request, callback))
case None =>
warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
}
}
}
def addBroker(broker: Broker) {
// be careful here. Maybe the startup() API has already started the request send thread
brokerLock synchronized {
if(!brokerStateInfo.contains(broker.id)) {
addNewBroker(broker)
startRequestSendThread(broker.id)
}
}
}
def removeBroker(brokerId: Int) {
brokerLock synchronized {
removeExistingBroker(brokerStateInfo(brokerId))
}
}
private def addNewBroker(broker: Broker) {
val messageQueue = new LinkedBlockingQueue[QueueItem]
debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port)
val networkClient = {
val channelBuilder = ChannelBuilders.create(
config.interBrokerSecurityProtocol,
Mode.CLIENT,
LoginType.SERVER,
config.values,
config.saslMechanismInterBrokerProtocol,
config.saslInterBrokerHandshakeRequestEnable
)
val selector = new Selector(
NetworkReceive.UNLIMITED,
config.connectionsMaxIdleMs,
metrics,
time,
"controller-channel",
Map("broker-id" -> broker.id.toString).asJava,
false,
channelBuilder
)
new NetworkClient(
selector,
new ManualMetadataUpdater(Seq(brokerNode).asJava),
config.brokerId.toString,
1,
0,
Selectable.USE_DEFAULT_BUFFER_SIZE,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.requestTimeoutMs,
time
)
}
val threadName = threadNamePrefix match {
case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)
case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name, config.brokerId, broker.id)
}
val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
brokerNode, config, time, threadName)
requestThread.setDaemon(false)
brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
}
private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
try {
brokerState.networkClient.close()
brokerState.messageQueue.clear()
brokerState.requestSendThread.shutdown()
brokerStateInfo.remove(brokerState.brokerNode.id)
} catch {
case e: Throwable => error("Error while removing broker by the controller", e)
}
}
protected def startRequestSendThread(brokerId: Int) {
val requestThread = brokerStateInfo(brokerId).requestSendThread
if(requestThread.getState == Thread.State.NEW)
requestThread.start()
}
}
case class QueueItem(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit)
class RequestSendThread(val controllerId: Int,
val controllerContext: ControllerContext,
val queue: BlockingQueue[QueueItem],
val networkClient: NetworkClient,
val brokerNode: Node,
val config: KafkaConfig,
val time: Time,
name: String)
extends ShutdownableThread(name = name) {
private val lock = new Object()
private val stateChangeLogger = KafkaController.stateChangeLogger
private val socketTimeoutMs = config.controllerSocketTimeoutMs
override def doWork(): Unit = {
def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(300))
val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
import NetworkClientBlockingOps._
var clientResponse: ClientResponse = null
try {
lock synchronized {
var isSendSuccessful = false
while (isRunning.get() && !isSendSuccessful) {
// if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
// removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
try {
if (!brokerReady()) {
isSendSuccessful = false
backoff()
}
else {
val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct)
val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
isSendSuccessful = true
}
} catch {
case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
"Reconnecting to broker.").format(controllerId, controllerContext.epoch,
request.toString, brokerNode.toString()), e)
networkClient.close(brokerNode.idString)
isSendSuccessful = false
backoff()
}
}
if (clientResponse != null) {
val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {
case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)
case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody)
case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody)
case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
}
stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
.format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString))
if (callback != null) {
callback(response)
}
}
}
} catch {
case e: Throwable =>
error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString()), e)
// If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated.
networkClient.close(brokerNode.idString)
}
}
private def brokerReady(): Boolean = {
import NetworkClientBlockingOps._
try {
if (networkClient.isReady(brokerNode, time.milliseconds()))
true
else {
val ready = networkClient.blockingReady(brokerNode, socketTimeoutMs)(time)
if (!ready)
throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString()))
true
}
} catch {
case e: Throwable =>
warn("Controller %d's connection to broker %s was unsuccessful".format(controllerId, brokerNode.toString()), e)
networkClient.close(brokerNode.idString)
false
}
}
}
class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging {
val controllerContext = controller.controllerContext
val controllerId: Int = controller.config.brokerId
val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
val updateMetadataRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
private val stateChangeLogger = KafkaController.stateChangeLogger
def newBatch() {
// raise error if the previous batch is not empty
if (leaderAndIsrRequestMap.size > 0)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
"a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
if (stopReplicaRequestMap.size > 0)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
"new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
if (updateMetadataRequestMap.size > 0)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
"new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
}
def clear() {
leaderAndIsrRequestMap.clear()
stopReplicaRequestMap.clear()
updateMetadataRequestMap.clear()
}
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
replicas: Seq[Int], callback: AbstractRequestResponse => Unit = null) {
val topicPartition = new TopicPartition(topic, partition)
brokerIds.filter(_ >= 0).foreach { brokerId =>
val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
}
addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
Set(TopicAndPartition(topic, partition)))
}
def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
callback: (AbstractRequestResponse, Int) => Unit = null) {
brokerIds.filter(b => b >= 0).foreach { brokerId =>
stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
val v = stopReplicaRequestMap(brokerId)
if(callback != null)
stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
deletePartition, (r: AbstractRequestResponse) => callback(r, brokerId))
else
stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
deletePartition)
}
}
/** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
callback: AbstractRequestResponse => Unit = null) {
def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) {
val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
leaderIsrAndControllerEpochOpt match {
case Some(leaderIsrAndControllerEpoch) =>
val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
val partitionStateInfo = if (beingDeleted) {
val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr)
PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas)
} else {
PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
}
brokerIds.filter(b => b >= 0).foreach { brokerId =>
updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
updateMetadataRequestMap(brokerId).put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
}
case None =>
info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
}
}
val filteredPartitions = {
val givenPartitions = if (partitions.isEmpty)
controllerContext.partitionLeadershipInfo.keySet
else
partitions
if (controller.deleteTopicManager.partitionsToBeDeleted.isEmpty)
givenPartitions
else
givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted
}
if (filteredPartitions.isEmpty)
brokerIds.filter(b => b >= 0).foreach { brokerId =>
updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
}
else
filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false))
controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = true))
}
def sendRequestsToBrokers(controllerEpoch: Int) {
try {
leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>
partitionStateInfos.foreach { case (topicPartition, state) =>
val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +
"for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
state.leaderIsrAndControllerEpoch, broker,
topicPartition.topic, topicPartition.partition))
}
val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
_.getNode(controller.config.interBrokerSecurityProtocol)
}
val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
val partitionState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leaderIsr.leader,
leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
)
topicPartition -> partitionState
}
val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest, null)
}
leaderAndIsrRequestMap.clear()
updateMetadataRequestMap.foreach { case (broker, partitionStateInfos) =>
partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
"to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
broker, p._1)))
val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
val partitionState = new UpdateMetadataRequest.PartitionState(controllerEpoch, leaderIsr.leader,
leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
)
topicPartition -> partitionState
}
val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2: Short
else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short
else 0: Short
val updateMetadataRequest =
if (version == 0) {
val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)
}
else {
val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>
val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>
securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)
}
new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
}
new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
}
controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null)
}
updateMetadataRequestMap.clear()
stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>
val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
debug("The stop replica request (delete = true) sent to broker %d is %s"
.format(broker, stopReplicaWithDelete.mkString(",")))
debug("The stop replica request (delete = false) sent to broker %d is %s"
.format(broker, stopReplicaWithoutDelete.mkString(",")))
replicaInfoList.foreach { r =>
val stopReplicaRequest = new StopReplicaRequest(controllerId, controllerEpoch, r.deletePartition,
Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)
controller.sendRequest(broker, ApiKeys.STOP_REPLICA, None, stopReplicaRequest, r.callback)
}
}
stopReplicaRequestMap.clear()
} catch {
case e : Throwable => {
if (leaderAndIsrRequestMap.size > 0) {
error("Haven't been able to send leader and isr requests, current state of " +
s"the map is $leaderAndIsrRequestMap")
}
if (updateMetadataRequestMap.size > 0) {
error("Haven't been able to send metadata update requests, current state of " +
s"the map is $updateMetadataRequestMap")
}
if (stopReplicaRequestMap.size > 0) {
error("Haven't been able to send stop replica requests, current state of " +
s"the map is $stopReplicaRequestMap")
}
throw new IllegalStateException(e)
}
}
}
}
case class ControllerBrokerStateInfo(networkClient: NetworkClient,
brokerNode: Node,
messageQueue: BlockingQueue[QueueItem],
requestSendThread: RequestSendThread)
case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractRequestResponse => Unit = null)
class Callbacks private (var leaderAndIsrResponseCallback: AbstractRequestResponse => Unit = null,
var updateMetadataResponseCallback: AbstractRequestResponse => Unit = null,
var stopReplicaResponseCallback: (AbstractRequestResponse, Int) => Unit = null)
object Callbacks {
class CallbackBuilder {
var leaderAndIsrResponseCbk: AbstractRequestResponse => Unit = null
var updateMetadataResponseCbk: AbstractRequestResponse => Unit = null
var stopReplicaResponseCbk: (AbstractRequestResponse, Int) => Unit = null
def leaderAndIsrCallback(cbk: AbstractRequestResponse => Unit): CallbackBuilder = {
leaderAndIsrResponseCbk = cbk
this
}
def updateMetadataCallback(cbk: AbstractRequestResponse => Unit): CallbackBuilder = {
updateMetadataResponseCbk = cbk
this
}
def stopReplicaCallback(cbk: (AbstractRequestResponse, Int) => Unit): CallbackBuilder = {
stopReplicaResponseCbk = cbk
this
}
def build: Callbacks = {
new Callbacks(leaderAndIsrResponseCbk, updateMetadataResponseCbk, stopReplicaResponseCbk)
}
}
}