/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server

import org.apache.log4j.Logger
import kafka.log._
import kafka.network._
import kafka.message._
import kafka.api._
import kafka.common.{MessageSizeTooLargeException, ErrorMapping}
import java.util.concurrent.atomic.AtomicLong
import kafka.utils._

/**
 * Logic to handle the various Kafka requests: parses each incoming request
 * and dispatches it to the matching produce, fetch, or offset handler.
 */
private[kafka] class KafkaRequestHandlers(val logManager: LogManager) extends Logging {
  private val requestLogger = Logger.getLogger("kafka.request.logger")
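
  /**
   * Dispatch on the request type id and return the handler function for it;
   * an unknown id is a programming error and raises IllegalStateException.
   */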
  def handlerFor(requestTypeId: Short, request: Receive): Handler.Handler = {
    requestTypeId match {
      case RequestKeys.Produce => handleProducerRequest _
      case RequestKeys.Fetch => handleFetchRequest _
      case RequestKeys.MultiFetch => handleMultiFetchRequest _
      case RequestKeys.MultiProduce => handleMultiProducerRequest _
      case RequestKeys.Offsets => handleOffsetRequest _
      case _ => throw new IllegalStateException("No mapping found for handler id " + requestTypeId)
    }
  }
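
  /**
   * Handle a single produce request: parse it from the receive buffer, append
   * its messages via the shared private handler, and log the total produce
   * time. Returns None: produce requests get no response on this code path.
   */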
  def handleProducerRequest(receive: Receive): Option[Send] = {
    val sTime = SystemTime.milliseconds
    val request = ProducerRequest.readFrom(receive.buffer)
    if (requestLogger.isTraceEnabled)
      requestLogger.trace("Producer request " + request.toString)
    handleProducerRequest(request, "ProduceRequest")
    debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms")
    None
  }
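
  /**
   * Handle a batched produce request by running each contained produce
   * through the shared private handler; like single produces, it sends
   * no response.
   */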
  def handleMultiProducerRequest(receive: Receive): Option[Send] = {
    val request = MultiProducerRequest.readFrom(receive.buffer)
    if (requestLogger.isTraceEnabled)
      requestLogger.trace("Multiproducer request " + request.toString)
    request.produces.foreach(handleProducerRequest(_, "MultiProducerRequest"))
    None
  }
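
  /**
   * Append the request's messages to the log of the target partition (chosen
   * at random when the request does not specify one) and update per-topic and
   * all-topic byte counters. An oversized message is warned about and counted
   * as a failed produce but swallowed; any other error is counted and rethrown.
   */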
  private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String) = {
    val partition = request.getTranslatedPartition(logManager.chooseRandomPartition)
    try {
      logManager.getOrCreateLog(request.topic, partition).append(request.messages)
      trace(request.messages.sizeInBytes + " bytes written to logs.")
      request.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum)))
      BrokerTopicStat.getBrokerTopicStat(request.topic).recordBytesIn(request.messages.sizeInBytes)
      BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(request.messages.sizeInBytes)
    }
    catch {
      case e: MessageSizeTooLargeException =>
        warn(e.getMessage + " on " + request.topic + ":" + partition)
        BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest
        BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
      case t =>
        error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, t)
        BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest
        BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
        throw t
    }
  }
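
  /**
   * Handle a fetch request: parse it and respond with the requested slice of
   * the log as a MessageSetSend.
   */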
  def handleFetchRequest(request: Receive): Option[Send] = {
    val fetchRequest = FetchRequest.readFrom(request.buffer)
    if (requestLogger.isTraceEnabled)
      requestLogger.trace("Fetch request " + fetchRequest.toString)
    Some(readMessageSet(fetchRequest))
  }
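
  /**
   * Handle a batched fetch request: read one message set per contained fetch
   * and bundle all of them into a single MultiMessageSetSend response.
   */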
  def handleMultiFetchRequest(request: Receive): Option[Send] = {
    val multiFetchRequest = MultiFetchRequest.readFrom(request.buffer)
    if (requestLogger.isTraceEnabled) {
      requestLogger.trace("Multifetch request")
      multiFetchRequest.fetches.foreach(req => requestLogger.trace(req.toString))
    }
    val responses = multiFetchRequest.fetches.map(fetch => readMessageSet(fetch)).toList
    Some(new MultiMessageSetSend(responses))
  }
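
  /**
   * Read the requested byte range from the log. An unknown topic or partition
   * yields an empty message set; any error is recorded in the failed-fetch
   * counters and translated into an error code returned to the client.
   */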
  private def readMessageSet(fetchRequest: FetchRequest): MessageSetSend = {
    var response: MessageSetSend = null
    try {
      trace("Fetching log segment for topic, partition, offset, maxSize = " + fetchRequest)
      val log = logManager.getLog(fetchRequest.topic, fetchRequest.partition)
      if (log != null) {
        response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize))
        BrokerTopicStat.getBrokerTopicStat(fetchRequest.topic).recordBytesOut(response.messages.sizeInBytes)
        BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(response.messages.sizeInBytes)
      }
      else
        response = new MessageSetSend()
    }
    catch {
      case e =>
        error("error when processing request " + fetchRequest, e)
        BrokerTopicStat.getBrokerTopicStat(fetchRequest.topic).recordFailedFetchRequest
        BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
        response = new MessageSetSend(MessageSet.Empty, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
    }
    response
  }
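
  /**
   * Handle an offset request: ask the log manager for the offsets matching
   * the request and return them as an offset array response.
   */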
  def handleOffsetRequest(request: Receive): Option[Send] = {
    val offsetRequest = OffsetRequest.readFrom(request.buffer)
    if (requestLogger.isTraceEnabled)
      requestLogger.trace("Offset request " + offsetRequest.toString)
    val offsets = logManager.getOffsets(offsetRequest)
    val response = new OffsetArraySend(offsets)
    Some(response)
  }
}
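
/**
 * JMX interface exposing the cumulative per-topic broker counters.
 */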
trait BrokerTopicStatMBean {
  def getMessagesIn: Long
  def getBytesIn: Long
  def getBytesOut: Long
  def getFailedProduceRequest: Long
  def getFailedFetchRequest: Long
}
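
/**
 * Thread-safe holder of cumulative counters for one topic (or the all-topics
 * aggregate); AtomicLongs allow updates from concurrent request handlers.
 */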
@threadsafe
class BrokerTopicStat extends BrokerTopicStatMBean {
  private val numCumulatedMessagesIn = new AtomicLong(0)
  private val numCumulatedBytesIn = new AtomicLong(0)
  private val numCumulatedBytesOut = new AtomicLong(0)
  private val numCumulatedFailedProduceRequests = new AtomicLong(0)
  private val numCumulatedFailedFetchRequests = new AtomicLong(0)

  def getMessagesIn: Long = numCumulatedMessagesIn.get
  def recordMessagesIn(nMessages: Int) = numCumulatedMessagesIn.getAndAdd(nMessages)

  def getBytesIn: Long = numCumulatedBytesIn.get
  def recordBytesIn(nBytes: Long) = numCumulatedBytesIn.getAndAdd(nBytes)

  def getBytesOut: Long = numCumulatedBytesOut.get
  def recordBytesOut(nBytes: Long) = numCumulatedBytesOut.getAndAdd(nBytes)

  def recordFailedProduceRequest = numCumulatedFailedProduceRequests.getAndIncrement
  def getFailedProduceRequest = numCumulatedFailedProduceRequests.get()

  def recordFailedFetchRequest = numCumulatedFailedFetchRequests.getAndIncrement
  def getFailedFetchRequest = numCumulatedFailedFetchRequests.get()
}
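
/**
 * Registry of per-topic stats. Each BrokerTopicStat is created lazily on first
 * use and registered as a JMX MBean; the all-topics aggregate is registered
 * once at startup. putIfNotExists ensures that when two threads race to create
 * the stat for the same topic, only the winning instance is registered.
 */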
object BrokerTopicStat extends Logging {
  private val stats = new Pool[String, BrokerTopicStat]
  private val allTopicStat = new BrokerTopicStat

  Utils.registerMBean(allTopicStat, "kafka:type=kafka.BrokerAllTopicStat")

  def getBrokerAllTopicStat(): BrokerTopicStat = allTopicStat

  def getBrokerTopicStat(topic: String): BrokerTopicStat = {
    var stat = stats.get(topic)
    if (stat == null) {
      stat = new BrokerTopicStat
      if (stats.putIfNotExists(topic, stat) == null)
        Utils.registerMBean(stat, "kafka:type=kafka.BrokerTopicStat." + topic)
      else
        stat = stats.get(topic)
    }
    stat
  }
}