blob: 4f77d30273427538d038b6cdbf26446c79c7e91f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.nio.ByteBuffer
import java.lang.{Long => JLong, Short => JShort}
import java.util.Properties
import kafka.admin.{RackAwareMode, AdminUtils}
import kafka.api._
import kafka.cluster.Partition
import kafka.common._
import kafka.controller.KafkaController
import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
import kafka.log._
import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
import kafka.network._
import kafka.network.RequestChannel.{Session, Response}
import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.common.errors.{InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException,
ClusterAuthorizationException}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse,
DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse,
LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse,
StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse, UpdateMetadataRequest, UpdateMetadataResponse,
MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, Node}
import org.apache.kafka.common.internals.TopicConstants
import scala.collection._
import scala.collection.JavaConverters._
/**
* Logic to handle the various Kafka requests
*/
class KafkaApis(val requestChannel: RequestChannel,
val replicaManager: ReplicaManager,
val coordinator: GroupCoordinator,
val controller: KafkaController,
val zkUtils: ZkUtils,
val brokerId: Int,
val config: KafkaConfig,
val metadataCache: MetadataCache,
val metrics: Metrics,
val authorizer: Option[Authorizer]) extends Logging {
// Log identifier carrying this broker's id (presumably prepended to log messages by the Logging trait - TODO confirm)
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
// Store all the quota managers for each type of request, keyed by API key id
// (looked up below via ApiKeys.PRODUCE.id and ApiKeys.FETCH.id)
val quotaManagers: Map[Short, ClientQuotaManager] = instantiateQuotaManagers(config)
/**
* Top-level method that handles all requests and multiplexes to the right api
*/
def handle(request: RequestChannel.Request): Unit = {
  // Dispatch on the API key of the request. Whatever goes wrong in a handler is turned into an
  // error response (or a closed connection) in the catch clause, and the local completion time
  // is recorded in every case.
  try {
    trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
      format(request.requestObj, request.connectionId, request.securityProtocol, request.session.principal))
    ApiKeys.forId(request.requestId) match {
      case ApiKeys.PRODUCE => handleProducerRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
      case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
      case unknownApi => throw new KafkaException("Unknown api code " + unknownApi)
    }
  } catch {
    case e: Throwable =>
      Option(request.requestObj) match {
        case Some(legacyRequest) =>
          // Requests that carry a requestObj build and send their own error response
          legacyRequest.handleError(e, requestChannel, request)
          error("Error when handling request %s".format(legacyRequest), e)
        case None =>
          val errorResponse = request.body.getErrorResponse(request.header.apiVersion, e)
          val errorHeader = new ResponseHeader(request.header.correlationId)
          /* If request doesn't have a default error response, we just close the connection.
             For example, when produce request has acks set to 0 */
          Option(errorResponse) match {
            case None =>
              requestChannel.closeConnection(request.processor, request)
            case Some(responseBody) =>
              requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, errorHeader, responseBody)))
          }
          error("Error when handling request %s".format(request.body), e)
      }
  } finally {
    request.apiLocalCompleteTimeMs = SystemTime.milliseconds
  }
}
/**
 * Handle a LeaderAndIsr request.
 *
 * When the session holds ClusterAction on the cluster resource, the request is applied through
 * the replica manager; otherwise every partition in the request is answered with
 * CLUSTER_AUTHORIZATION_FAILED. A KafkaStorageException raised while doing so halts the JVM.
 */
def handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit = {
  // ensureTopicExists is only for client facing requests
  // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
  // stop serving data to clients for the topic being deleted
  val correlationId = request.header.correlationId
  val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]

  // For each new leader or follower of a group metadata partition, tell the coordinator so it can
  // migrate the consumer groups. This callback is invoked under the replica state change lock to
  // ensure proper order of leadership changes.
  def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]): Unit = {
    for (leader <- updatedLeaders if leader.topic == TopicConstants.GROUP_METADATA_TOPIC_NAME)
      coordinator.handleGroupImmigration(leader.partitionId)
    for (follower <- updatedFollowers if follower.topic == TopicConstants.GROUP_METADATA_TOPIC_NAME)
      coordinator.handleGroupEmigration(follower.partitionId)
  }

  try {
    val responseHeader = new ResponseHeader(correlationId)
    val leaderAndIsrResponse =
      if (!authorize(request.session, ClusterAction, Resource.ClusterResource)) {
        val deniedPartitions = leaderAndIsrRequest.partitionStates.asScala.keys
          .map(topicPartition => (topicPartition, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code)))
          .toMap
        new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, deniedPartitions.asJava)
      } else {
        val outcome = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
        new LeaderAndIsrResponse(outcome.errorCode, outcome.responseMap.mapValues(new JShort(_)).asJava)
      }
    val send = new ResponseSend(request.connectionId, responseHeader, leaderAndIsrResponse)
    requestChannel.sendResponse(new Response(request, send))
  } catch {
    case e: KafkaStorageException =>
      fatal("Disk error during leadership change.", e)
      Runtime.getRuntime.halt(1)
  }
}
/**
 * Handle a StopReplica request.
 *
 * Authorized callers (ClusterAction on the cluster resource) have the replicas stopped through
 * the replica manager; everyone else gets CLUSTER_AUTHORIZATION_FAILED for each partition.
 * Idle fetcher threads are shut down after the response has been queued, in both cases.
 */
def handleStopReplicaRequest(request: RequestChannel.Request): Unit = {
  // ensureTopicExists is only for client facing requests
  // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
  // stop serving data to clients for the topic being deleted
  val stopReplicaRequest = request.body.asInstanceOf[StopReplicaRequest]
  val responseHeader = new ResponseHeader(request.header.correlationId)
  val response =
    if (!authorize(request.session, ClusterAction, Resource.ClusterResource)) {
      val deniedPartitions = stopReplicaRequest.partitions.asScala
        .map(topicPartition => (topicPartition, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code)))
        .toMap
      new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, deniedPartitions.asJava)
    } else {
      val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
      // NOTE(review): the per-partition codes are cast, not converted, to the boxed Java type
      new StopReplicaResponse(error, result.asInstanceOf[Map[TopicPartition, JShort]].asJava)
    }
  val send = new ResponseSend(request.connectionId, responseHeader, response)
  requestChannel.sendResponse(new RequestChannel.Response(request, send))
  replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
}
/**
 * Handle an UpdateMetadata request.
 *
 * The metadata cache is updated only when the session holds ClusterAction on the cluster
 * resource; the response carries NONE in that case and CLUSTER_AUTHORIZATION_FAILED otherwise.
 */
def handleUpdateMetadataRequest(request: RequestChannel.Request): Unit = {
  val correlationId = request.header.correlationId
  val updateMetadataRequest = request.body.asInstanceOf[UpdateMetadataRequest]
  val errorCode =
    if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
      replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest, metadataCache)
      Errors.NONE.code
    } else
      Errors.CLUSTER_AUTHORIZATION_FAILED.code
  val updateMetadataResponse = new UpdateMetadataResponse(errorCode)
  val responseHeader = new ResponseHeader(correlationId)
  val send = new ResponseSend(request.connectionId, responseHeader, updateMetadataResponse)
  requestChannel.sendResponse(new Response(request, send))
}
/**
 * Handle a controlled shutdown request: ask the controller to shut down the broker named in the
 * request and reply with the partitions that remain.
 */
def handleControlledShutdownRequest(request: RequestChannel.Request): Unit = {
  // ensureTopicExists is only for client facing requests
  // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
  // stop serving data to clients for the topic being deleted
  val shutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]
  // Defined elsewhere in this class; presumably throws when the session lacks ClusterAction - TODO confirm
  authorizeClusterAction(request)
  val remainingPartitions = controller.shutdownBroker(shutdownRequest.brokerId)
  val shutdownResponse =
    new ControlledShutdownResponse(shutdownRequest.correlationId, Errors.NONE.code, remainingPartitions)
  val send = new RequestOrResponseSend(request.connectionId, shutdownResponse)
  requestChannel.sendResponse(new Response(request, send))
}
/**
* Handle an offset commit request
*/
def handleOffsetCommitRequest(request: RequestChannel.Request) {
// Overview: the caller needs Read on the group. If it has it, the requested partitions are split into
// (a) topics missing from the metadata cache, (b) topics the caller may not Read, and (c) the rest.
// Only (c) is stored - in ZooKeeper for request version 0, through the group coordinator otherwise -
// and the response carries one error code per requested partition.
val header = request.header
val offsetCommitRequest = request.body.asInstanceOf[OffsetCommitRequest]
// reject the request if not authorized to the group
if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))) {
// every partition in the request is answered with the same group-level error
val errorCode = new JShort(Errors.GROUP_AUTHORIZATION_FAILED.code)
val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
(topicPartition, errorCode)
}.toMap
val responseHeader = new ResponseHeader(header.correlationId)
val responseBody = new OffsetCommitResponse(results.asJava)
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
} else {
// filter non-existent topics
val invalidRequestsInfo = offsetCommitRequest.offsetData.asScala.filter { case (topicPartition, _) =>
!metadataCache.contains(topicPartition.topic)
}
val filteredRequestInfo = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys
// committing an offset requires Read on the topic
val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition {
case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic))
}
// the callback for sending an offset commit response
// commitStatus holds the outcome for the authorized partitions only; the unauthorized and unknown-topic
// partitions are added here so that the response covers every requested partition
def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) {
val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code)
mergedCommitStatus.foreach { case (topicPartition, errorCode) =>
if (errorCode != Errors.NONE.code) {
debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}")
}
}
val combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))
val responseHeader = new ResponseHeader(header.correlationId)
val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava)
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
}
// nothing left to store: reply right away with the errors collected above
if (authorizedRequestInfo.isEmpty)
sendResponseCallback(Map.empty)
else if (header.apiVersion == 0) {
// for version 0 always store offsets to ZK
val responseInfo = authorizedRequestInfo.map {
case (topicPartition, partitionData) =>
val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic)
try {
if (!metadataCache.hasTopicMetadata(topicPartition.topic))
(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
else if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize)
(topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
else {
// the offset is written as a string under <consumerOffsetDir>/<partition>
zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString)
(topicPartition, Errors.NONE.code)
}
} catch {
// a failure on one partition is reported as that partition's error code and does not abort the others
case e: Throwable => (topicPartition, Errors.forException(e).code)
}
}
sendResponseCallback(responseInfo)
} else {
// for version 1 and beyond store offsets in offset manager
// compute the retention time based on the request version:
// if it is v1 or not specified by user, we can use the default retention
val offsetRetention =
if (header.apiVersion <= 1 ||
offsetCommitRequest.retentionTime == OffsetCommitRequest.DEFAULT_RETENTION_TIME)
coordinator.offsetConfig.offsetsRetentionMs
else
offsetCommitRequest.retentionTime
// commit timestamp is always set to now.
// "default" expiration timestamp is now + retention (and retention may be overridden if v2)
// expire timestamp is computed differently for v1 and v2.
// - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
// - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
// - If v2 we use the default expiration timestamp
val currentTimestamp = SystemTime.milliseconds
val defaultExpireTimestamp = offsetRetention + currentTimestamp
val partitionData = authorizedRequestInfo.mapValues { partitionData =>
// a null metadata string in the request is replaced by the NoMetadata placeholder
val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata;
new OffsetAndMetadata(
offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
commitTimestamp = currentTimestamp,
expireTimestamp = {
if (partitionData.timestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP)
defaultExpireTimestamp
else
offsetRetention + partitionData.timestamp
}
)
}
// call coordinator to handle commit offset
// the response is sent from sendResponseCallback once the coordinator reports the per-partition status
coordinator.handleCommitOffsets(
offsetCommitRequest.groupId,
offsetCommitRequest.memberId,
offsetCommitRequest.generationId,
partitionData,
sendResponseCallback)
}
}
}
/**
 * Check whether the session may perform the operation on the resource.
 *
 * @return the authorizer's verdict, or true when no authorizer is configured
 */
private def authorize(session: Session, operation: Operation, resource: Resource): Boolean =
  authorizer.forall(_.authorize(session, operation, resource))
/**
* Handle a produce request
*/
def handleProducerRequest(request: RequestChannel.Request): Unit = {
  // Partitions whose topic the session may Write are appended through the replica manager; the
  // others are answered with TOPIC_AUTHORIZATION_FAILED. The response is handed to the produce
  // quota manager, which invokes the final callback with the throttle time.
  val produceRequest = request.body.asInstanceOf[ProduceRequest]
  val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf
  val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.partitionRecords.asScala.partition {
    case (topicPartition, _) => authorize(request.session, Write, new Resource(Topic, topicPartition.topic))
  }

  // the callback for sending a produce response
  def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
    val authorizationFailures = unauthorizedRequestInfo.mapValues(_ =>
      new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp))
    val mergedResponseStatus = responseStatus ++ authorizationFailures

    mergedResponseStatus.foreach { case (topicPartition, status) =>
      if (status.errorCode != Errors.NONE.code)
        debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
          request.header.correlationId,
          request.header.clientId,
          topicPartition,
          Errors.forCode(status.errorCode).exceptionName))
    }
    val errorInResponse = mergedResponseStatus.values.exists(_.errorCode != Errors.NONE.code)

    def produceResponseCallback(delayTimeMs: Int): Unit = {
      if (produceRequest.acks != 0) {
        val respHeader = new ResponseHeader(request.header.correlationId)
        val respBody = request.header.apiVersion match {
          case 0 => new ProduceResponse(mergedResponseStatus.asJava)
          case version@ (1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
          // This case shouldn't happen unless a new version of ProducerRequest is added without
          // updating this part of the code to handle it properly.
          case version => throw new IllegalArgumentException(s"Version `$version` of ProduceRequest is not handled. Code must be updated.")
        }
        requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, respBody)))
      } else if (errorInResponse) {
        // With acks = 0 the producer expects no response, so the only way to signal a failure is to
        // close the connection; the client will then notice and refresh its metadata.
        val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
          topicPartition -> Errors.forCode(status.errorCode).exceptionName
        }.mkString(", ")
        info(
          s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
            s"from client id ${request.header.clientId} with ack=0\n" +
            s"Topic and partition to exceptions: $exceptionsSummary"
        )
        requestChannel.closeConnection(request.processor, request)
      } else {
        // acks = 0 and nothing failed: no response is sent
        requestChannel.noOperation(request.processor, request)
      }
    }

    // When this callback is triggered, the remote API call has completed
    request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
    quotaManagers(ApiKeys.PRODUCE.id).recordAndMaybeThrottle(
      request.header.clientId,
      numBytesAppended,
      produceResponseCallback)
  }

  if (authorizedRequestInfo.nonEmpty) {
    val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId
    // Convert ByteBuffer to ByteBufferMessageSet
    val authorizedMessagesPerPartition = authorizedRequestInfo.map {
      case (topicPartition, buffer) => (topicPartition, new ByteBufferMessageSet(buffer))
    }
    // call the replica manager to append messages to the replicas
    replicaManager.appendMessages(
      produceRequest.timeout.toLong,
      produceRequest.acks,
      internalTopicsAllowed,
      authorizedMessagesPerPartition,
      sendResponseCallback)
    // if the request is put into the purgatory, it will have a held reference
    // and hence cannot be garbage collected; hence we clear its data here in
    // order to let GC re-claim its memory since it is already appended to log
    produceRequest.clearPartitionRecords()
  } else {
    // nothing may be written: answer immediately with the authorization failures only
    sendResponseCallback(Map.empty)
  }
}
/**
* Handle a fetch request
*/
def handleFetchRequest(request: RequestChannel.Request): Unit = {
  // Partitions whose topic the session may Read are fetched through the replica manager; the
  // others are answered with TOPIC_AUTHORIZATION_FAILED and an empty message set.
  val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
  val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition {
    case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
  }
  val unauthorizedPartitionData = unauthorizedRequestInfo.mapValues { _ =>
    FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MessageSet.Empty)
  }

  // the callback for sending a fetch response
  def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]): Unit = {
    // We only do down-conversion when:
    // 1. The message format version configured for the topic is using magic value > 0, and
    // 2. The message set contains message whose magic > 0
    // This is to reduce the message format conversion as much as possible. The conversion will only occur
    // when new message format is used for the topic and we see an old request.
    // Please note that if the message format is changed from a higher version back to lower version this
    // test might break because some messages in new message format can be delivered to consumers before 0.10.0.0
    // without format down conversion.
    def downConvertIfNeeded(tp: TopicAndPartition, data: FetchResponsePartitionData): FetchResponsePartitionData = {
      val mustConvert = replicaManager.getMessageFormatVersion(tp).exists(_ > Message.MagicValue_V0) &&
        !data.messages.isMagicValueInAllWrapperMessages(Message.MagicValue_V0)
      if (mustConvert) {
        trace(s"Down converting message to V0 for fetch request from ${fetchRequest.clientId}")
        new FetchResponsePartitionData(data.error, data.hw, data.messages.asInstanceOf[FileMessageSet].toMessageFormat(Message.MagicValue_V0))
      } else
        data
    }

    // Need to down-convert message when consumer only takes magic value 0.
    val convertedPartitionData =
      if (fetchRequest.versionId > 1)
        responsePartitionData
      else
        responsePartitionData.map { case (tp, data) => tp -> downConvertIfNeeded(tp, data) }

    val mergedPartitionData = convertedPartitionData ++ unauthorizedPartitionData

    mergedPartitionData.foreach { case (topicAndPartition, data) =>
      if (data.error != Errors.NONE.code) {
        debug(s"Fetch request with correlation id ${fetchRequest.correlationId} from client ${fetchRequest.clientId} " +
          s"on partition $topicAndPartition failed due to ${Errors.forCode(data.error).exceptionName}")
      }
      // record the bytes out metrics only when the response is being sent
      val bytesOut = data.messages.sizeInBytes
      BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(bytesOut)
      BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes)
    }

    def fetchResponseCallback(delayTimeMs: Int): Unit = {
      trace(s"Sending fetch response to client ${fetchRequest.clientId} of " +
        s"${convertedPartitionData.values.map(_.messages.sizeInBytes).sum} bytes")
      val response = FetchResponse(fetchRequest.correlationId, mergedPartitionData, fetchRequest.versionId, delayTimeMs)
      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
    }

    // When this callback is triggered, the remote API call has completed
    request.apiRemoteCompleteTimeMs = SystemTime.milliseconds

    if (!fetchRequest.isFromFollower) {
      val responseSize = FetchResponse.responseSize(mergedPartitionData.groupBy(_._1.topic), fetchRequest.versionId)
      quotaManagers(ApiKeys.FETCH.id).recordAndMaybeThrottle(fetchRequest.clientId, responseSize, fetchResponseCallback)
    } else {
      // Do not throttle replication traffic
      fetchResponseCallback(0)
    }
  }

  if (authorizedRequestInfo.nonEmpty) {
    // call the replica manager to fetch messages from the local replica
    replicaManager.fetchMessages(
      fetchRequest.maxWait.toLong,
      fetchRequest.replicaId,
      fetchRequest.minBytes,
      authorizedRequestInfo,
      sendResponseCallback)
  } else {
    // nothing may be read: answer immediately with the authorization failures only
    sendResponseCallback(Map.empty)
  }
}
/**
* Handle an offset request
*/
def handleOffsetRequest(request: RequestChannel.Request): Unit = {
  // Partitions whose topic the session may Describe get their offsets looked up; the others are
  // answered with TOPIC_AUTHORIZATION_FAILED. A failure on one partition only affects that
  // partition's entry in the response.
  val correlationId = request.header.correlationId
  val clientId = request.header.clientId
  val offsetRequest = request.body.asInstanceOf[ListOffsetRequest]

  // Response entry carrying an error code and no offsets
  def failedPartition(errorCode: Short): ListOffsetResponse.PartitionData =
    new ListOffsetResponse.PartitionData(errorCode, List[JLong]().asJava)

  val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.offsetData.asScala.partition {
    case (topicPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
  }
  val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
    failedPartition(Errors.TOPIC_AUTHORIZATION_FAILED.code)
  )

  val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) =>
    try {
      // ensure leader exists
      val localReplica =
        if (offsetRequest.replicaId == ListOffsetRequest.DEBUGGING_REPLICA_ID)
          replicaManager.getReplicaOrException(topicPartition.topic, topicPartition.partition)
        else
          replicaManager.getLeaderReplicaIfLocal(topicPartition.topic, topicPartition.partition)
      val allOffsets = fetchOffsets(replicaManager.logManager,
        topicPartition,
        partitionData.timestamp,
        partitionData.maxNumOffsets)
      val offsets =
        if (offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID) {
          // ordinary consumers never see offsets beyond the high watermark
          val hw = localReplica.highWatermark.messageOffset
          if (allOffsets.exists(_ > hw))
            hw +: allOffsets.dropWhile(_ > hw)
          else
            allOffsets
        } else
          allOffsets
      (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE.code, offsets.map(new JLong(_)).asJava))
    } catch {
      // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages
      // are typically transient and there is no value in logging the entire stack trace for the same
      case transient @ (_: UnknownTopicOrPartitionException | _: NotLeaderForPartitionException) =>
        debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
          correlationId, clientId, topicPartition, transient.getMessage))
        (topicPartition, failedPartition(Errors.forException(transient).code))
      case e: Throwable =>
        error("Error while responding to offset request", e)
        (topicPartition, failedPartition(Errors.forException(e).code))
    }
  }

  val mergedResponseMap = responseMap ++ unauthorizedResponseStatus
  val responseHeader = new ResponseHeader(correlationId)
  val response = new ListOffsetResponse(mergedResponseMap.asJava)
  requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, response)))
}
/**
 * Look up offsets for a partition.
 *
 * @param logManager     log manager used to find the partition's log
 * @param topicPartition partition to look up
 * @param timestamp      target time, or one of the LATEST_TIMESTAMP / EARLIEST_TIMESTAMP sentinels
 * @param maxNumOffsets  upper bound on the number of offsets returned
 * @return the offsets found in the log; when the log does not exist, a single 0 for the two
 *         sentinel timestamps and nothing for any other timestamp
 */
def fetchOffsets(logManager: LogManager, topicPartition: TopicPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
  val maybeLog = logManager.getLog(TopicAndPartition(topicPartition.topic, topicPartition.partition))
  maybeLog.map(log => fetchOffsetsBefore(log, timestamp, maxNumOffsets)).getOrElse {
    val isSentinel = timestamp == ListOffsetRequest.LATEST_TIMESTAMP || timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP
    if (isSentinel) Seq(0L) else Nil
  }
}
/**
 * Collect offsets of the given log that are associated with a time not later than `timestamp`.
 *
 * The candidates are the base offset of every segment, timed by the segment's last-modified
 * time, plus - when the last segment holds data - the log end offset, timed by the current clock.
 *
 * @param log           the log to inspect
 * @param timestamp     target time, or one of the LATEST_TIMESTAMP / EARLIEST_TIMESTAMP sentinels
 * @param maxNumOffsets upper bound on the number of offsets returned
 * @return at most `maxNumOffsets` offsets, in descending order
 */
private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
  val segsArray = log.logSegments.toArray
  // Read the size of the last segment exactly once: the array is sized from this answer, so a
  // second read that disagreed (data arriving in between) would write past the end of the array.
  val lastSegmentHasData = segsArray.last.size > 0
  val offsetTimeArray =
    new Array[(Long, Long)](if (lastSegmentHasData) segsArray.length + 1 else segsArray.length)
  for (i <- segsArray.indices)
    offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified)
  if (lastSegmentHasData)
    offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds)

  // Index of the newest candidate to return, or -1 when no candidate qualifies
  val startIndex = timestamp match {
    case ListOffsetRequest.LATEST_TIMESTAMP =>
      offsetTimeArray.length - 1
    case ListOffsetRequest.EARLIEST_TIMESTAMP =>
      0
    case _ =>
      // Render the entries explicitly; concatenating the result of foreach would only print "()"
      debug("Offset time array = " + offsetTimeArray.map(o => "%d, %d".format(o._1, o._2)).mkString("[", "; ", "]"))
      offsetTimeArray.lastIndexWhere(_._2 <= timestamp)
  }

  val retSize = maxNumOffsets.min(startIndex + 1)
  val ret = new Array[Long](retSize)
  // Walk backwards from startIndex, i.e. from the newest qualifying candidate to older ones
  for (j <- 0 until retSize)
    ret(j) = offsetTimeArray(startIndex - j)._1
  // ensure that the returned seq is in descending order of offsets
  ret.toSeq.sortBy(- _)
}
/**
 * Try to create a topic and describe the outcome as topic metadata without partitions.
 *
 * @return metadata carrying LEADER_NOT_AVAILABLE when the topic was created or already existed,
 *         and INVALID_TOPIC_EXCEPTION when the topic was rejected as invalid
 */
private def createTopic(topic: String,
                        numPartitions: Int,
                        replicationFactor: Int,
                        properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
  val outcome =
    try {
      AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
      info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
        .format(topic, numPartitions, replicationFactor))
      Errors.LEADER_NOT_AVAILABLE
    } catch {
      // let it go, possibly another broker created this topic
      case _: TopicExistsException => Errors.LEADER_NOT_AVAILABLE
      case _: InvalidTopicException => Errors.INVALID_TOPIC_EXCEPTION
    }
  new MetadataResponse.TopicMetadata(outcome, topic, java.util.Collections.emptyList())
}
/**
 * Create the group metadata topic. The configured replication factor is capped at the number of
 * alive brokers, unless no alive broker is known, in which case it is used as configured.
 */
private def createGroupMetadataTopic(): MetadataResponse.TopicMetadata = {
  val aliveBrokers = metadataCache.getAliveBrokers
  val configuredFactor = config.offsetsTopicReplicationFactor.toInt
  val replicationFactor =
    if (aliveBrokers.isEmpty) configuredFactor
    else Math.min(configuredFactor, aliveBrokers.length)
  createTopic(
    TopicConstants.GROUP_METADATA_TOPIC_NAME,
    config.offsetsTopicPartitions,
    replicationFactor,
    coordinator.offsetsTopicConfigs)
}
/**
 * Return the cached metadata of the group metadata topic, creating the topic when the cache has
 * no entry for it.
 */
private def getOrCreateGroupMetadataTopic(securityProtocol: SecurityProtocol): MetadataResponse.TopicMetadata = {
  val cached = metadataCache.getTopicMetadata(Set(TopicConstants.GROUP_METADATA_TOPIC_NAME), securityProtocol)
  cached.headOption match {
    case Some(existing) => existing
    case None => createGroupMetadataTopic()
  }
}
/**
 * Fetch metadata for the given topics from the cache. Topics the cache does not know are either
 * created (the group metadata topic always, other topics only when auto creation is enabled) or
 * reported as UNKNOWN_TOPIC_OR_PARTITION.
 */
private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol): Seq[MetadataResponse.TopicMetadata] = {
  val topicResponses = metadataCache.getTopicMetadata(topics, securityProtocol)
  val someTopicsMissing = topics.nonEmpty && topicResponses.size != topics.size
  if (someTopicsMissing) {
    val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
    val responsesForNonExistentTopics = nonExistentTopics.map {
      case TopicConstants.GROUP_METADATA_TOPIC_NAME =>
        createGroupMetadataTopic()
      case topic if config.autoCreateTopicsEnable =>
        createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
      case topic =>
        new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, java.util.Collections.emptyList())
    }
    topicResponses ++ responsesForNonExistentTopics
  } else {
    topicResponses
  }
}
/**
* Handle a topic metadata request
*/
def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
  // Topics need Describe to be reported. Topics that do not exist yet and would be auto-created
  // additionally need Create on the cluster resource; without it they are reported as unauthorized.
  val metadataRequest = request.body.asInstanceOf[MetadataRequest]
  val topics = metadataRequest.topics.asScala.toSet
  var (authorizedTopics, unauthorizedTopics) =
    if (metadataRequest.topics.isEmpty) {
      // if topics is empty -> fetch all topics metadata but filter out the topic response that are not authorized
      val describable = metadataCache.getAllTopics()
        .filter(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
      (describable, mutable.Set[String]())
    } else {
      topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
    }

  if (authorizedTopics.nonEmpty) {
    val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
    val wouldAutoCreate = config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty
    // the authorizer is consulted only when a topic would actually be created
    val createDenied = wouldAutoCreate &&
      authorizer.exists(az => !az.authorize(request.session, Create, Resource.ClusterResource))
    if (createDenied) {
      authorizedTopics --= nonExistingTopics
      unauthorizedTopics ++= nonExistingTopics
    }
  }

  val unauthorizedTopicMetadata = unauthorizedTopics.map { topic =>
    new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, java.util.Collections.emptyList())
  }
  val topicMetadata =
    if (authorizedTopics.nonEmpty)
      getTopicMetadata(authorizedTopics, request.securityProtocol)
    else
      Seq.empty[MetadataResponse.TopicMetadata]

  val brokers = metadataCache.getAliveBrokers
  trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","),
    brokers.mkString(","), request.header.correlationId, request.header.clientId))

  val responseHeader = new ResponseHeader(request.header.correlationId)
  val brokerNodes = brokers.map(_.getNode(request.securityProtocol)).asJava
  val responseBody = new MetadataResponse(brokerNodes, (topicMetadata ++ unauthorizedTopicMetadata).asJava)
  requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
}
/**
* Handle an offset fetch request
*/
def handleOffsetFetchRequest(request: RequestChannel.Request) {
val header = request.header
val offsetFetchRequest = request.body.asInstanceOf[OffsetFetchRequest]
val responseHeader = new ResponseHeader(header.correlationId)
val offsetFetchResponse =
// reject the request if not authorized to the group
if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))) {
val unauthorizedGroupResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_AUTHORIZATION_FAILED.code)
val results = offsetFetchRequest.partitions.asScala.map { topicPartition => (topicPartition, unauthorizedGroupResponse)}.toMap
new OffsetFetchResponse(results.asJava)
} else {
val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.partitions.asScala.partition { topicPartition =>
authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
}
val unauthorizedTopicResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.TOPIC_AUTHORIZATION_FAILED.code)
val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unauthorizedTopicResponse)).toMap
val unknownTopicPartitionResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
if (header.apiVersion == 0) {
// version 0 reads offsets from ZK
val responseInfo = authorizedTopicPartitions.map { topicPartition =>
val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
try {
if (!metadataCache.hasTopicMetadata(topicPartition.topic))
(topicPartition, unknownTopicPartitionResponse)
else {
val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
payloadOpt match {
case Some(payload) =>
(topicPartition, new OffsetFetchResponse.PartitionData(payload.toLong, "", Errors.NONE.code))
case None =>
(topicPartition, unknownTopicPartitionResponse)
}
}
} catch {
case e: Throwable =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "",
Errors.forException(e).code))
}
}.toMap
new OffsetFetchResponse((responseInfo ++ unauthorizedStatus).asJava)
} else {
// version 1 reads offsets from Kafka;
val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap
// Note that we do not need to filter the partitions in the
// metadata cache as the topic partitions will be filtered
// in coordinator's offset manager through the offset cache
new OffsetFetchResponse((offsets ++ unauthorizedStatus).asJava)
}
}
trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, offsetFetchResponse)))
}
/*
 * Handle a group coordinator request: answer with the leader of the group metadata topic partition
 * that the requested group maps to, or with an error when the session is not authorized to Describe
 * the group or no such leader is currently known.
 */
def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
  val coordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
  val header = new ResponseHeader(request.header.correlationId)

  // puts the given response body on the request channel
  def send(body: GroupCoordinatorResponse): Unit =
    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, header, body)))

  // response used whenever no coordinator node can be named
  def coordinatorNotAvailable: GroupCoordinatorResponse =
    new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)

  if (authorize(request.session, Describe, new Resource(Group, coordinatorRequest.groupId))) {
    val groupPartition = coordinator.partitionFor(coordinatorRequest.groupId)
    // get metadata (and create the topic if necessary)
    val groupTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol)
    val body =
      if (groupTopicMetadata.error == Errors.NONE) {
        groupTopicMetadata.partitionMetadata().asScala
          .find(_.partition == groupPartition)
          .map(_.leader())
          .filter(leader => !leader.isEmpty)
          .map(leader => new GroupCoordinatorResponse(Errors.NONE.code, leader))
          .getOrElse(coordinatorNotAvailable)
      } else {
        coordinatorNotAvailable
      }
    trace("Sending consumer metadata %s for correlation id %d to client %s."
      .format(body, request.header.correlationId, request.header.clientId))
    send(body)
  } else {
    send(new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED.code, Node.noNode))
  }
}
/*
 * Handle a describe groups request. Each requested group is authorized individually (Describe on the
 * group); unauthorized groups are reported with GROUP_AUTHORIZATION_FAILED, the others with the
 * summary returned by the coordinator.
 */
def handleDescribeGroupRequest(request: RequestChannel.Request) {
  val describeRequest = request.body.asInstanceOf[DescribeGroupsRequest]
  val responseHeader = new ResponseHeader(request.header.correlationId)

  // builds the response entry for a single group id
  def describe(groupId: String): DescribeGroupsResponse.GroupMetadata =
    if (authorize(request.session, Describe, new Resource(Group, groupId))) {
      val (error, summary) = coordinator.handleDescribeGroup(groupId)
      val groupMembers = summary.members.map { member =>
        new DescribeGroupsResponse.GroupMember(member.memberId, member.clientId, member.clientHost,
          ByteBuffer.wrap(member.metadata), ByteBuffer.wrap(member.assignment))
      }
      new DescribeGroupsResponse.GroupMetadata(error.code, summary.state, summary.protocolType,
        summary.protocol, groupMembers.asJava)
    } else {
      DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
    }

  val describedGroups = describeRequest.groupIds().asScala.map(groupId => groupId -> describe(groupId)).toMap
  val responseBody = new DescribeGroupsResponse(describedGroups.asJava)
  requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
}
/*
 * Handle a list groups request. Requires Describe on the cluster resource; otherwise the response
 * carries CLUSTER_AUTHORIZATION_FAILED.
 */
def handleListGroupsRequest(request: RequestChannel.Request) {
  val header = new ResponseHeader(request.header.correlationId)
  val body =
    if (authorize(request.session, Describe, Resource.ClusterResource)) {
      val (error, groups) = coordinator.handleListGroups()
      // translate the coordinator's group overviews into response entries
      val listedGroups = groups.map(overview => new ListGroupsResponse.Group(overview.groupId, overview.protocolType))
      new ListGroupsResponse(error.code, listedGroups.asJava)
    } else {
      ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)
    }
  requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, header, body)))
}
/*
 * Handle a join group request. The session must be authorized to Read the group; otherwise a
 * GROUP_AUTHORIZATION_FAILED response is sent immediately. Authorized requests are handed to the
 * coordinator, which answers through sendResponseCallback.
 *
 * Java <-> Scala collection conversions are spelled out with asJava / asScala instead of relying on
 * the deprecated implicit JavaConversions, so every conversion is visible at its call site.
 */
def handleJoinGroupRequest(request: RequestChannel.Request) {
  val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
  val responseHeader = new ResponseHeader(request.header.correlationId)
  // the callback for sending a join-group response
  def sendResponseCallback(joinResult: JoinGroupResult) {
    // the response carries member metadata as ByteBuffers
    val members = joinResult.members.map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
    val responseBody = new JoinGroupResponse(joinResult.errorCode, joinResult.generationId, joinResult.subProtocol,
      joinResult.memberId, joinResult.leaderId, members.asJava)
    trace("Sending join group response %s for correlation id %d to client %s."
      .format(responseBody, request.header.correlationId, request.header.clientId))
    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
  }
  if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
    val responseBody = new JoinGroupResponse(
      Errors.GROUP_AUTHORIZATION_FAILED.code,
      JoinGroupResponse.UNKNOWN_GENERATION_ID,
      JoinGroupResponse.UNKNOWN_PROTOCOL,
      JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
      JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
      Map.empty[String, ByteBuffer].asJava)
    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
  } else {
    // let the coordinator to handle join-group
    val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
      (protocol.name, Utils.toArray(protocol.metadata))).toList
    coordinator.handleJoinGroup(
      joinGroupRequest.groupId,
      joinGroupRequest.memberId,
      request.header.clientId,
      request.session.clientAddress.toString,
      joinGroupRequest.sessionTimeout,
      joinGroupRequest.protocolType,
      protocols,
      sendResponseCallback)
  }
}
/*
 * Handle a sync group request. The session must be authorized to Read the group; otherwise the
 * response carries GROUP_AUTHORIZATION_FAILED and an empty member state. Authorized requests are
 * handed to the coordinator, which answers through sendResponseCallback.
 */
def handleSyncGroupRequest(request: RequestChannel.Request) {
  val syncGroupRequest = request.body.asInstanceOf[SyncGroupRequest]
  // the callback for sending a sync-group response
  def sendResponseCallback(memberState: Array[Byte], errorCode: Short) {
    val responseBody = new SyncGroupResponse(errorCode, ByteBuffer.wrap(memberState))
    val responseHeader = new ResponseHeader(request.header.correlationId)
    requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
  }
  if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
    sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED.code)
  } else {
    // Convert explicitly (no implicit JavaConversions) and copy each assignment into a byte array
    // once, up front. mapValues would only produce a view that converts the buffer again on every
    // lookup and keeps referencing the request's buffers.
    val groupAssignment = syncGroupRequest.groupAssignment().asScala.map { case (memberId, assignment) =>
      memberId -> Utils.toArray(assignment)
    }.toMap
    coordinator.handleSyncGroup(
      syncGroupRequest.groupId(),
      syncGroupRequest.generationId(),
      syncGroupRequest.memberId(),
      groupAssignment,
      sendResponseCallback
    )
  }
}
/*
 * Handle a heartbeat request. The session must be authorized to Read the group; otherwise a
 * GROUP_AUTHORIZATION_FAILED response is sent directly. Authorized heartbeats are handed to the
 * coordinator, which answers through the local callback.
 */
def handleHeartbeatRequest(request: RequestChannel.Request) {
  val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest]
  val header = new ResponseHeader(request.header.correlationId)

  // invoked by the coordinator with the outcome of the heartbeat
  def onHeartbeatHandled(errorCode: Short): Unit = {
    val heartbeatResponse = new HeartbeatResponse(errorCode)
    trace("Sending heartbeat response %s for correlation id %d to client %s."
      .format(heartbeatResponse, request.header.correlationId, request.header.clientId))
    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, header, heartbeatResponse)))
  }

  val groupResource = new Resource(Group, heartbeatRequest.groupId)
  if (authorize(request.session, Read, groupResource)) {
    // let the coordinator to handle heartbeat
    coordinator.handleHeartbeat(
      heartbeatRequest.groupId(),
      heartbeatRequest.memberId(),
      heartbeatRequest.groupGenerationId(),
      onHeartbeatHandled)
  } else {
    // the rejection is sent directly, without the trace logging done by the callback
    val rejection = new HeartbeatResponse(Errors.GROUP_AUTHORIZATION_FAILED.code)
    requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, header, rejection)))
  }
}
/*
 * Creates one quota manager for PRODUCE and one for FETCH and returns them in a Map keyed by the
 * id of the corresponding request Api key.
 */
private def instantiateQuotaManagers(cfg: KafkaConfig): Map[Short, ClientQuotaManager] = {
  // the two configurations differ only in their default bytes-per-second quota
  val produceCfg = ClientQuotaManagerConfig(
    quotaBytesPerSecondDefault = cfg.producerQuotaBytesPerSecondDefault,
    numQuotaSamples = cfg.numQuotaSamples,
    quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
  )
  val fetchCfg = ClientQuotaManagerConfig(
    quotaBytesPerSecondDefault = cfg.consumerQuotaBytesPerSecondDefault,
    numQuotaSamples = cfg.numQuotaSamples,
    quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
  )

  // builds the map entry for one api: its id paired with a manager named after the api
  def entryFor(api: ApiKeys, managerCfg: ClientQuotaManagerConfig): (Short, ClientQuotaManager) =
    api.id -> new ClientQuotaManager(managerCfg, metrics, api.name, new org.apache.kafka.common.utils.SystemTime)

  Map[Short, ClientQuotaManager](
    entryFor(ApiKeys.PRODUCE, produceCfg),
    entryFor(ApiKeys.FETCH, fetchCfg)
  )
}
/*
 * Handle a leave group request. The session must be authorized to Read the group; otherwise a
 * GROUP_AUTHORIZATION_FAILED response is sent directly. Authorized requests are handed to the
 * coordinator, which answers through the local callback.
 */
def handleLeaveGroupRequest(request: RequestChannel.Request) {
  val leaveGroupRequest = request.body.asInstanceOf[LeaveGroupRequest]
  val header = new ResponseHeader(request.header.correlationId)

  // invoked by the coordinator with the outcome of the leave-group
  def onLeaveGroupHandled(errorCode: Short): Unit = {
    val leaveGroupResponse = new LeaveGroupResponse(errorCode)
    trace("Sending leave group response %s for correlation id %d to client %s."
      .format(leaveGroupResponse, request.header.correlationId, request.header.clientId))
    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, header, leaveGroupResponse)))
  }

  val groupResource = new Resource(Group, leaveGroupRequest.groupId)
  if (authorize(request.session, Read, groupResource)) {
    // let the coordinator to handle leave-group
    coordinator.handleLeaveGroup(
      leaveGroupRequest.groupId(),
      leaveGroupRequest.memberId(),
      onLeaveGroupHandled)
  } else {
    // the rejection is sent directly, without the trace logging done by the callback
    val rejection = new LeaveGroupResponse(Errors.GROUP_AUTHORIZATION_FAILED.code)
    requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, header, rejection)))
  }
}
/*
 * Shuts down every quota manager and then logs that shutdown is complete.
 */
def close() {
  // the api key of each entry is not needed here, only the managers themselves
  quotaManagers.values.foreach(manager => manager.shutdown())
  info("Shutdown complete.")
}
/*
 * Verifies that the request's session may perform ClusterAction on the cluster resource.
 * Throws ClusterAuthorizationException when it may not.
 */
def authorizeClusterAction(request: RequestChannel.Request): Unit = {
  val permitted = authorize(request.session, ClusterAction, Resource.ClusterResource)
  if (!permitted) {
    throw new ClusterAuthorizationException(s"Request $request is not authorized.")
  }
}
}