| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.server |
| |
| import kafka.admin.AdminUtils |
| import kafka.api._ |
| import kafka.message._ |
| import kafka.network._ |
| import kafka.log._ |
| import kafka.utils.ZKGroupTopicDirs |
| import org.apache.log4j.Logger |
| import scala.collection._ |
| import java.util.Properties |
| import java.util.concurrent.TimeUnit |
| import java.util.concurrent.atomic._ |
| import kafka.metrics.KafkaMetricsGroup |
| import org.I0Itec.zkclient.ZkClient |
| import kafka.common._ |
| import kafka.utils.{ZkUtils, Pool, SystemTime, Logging} |
| import kafka.network.RequestChannel.Response |
| import kafka.cluster.Broker |
| import kafka.controller.KafkaController |
| |
| |
| /** |
| * Logic to handle the various Kafka requests |
| */ |
| class KafkaApis(val requestChannel: RequestChannel, |
| val replicaManager: ReplicaManager, |
| val zkClient: ZkClient, |
| val brokerId: Int, |
| val config: KafkaConfig, |
| val controller: KafkaController) extends Logging { |
| |
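  /* purgatories hold delayed produce and fetch requests until they can be satisfied or they time out */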
| private val producerRequestPurgatory = |
| new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests) |
| private val fetchRequestPurgatory = |
| new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) |
| private val delayedRequestMetrics = new DelayedRequestMetrics |
  /* the following two data structures are updated by UpdateMetadata requests
   * and queried by TopicMetadata requests. */
| var leaderCache: mutable.Map[TopicAndPartition, PartitionStateInfo] = |
| new mutable.HashMap[TopicAndPartition, PartitionStateInfo]() |
| private var aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]() |
| private val partitionMetadataLock = new Object |
| this.logIdent = "[KafkaApi-%d] ".format(brokerId) |
| |
| /** |
| * Top-level method that handles all requests and multiplexes to the right api |
| */ |
| def handle(request: RequestChannel.Request) { |
    try {
| trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress) |
| request.requestId match { |
| case RequestKeys.ProduceKey => handleProducerRequest(request) |
| case RequestKeys.FetchKey => handleFetchRequest(request) |
| case RequestKeys.OffsetsKey => handleOffsetRequest(request) |
| case RequestKeys.MetadataKey => handleTopicMetadataRequest(request) |
| case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) |
| case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request) |
| case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request) |
| case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request) |
| case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request) |
| case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request) |
| case requestId => throw new KafkaException("Unknown api code " + requestId) |
| } |
| } catch { |
| case e: Throwable => |
| request.requestObj.handleError(e, requestChannel, request) |
| error("error when handling request %s".format(request.requestObj), e) |
| } finally |
| request.apiLocalCompleteTimeMs = SystemTime.milliseconds |
| } |
| |
| def handleLeaderAndIsrRequest(request: RequestChannel.Request) { |
| val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest] |
| try { |
| val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest) |
| val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error) |
| requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse))) |
| } catch { |
| case e: KafkaStorageException => |
| fatal("Disk error during leadership change.", e) |
| Runtime.getRuntime.halt(1) |
| } |
| } |
| |
| def handleStopReplicaRequest(request: RequestChannel.Request) { |
| val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest] |
| val (response, error) = replicaManager.stopReplicas(stopReplicaRequest) |
| val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error) |
| requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse))) |
| replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads() |
| } |
| |
| def handleUpdateMetadataRequest(request: RequestChannel.Request) { |
| val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest] |
| val stateChangeLogger = replicaManager.stateChangeLogger |
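    // fence requests from an older controller: a controller with a stale epoch may still believe
    // it is active, and its metadata must not overwrite state from the current controller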
| if(updateMetadataRequest.controllerEpoch < replicaManager.controllerEpoch) { |
| val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " + |
| "old controller %d with epoch %d. Latest known controller epoch is %d").format(brokerId, |
| updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, |
| replicaManager.controllerEpoch) |
| stateChangeLogger.warn(stateControllerEpochErrorMessage) |
| throw new ControllerMovedException(stateControllerEpochErrorMessage) |
| } |
| partitionMetadataLock synchronized { |
| replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch |
| // cache the list of alive brokers in the cluster |
| updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b)) |
| updateMetadataRequest.partitionStateInfos.foreach { partitionState => |
| leaderCache.put(partitionState._1, partitionState._2) |
| if(stateChangeLogger.isTraceEnabled) |
| stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " + |
| "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1, |
| updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId)) |
| } |
| } |
| val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId) |
| requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse))) |
| } |
| |
| def handleControlledShutdownRequest(request: RequestChannel.Request) { |
| val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest] |
| val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId) |
| val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId, |
| ErrorMapping.NoError, partitionsRemaining) |
| requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse))) |
| } |
| |
| /** |
| * Check if a partitionData from a produce request can unblock any |
| * DelayedFetch requests. |
| */ |
| def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messageSizeInBytes: Int) { |
| val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), messageSizeInBytes) |
| trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size)) |
| |
| // send any newly unblocked responses |
| for(fetchReq <- satisfied) { |
| val topicData = readMessageSets(fetchReq.fetch) |
| val response = FetchResponse(fetchReq.fetch.correlationId, topicData) |
| requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response))) |
| } |
| } |
| |
| /** |
| * Handle a produce request |
| */ |
| def handleProducerRequest(request: RequestChannel.Request) { |
| val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] |
| val sTime = SystemTime.milliseconds |
| val localProduceResults = appendToLocalLog(produceRequest) |
| debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) |
| |
| val numPartitionsInError = localProduceResults.count(_.error.isDefined) |
| produceRequest.data.foreach(partitionAndData => |
| maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2.sizeInBytes)) |
| |
    val allPartitionsHaveReplicationFactorOne =
      !produceRequest.data.keySet.exists(
        m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1)
| if(produceRequest.requiredAcks == 0) { |
| // send a fake producer response if producer request.required.acks = 0. This mimics the behavior of a 0.7 producer |
| // and is tuned for very high throughput |
| requestChannel.sendResponse(new RequestChannel.Response(request.processor, request, null)) |
| } else if (produceRequest.requiredAcks == 1 || |
| produceRequest.numPartitions <= 0 || |
               allPartitionsHaveReplicationFactorOne ||
| numPartitionsInError == produceRequest.numPartitions) { |
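      // respond immediately when waiting cannot help: only the leader's ack is required,
      // there are no partitions to wait on, every partition has a replication factor of one,
      // or all partitions already failed locally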
| val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap |
| val response = ProducerResponse(produceRequest.correlationId, statuses) |
| requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) |
| } else { |
| // create a list of (topic, partition) pairs to use as keys for this delayed request |
| val producerRequestKeys = produceRequest.data.keys.map( |
| topicAndPartition => new RequestKey(topicAndPartition)).toSeq |
| val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.end + 1)).toMap |
| val delayedProduce = new DelayedProduce(producerRequestKeys, |
| request, |
| statuses, |
| produceRequest, |
| produceRequest.ackTimeoutMs.toLong) |
| producerRequestPurgatory.watch(delayedProduce) |
| |
| /* |
| * Replica fetch requests may have arrived (and potentially satisfied) |
| * delayedProduce requests while they were being added to the purgatory. |
| * Here, we explicitly check if any of them can be satisfied. |
| */ |
      val satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
| producerRequestKeys.foreach(key => |
| satisfiedProduceRequests ++= |
| producerRequestPurgatory.update(key, key)) |
| debug(satisfiedProduceRequests.size + |
| " producer requests unblocked during produce to local log.") |
| satisfiedProduceRequests.foreach(_.respond()) |
| // we do not need the data anymore |
| produceRequest.emptyData() |
| } |
| } |
| |
| case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) { |
| def this(key: TopicAndPartition, throwable: Throwable) = |
| this(key, -1L, -1L, Some(throwable)) |
| |
    def errorCode = error match {
      case None => ErrorMapping.NoError
      case Some(t) => ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]])
| } |
| } |
| |
| /** |
| * Helper method for handling a parsed producer request |
| */ |
| private def appendToLocalLog(producerRequest: ProducerRequest): Iterable[ProduceResult] = { |
| val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data |
| trace("Append [%s] to local log ".format(partitionAndData.toString)) |
| partitionAndData.map {case (topicAndPartition, messages) => |
| BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes) |
| BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes) |
| |
| try { |
| val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) |
| val info = |
| partitionOpt match { |
| case Some(partition) => partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet]) |
| case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" |
| .format(topicAndPartition, brokerId)) |
| |
| } |
| val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1) |
| |
| // update stats |
| BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages) |
| BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages) |
| |
| trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" |
| .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset)) |
| ProduceResult(topicAndPartition, info.firstOffset, info.lastOffset) |
| } catch { |
        // NOTE: the failed-produce-requests metric is not incremented for UnknownTopicOrPartitionException and
        // NotLeaderForPartitionException, since it is meant to indicate a failure of the broker to handle a
        // produce request for a partition it leads
| case e: KafkaStorageException => |
| fatal("Halting due to unrecoverable I/O error while handling produce request: ", e) |
| Runtime.getRuntime.halt(1) |
| null |
| case utpe: UnknownTopicOrPartitionException => |
| warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( |
| producerRequest.correlationId, producerRequest.clientId, topicAndPartition, utpe.getMessage)) |
| new ProduceResult(topicAndPartition, utpe) |
| case nle: NotLeaderForPartitionException => |
| warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( |
| producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage)) |
| new ProduceResult(topicAndPartition, nle) |
        case e: Throwable =>
| BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() |
| BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() |
| error("Error processing ProducerRequest with correlation id %d from client %s on partition %s" |
| .format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition), e) |
| new ProduceResult(topicAndPartition, e) |
| } |
| } |
| } |
| |
| /** |
| * Handle a fetch request |
| */ |
| def handleFetchRequest(request: RequestChannel.Request) { |
| val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] |
| if(fetchRequest.isFromFollower) { |
| maybeUpdatePartitionHw(fetchRequest) |
| // after updating HW, some delayed produce requests may be unblocked |
      val satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
| fetchRequest.requestInfo.foreach { |
| case (topicAndPartition, _) => |
| val key = new RequestKey(topicAndPartition) |
| satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key) |
| } |
| debug("Replica %d fetch unblocked %d producer requests." |
| .format(fetchRequest.replicaId, satisfiedProduceRequests.size)) |
| satisfiedProduceRequests.foreach(_.respond()) |
| } |
| |
| val dataRead = readMessageSets(fetchRequest) |
| val bytesReadable = dataRead.values.map(_.messages.sizeInBytes).sum |
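    // respond immediately if the request cannot wait (maxWait <= 0), enough bytes have already
    // accumulated to satisfy minBytes, or there are no partitions to read from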
| if(fetchRequest.maxWait <= 0 || |
| bytesReadable >= fetchRequest.minBytes || |
| fetchRequest.numPartitions <= 0) { |
| debug("Returning fetch response %s for fetch request with correlation id %d to client %s" |
| .format(dataRead.values.map(_.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId)) |
| val response = new FetchResponse(fetchRequest.correlationId, dataRead) |
| requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) |
| } else { |
| debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId, |
| fetchRequest.clientId)) |
| // create a list of (topic, partition) pairs to use as keys for this delayed request |
| val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_)) |
| val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait, bytesReadable) |
| fetchRequestPurgatory.watch(delayedFetch) |
| } |
| } |
| |
| private def maybeUpdatePartitionHw(fetchRequest: FetchRequest) { |
| debug("Maybe update partition HW due to fetch request: %s ".format(fetchRequest)) |
| fetchRequest.requestInfo.foreach(info => { |
| val (topic, partition, offset) = (info._1.topic, info._1.partition, info._2.offset) |
| replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset) |
| }) |
| } |
| |
| /** |
| * Read from all the offset details given and return a map of |
| * (topic, partition) -> PartitionData |
| */ |
| private def readMessageSets(fetchRequest: FetchRequest) = { |
| val isFetchFromFollower = fetchRequest.isFromFollower |
    fetchRequest.requestInfo.map {
| case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) => |
| val partitionData = |
| try { |
| val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId) |
| BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes) |
| BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messages.sizeInBytes) |
            if (isFetchFromFollower)
              debug("Leader %d for partition [%s,%d] received fetch request from follower %d"
                .format(brokerId, topic, partition, fetchRequest.replicaId))
            new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
| } catch { |
          // NOTE: the failed-fetch-requests metric is not incremented for UnknownTopicOrPartitionException and
          // NotLeaderForPartitionException, since it is meant to indicate a failure of the broker to handle a
          // fetch request for a partition it leads
| case utpe: UnknownTopicOrPartitionException => |
| warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format( |
| fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage)) |
| new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty) |
| case nle: NotLeaderForPartitionException => |
| warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format( |
| fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage)) |
| new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty) |
          case t: Throwable =>
| BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() |
| BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark() |
| error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d" |
| .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId), |
| t) |
| new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty) |
| } |
| (TopicAndPartition(topic, partition), partitionData) |
| } |
| } |
| |
| /** |
| * Read from a single topic/partition at the given offset upto maxSize bytes |
| */ |
| private def readMessageSet(topic: String, |
| partition: Int, |
| offset: Long, |
| maxSize: Int, |
| fromReplicaId: Int): (MessageSet, Long) = { |
    // check if the current broker is the leader for the partition
| val localReplica = if(fromReplicaId == Request.DebuggingConsumerId) |
| replicaManager.getReplicaOrException(topic, partition) |
| else |
| replicaManager.getLeaderReplicaIfLocal(topic, partition) |
| trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) |
| val maxOffsetOpt = |
| if (fromReplicaId == Request.OrdinaryConsumerId) |
| Some(localReplica.highWatermark) |
| else |
| None |
| val messages = localReplica.log match { |
| case Some(log) => |
| log.read(offset, maxSize, maxOffsetOpt) |
| case None => |
| error("Leader for partition [%s,%d] on broker %d does not have a local log".format(topic, partition, brokerId)) |
| MessageSet.Empty |
| } |
| (messages, localReplica.highWatermark) |
| } |
| |
| /** |
| * Service the offset request API |
| */ |
| def handleOffsetRequest(request: RequestChannel.Request) { |
| val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest] |
| val responseMap = offsetRequest.requestInfo.map(elem => { |
| val (topicAndPartition, partitionOffsetRequestInfo) = elem |
| try { |
| // ensure leader exists |
| val localReplica = if(!offsetRequest.isFromDebuggingClient) |
| replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition) |
| else |
| replicaManager.getReplicaOrException(topicAndPartition.topic, topicAndPartition.partition) |
| val offsets = { |
| val allOffsets = fetchOffsets(replicaManager.logManager, |
| topicAndPartition, |
| partitionOffsetRequestInfo.time, |
| partitionOffsetRequestInfo.maxNumOffsets) |
| if (!offsetRequest.isFromOrdinaryClient) { |
| allOffsets |
| } else { |
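            // ordinary clients may only see offsets up to the high watermark, so cap any
            // offsets beyond it at the high watermark itself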
| val hw = localReplica.highWatermark |
| if (allOffsets.exists(_ > hw)) |
| hw +: allOffsets.dropWhile(_ > hw) |
| else |
| allOffsets |
| } |
| } |
| (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets)) |
| } catch { |
| // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages |
| // are typically transient and there is no value in logging the entire stack trace for the same |
| case utpe: UnknownTopicOrPartitionException => |
| warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( |
| offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition, utpe.getMessage)) |
| (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), Nil) ) |
| case nle: NotLeaderForPartitionException => |
| warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( |
| offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage)) |
| (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) ) |
      case e: Throwable =>
| warn("Error while responding to offset request", e) |
| (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) ) |
| } |
| }) |
| val response = OffsetResponse(offsetRequest.correlationId, responseMap) |
| requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) |
| } |
| |
| def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { |
| logManager.getLog(topicAndPartition) match { |
| case Some(log) => |
| fetchOffsetsBefore(log, timestamp, maxNumOffsets) |
| case None => |
| if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime) |
| Seq(0L) |
| else |
| Nil |
| } |
| } |
| |
| def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { |
| val segsArray = log.logSegments.toArray |
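    // build an array of (base offset, last modified time) pairs, one entry per segment, plus a
    // final entry for the log end offset when the last segment is non-empty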
    val offsetTimeArray =
      if (segsArray.last.size > 0)
        new Array[(Long, Long)](segsArray.length + 1)
      else
        new Array[(Long, Long)](segsArray.length)
| |
| for(i <- 0 until segsArray.length) |
| offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified) |
| if(segsArray.last.size > 0) |
| offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds) |
| |
| var startIndex = -1 |
| timestamp match { |
| case OffsetRequest.LatestTime => |
| startIndex = offsetTimeArray.length - 1 |
| case OffsetRequest.EarliestTime => |
| startIndex = 0 |
| case _ => |
| var isFound = false |
| debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2))) |
| startIndex = offsetTimeArray.length - 1 |
| while (startIndex >= 0 && !isFound) { |
| if (offsetTimeArray(startIndex)._2 <= timestamp) |
| isFound = true |
| else |
            startIndex -= 1
| } |
| } |
| |
| val retSize = maxNumOffsets.min(startIndex + 1) |
| val ret = new Array[Long](retSize) |
| for(j <- 0 until retSize) { |
| ret(j) = offsetTimeArray(startIndex)._1 |
| startIndex -= 1 |
| } |
| // ensure that the returned seq is in descending order of offsets |
| ret.toSeq.sortBy(- _) |
| } |
| |
| /** |
| * Service the topic metadata request API |
| */ |
| def handleTopicMetadataRequest(request: RequestChannel.Request) { |
| val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest] |
| val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]() |
| val config = replicaManager.config |
    val uniqueTopics =
      if(metadataRequest.topics.size > 0)
        metadataRequest.topics.toSet
      else
        ZkUtils.getAllTopics(zkClient).toSet
| val topicMetadataList = |
| partitionMetadataLock synchronized { |
| uniqueTopics.map { topic => |
| if(leaderCache.keySet.map(_.topic).contains(topic)) { |
| val partitionStateInfo = leaderCache.filter(p => p._1.topic.equals(topic)) |
| val sortedPartitions = partitionStateInfo.toList.sortWith((m1,m2) => m1._1.partition < m2._1.partition) |
| val partitionMetadata = sortedPartitions.map { case(topicAndPartition, partitionState) => |
| val replicas = leaderCache(topicAndPartition).allReplicas |
            val replicaInfo: Seq[Broker] = replicas.flatMap(aliveBrokers.get).toSeq
            var leaderInfo: Option[Broker] = None
            var isrInfo: Seq[Broker] = Nil
| val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch |
| val leader = leaderIsrAndEpoch.leaderAndIsr.leader |
| val isr = leaderIsrAndEpoch.leaderAndIsr.isr |
| debug("%s".format(topicAndPartition) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader) |
| try { |
| if(aliveBrokers.keySet.contains(leader)) |
| leaderInfo = Some(aliveBrokers(leader)) |
| else throw new LeaderNotAvailableException("Leader not available for partition %s".format(topicAndPartition)) |
              isrInfo = isr.flatMap(aliveBrokers.get)
| if(replicaInfo.size < replicas.size) |
| throw new ReplicaNotAvailableException("Replica information not available for following brokers: " + |
| replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(",")) |
| if(isrInfo.size < isr.size) |
| throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " + |
| isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(",")) |
| new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError) |
| } catch { |
              case e: Throwable =>
| error("Error while fetching metadata for partition %s".format(topicAndPartition), e) |
| new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, |
| ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) |
| } |
| } |
| new TopicMetadata(topic, partitionMetadata) |
| } else { |
| // topic doesn't exist, send appropriate error code |
| new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode) |
| } |
| } |
| } |
| |
| // handle auto create topics |
| topicMetadataList.foreach { topicMetadata => |
| topicMetadata.errorCode match { |
| case ErrorMapping.NoError => topicsMetadata += topicMetadata |
| case ErrorMapping.UnknownTopicOrPartitionCode => |
| if (config.autoCreateTopicsEnable) { |
| try { |
| AdminUtils.createTopic(zkClient, topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor) |
| info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" |
| .format(topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor)) |
| } catch { |
| case e: TopicExistsException => // let it go, possibly another broker created this topic |
| } |
| topicsMetadata += new TopicMetadata(topicMetadata.topic, topicMetadata.partitionsMetadata, ErrorMapping.LeaderNotAvailableCode) |
| } else { |
| topicsMetadata += topicMetadata |
| } |
| case _ => |
| debug("Error while fetching topic metadata for topic %s due to %s ".format(topicMetadata.topic, |
| ErrorMapping.exceptionFor(topicMetadata.errorCode).getClass.getName)) |
| topicsMetadata += topicMetadata |
| } |
| } |
| trace("Sending topic metadata %s for correlation id %d to client %s".format(topicsMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId)) |
| val response = new TopicMetadataResponse(topicsMetadata.toSeq, metadataRequest.correlationId) |
| requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) |
| } |
| |
| /* |
| * Service the Offset commit API |
| */ |
| def handleOffsetCommitRequest(request: RequestChannel.Request) { |
| val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] |
| val responseInfo = offsetCommitRequest.requestInfo.map{ |
| case (topicAndPartition, metaAndError) => { |
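        // offsets are committed to ZooKeeper under the consumer group's per-topic directory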
| val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic) |
| try { |
| if(metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) { |
| (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode) |
| } else { |
| ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + |
| topicAndPartition.partition, metaAndError.offset.toString) |
| (topicAndPartition, ErrorMapping.NoError) |
| } |
| } catch { |
          case e: Throwable => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
| } |
| } |
| } |
| val response = new OffsetCommitResponse(responseInfo, |
| offsetCommitRequest.correlationId) |
| requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) |
| } |
| |
| /* |
| * Service the Offset fetch API |
| */ |
| def handleOffsetFetchRequest(request: RequestChannel.Request) { |
| val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest] |
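    // offsets are read back from the same per-group, per-topic ZooKeeper paths they were committed to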
| val responseInfo = offsetFetchRequest.requestInfo.map( t => { |
| val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic) |
| try { |
| val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + t.partition)._1 |
| payloadOpt match { |
| case Some(payload) => { |
| (t, OffsetMetadataAndError(offset=payload.toLong, error=ErrorMapping.NoError)) |
| } |
| case None => (t, OffsetMetadataAndError(OffsetMetadataAndError.InvalidOffset, OffsetMetadataAndError.NoMetadata, |
| ErrorMapping.UnknownTopicOrPartitionCode)) |
| } |
| } catch { |
        case e: Throwable =>
| (t, OffsetMetadataAndError(OffsetMetadataAndError.InvalidOffset, OffsetMetadataAndError.NoMetadata, |
| ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))) |
| } |
| }) |
| val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), |
| offsetFetchRequest.correlationId) |
| requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) |
| } |
| |
| def close() { |
| debug("Shutting down.") |
| fetchRequestPurgatory.shutdown() |
| producerRequestPurgatory.shutdown() |
| debug("Shut down complete.") |
| } |
| |
| private [kafka] trait MetricKey { |
| def keyLabel: String |
| } |
| private [kafka] object MetricKey { |
| val globalLabel = "All" |
| } |
| |
| private [kafka] case class RequestKey(topic: String, partition: Int) |
| extends MetricKey { |
| |
| def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition) |
| |
| def topicAndPartition = TopicAndPartition(topic, partition) |
| |
| override def keyLabel = "%s-%d".format(topic, partition) |
| } |
| |
| /** |
| * A delayed fetch request |
| */ |
| class DelayedFetch(keys: Seq[RequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long) |
| extends DelayedRequest(keys, request, delayMs) { |
| val bytesAccumulated = new AtomicLong(initialSize) |
| } |
| |
| /** |
| * A holding pen for fetch requests waiting to be satisfied |
| */ |
| class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int) |
| extends RequestPurgatory[DelayedFetch, Int](brokerId, purgeInterval) { |
| this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId) |
| |
| /** |
| * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field |
| */ |
| def checkSatisfied(messageSizeInBytes: Int, delayedFetch: DelayedFetch): Boolean = { |
| val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes) |
| accumulatedSize >= delayedFetch.fetch.minBytes |
| } |
| |
| /** |
| * When a request expires just answer it with whatever data is present |
| */ |
| def expire(delayed: DelayedFetch) { |
| debug("Expiring fetch request %s.".format(delayed.fetch)) |
| try { |
| val topicData = readMessageSets(delayed.fetch) |
| val response = FetchResponse(delayed.fetch.correlationId, topicData) |
| val fromFollower = delayed.fetch.isFromFollower |
| delayedRequestMetrics.recordDelayedFetchExpired(fromFollower) |
| requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response))) |
| } |
| catch { |
| case e1: LeaderNotAvailableException => |
| debug("Leader changed before fetch request %s expired.".format(delayed.fetch)) |
| case e2: UnknownTopicOrPartitionException => |
| debug("Replica went offline before fetch request %s expired.".format(delayed.fetch)) |
| } |
| } |
| } |
| |
| class DelayedProduce(keys: Seq[RequestKey], |
| request: RequestChannel.Request, |
| initialErrorsAndOffsets: Map[TopicAndPartition, ProducerResponseStatus], |
| val produce: ProducerRequest, |
| delayMs: Long) |
| extends DelayedRequest(keys, request, delayMs) with Logging { |
| |
| /** |
| * Map of (topic, partition) -> partition status |
| * The values in this map don't need to be synchronized since updates to the |
| * values are effectively synchronized by the ProducerRequestPurgatory's |
| * update method |
| */ |
| private [kafka] val partitionStatus = keys.map(requestKey => { |
| val producerResponseStatus = initialErrorsAndOffsets(TopicAndPartition(requestKey.topic, requestKey.partition)) |
| // if there was an error in writing to the local replica's log, then don't |
| // wait for acks on this partition |
| val (acksPending, error, nextOffset) = |
| if (producerResponseStatus.error == ErrorMapping.NoError) { |
| // Timeout error state will be cleared when requiredAcks are received |
| (true, ErrorMapping.RequestTimedOutCode, producerResponseStatus.offset) |
| } |
| else (false, producerResponseStatus.error, producerResponseStatus.offset) |
| |
| val initialStatus = PartitionStatus(acksPending, error, nextOffset) |
| trace("Initial partition status for %s = %s".format(requestKey.keyLabel, initialStatus)) |
| (requestKey, initialStatus) |
| }).toMap |
| |
| def respond() { |
| val finalErrorsAndOffsets = initialErrorsAndOffsets.map( |
| status => { |
| val pstat = partitionStatus(new RequestKey(status._1)) |
| (status._1, ProducerResponseStatus(pstat.error, pstat.requiredOffset)) |
| }) |
| |
| val response = ProducerResponse(produce.correlationId, finalErrorsAndOffsets) |
| |
| requestChannel.sendResponse(new RequestChannel.Response( |
| request, new BoundedByteBufferSend(response))) |
| } |
| |
| /** |
| * Returns true if this delayed produce request is satisfied (or more |
| * accurately, unblocked) -- this is the case if for every partition: |
| * Case A: This broker is not the leader: unblock - should return error. |
| * Case B: This broker is the leader: |
| * B.1 - If there was a localError (when writing to the local log): unblock - should return error |
| * B.2 - else, at least requiredAcks replicas should be caught up to this request. |
| * |
| * As partitions become acknowledged, we may be able to unblock |
| * DelayedFetchRequests that are pending on those partitions. |
| */ |
| def isSatisfied(followerFetchRequestKey: RequestKey) = { |
| val topic = followerFetchRequestKey.topic |
| val partitionId = followerFetchRequestKey.partition |
| val key = RequestKey(topic, partitionId) |
      val produceStatus = partitionStatus(key)
      trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
        .format(topic, partitionId, produceStatus.acksPending))
      if (produceStatus.acksPending) {
        val partitionOpt = replicaManager.getPartition(topic, partitionId)
        val (hasEnough, errorCode) = partitionOpt match {
          case Some(partition) =>
            partition.checkEnoughReplicasReachOffset(produceStatus.requiredOffset, produce.requiredAcks)
          case None =>
            (false, ErrorMapping.UnknownTopicOrPartitionCode)
        }
        if (errorCode != ErrorMapping.NoError) {
          produceStatus.acksPending = false
          produceStatus.error = errorCode
        } else if (hasEnough) {
          produceStatus.acksPending = false
          produceStatus.error = ErrorMapping.NoError
        }
        if (!produceStatus.acksPending) {
          val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition)
          maybeUnblockDelayedFetchRequests(topic, partitionId, messageSizeInBytes)
        }
| } |
| |
| // unblocked if there are no partitions with pending acks |
| val satisfied = ! partitionStatus.exists(p => p._2.acksPending) |
| trace("Producer request satisfaction for %s-%d = %b".format(topic, partitionId, satisfied)) |
| satisfied |
| } |
| |
| case class PartitionStatus(var acksPending: Boolean, |
| var error: Short, |
| requiredOffset: Long) { |
| def setThisBrokerNotLeader() { |
| error = ErrorMapping.NotLeaderForPartitionCode |
| acksPending = false |
| } |
| |
| override def toString = |
| "acksPending:%b, error: %d, requiredOffset: %d".format( |
| acksPending, error, requiredOffset |
| ) |
| } |
| } |
| |
| /** |
| * A holding pen for produce requests waiting to be satisfied. |
| */ |
| private [kafka] class ProducerRequestPurgatory(purgeInterval: Int) |
| extends RequestPurgatory[DelayedProduce, RequestKey](brokerId, purgeInterval) { |
| this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId) |
| |
| protected def checkSatisfied(followerFetchRequestKey: RequestKey, |
| delayedProduce: DelayedProduce) = |
| delayedProduce.isSatisfied(followerFetchRequestKey) |
| |
| /** |
| * Handle an expired delayed request |
| */ |
| protected def expire(delayedProduce: DelayedProduce) { |
| for (partitionStatus <- delayedProduce.partitionStatus if partitionStatus._2.acksPending) |
| delayedRequestMetrics.recordDelayedProducerKeyExpired(partitionStatus._1) |
| |
| delayedProduce.respond() |
| } |
| } |
| |
| private class DelayedRequestMetrics { |
| private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup { |
| val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS) |
| } |
| |
| |
| private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup { |
| private val metricPrefix = if (forFollower) "Follower" else "Consumer" |
| |
| val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS) |
| } |
| |
| private val producerRequestMetricsForKey = { |
| val valueFactory = (k: MetricKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-") |
| new Pool[MetricKey, DelayedProducerRequestMetrics](Some(valueFactory)) |
| } |
| |
| private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics |
| |
| private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true) |
| private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false) |
| |
| def recordDelayedProducerKeyExpired(key: MetricKey) { |
| val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key) |
| List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark()) |
| } |
| |
| def recordDelayedFetchExpired(forFollower: Boolean) { |
| val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics |
| else aggregateNonFollowerFetchRequestMetrics |
| |
| metrics.expiredRequestMeter.mark() |
| } |
| } |
| } |
| |