/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.nio.ByteBuffer
import java.lang.{Long => JLong, Short => JShort}
import java.util.{Collections, Properties}
import java.util
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse}
import kafka.cluster.Partition
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.common._
import kafka.controller.KafkaController
import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
import kafka.log._
import kafka.network._
import kafka.network.RequestChannel.{Response, Session}
import kafka.security.auth
import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe, Group, Operation, Read, Resource, Write}
import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
import org.apache.kafka.common.record.{MemoryRecords, Record}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.requests.SaslHandshakeResponse
import scala.collection._
import scala.collection.JavaConverters._
/**
* Logic to handle the various Kafka requests
*/
class KafkaApis(val requestChannel: RequestChannel,
val replicaManager: ReplicaManager,
val adminManager: AdminManager,
val coordinator: GroupCoordinator,
val controller: KafkaController,
val zkUtils: ZkUtils,
val brokerId: Int,
val config: KafkaConfig,
val metadataCache: MetadataCache,
val metrics: Metrics,
val authorizer: Option[Authorizer],
val quotas: QuotaManagers,
val clusterId: String,
time: Time) extends Logging {
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
/**
* Top-level method that handles all requests and multiplexes to the right api
*/
def handle(request: RequestChannel.Request) {
try {
trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
ApiKeys.forId(request.requestId) match {
case ApiKeys.PRODUCE => handleProducerRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
case e: Throwable =>
if (request.requestObj != null) {
request.requestObj.handleError(e, requestChannel, request)
error("Error when handling request %s".format(request.requestObj), e)
} else {
val response = request.body.getErrorResponse(e)
/* If the request doesn't have a default error response, we just close the connection.
For example, a produce request with acks set to 0 expects no response. */
if (response == null)
requestChannel.closeConnection(request.processor, request)
else
requestChannel.sendResponse(new Response(request, response))
error("Error when handling request %s".format(request.body), e)
}
} finally
request.apiLocalCompleteTimeMs = time.milliseconds
}
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
// ensureTopicExists is only for client-facing requests.
// We can't apply that check here because the controller sends this request as an advisory to all
// brokers so that they stop serving data to clients for the topic being deleted.
val correlationId = request.header.correlationId
val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]
try {
def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
// for each new leader or follower, call coordinator to handle consumer group migration.
// this callback is invoked under the replica state change lock to ensure proper order of
// leadership changes
updatedLeaders.foreach { partition =>
if (partition.topic == Topic.GroupMetadataTopicName)
coordinator.handleGroupImmigration(partition.partitionId)
}
updatedFollowers.foreach { partition =>
if (partition.topic == Topic.GroupMetadataTopicName)
coordinator.handleGroupEmigration(partition.partitionId)
}
}
val leaderAndIsrResponse =
if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
} else {
val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
}
requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
} catch {
case e: KafkaStorageException =>
fatal("Disk error during leadership change.", e)
Runtime.getRuntime.halt(1)
}
}
def handleStopReplicaRequest(request: RequestChannel.Request) {
// ensureTopicExists is only for client-facing requests.
// We can't apply that check here because the controller sends this request as an advisory to all
// brokers so that they stop serving data to clients for the topic being deleted.
val stopReplicaRequest = request.body.asInstanceOf[StopReplicaRequest]
val response =
if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
// Clear the coordinator's cache for groups that map to an offsets topic partition this broker led,
// since this broker is no longer a replica for that offsets topic partition.
// This is required to handle the following scenario: old replicas {[1,2,3], Leader = 1} are reassigned
// to new replicas {[2,3,4], Leader = 2}. Broker 1 never receives a LeaderAndIsr request to become a
// follower, so without this step its cache for groups on that offsets topic partition would never be
// cleared.
result.foreach { case (topicPartition, errorCode) =>
if (errorCode == Errors.NONE.code && stopReplicaRequest.deletePartitions() && topicPartition.topic == Topic.GroupMetadataTopicName) {
coordinator.handleGroupEmigration(topicPartition.partition)
}
}
new StopReplicaResponse(error, result.asInstanceOf[Map[TopicPartition, JShort]].asJava)
} else {
val result = stopReplicaRequest.partitions.asScala.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
}
requestChannel.sendResponse(new RequestChannel.Response(request, response))
replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
}
def handleUpdateMetadataRequest(request: RequestChannel.Request) {
val correlationId = request.header.correlationId
val updateMetadataRequest = request.body.asInstanceOf[UpdateMetadataRequest]
val updateMetadataResponse =
if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest, metadataCache)
if (deletedPartitions.nonEmpty)
coordinator.handleDeletedPartitions(deletedPartitions)
if (adminManager.hasDelayedTopicOperations) {
updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
adminManager.tryCompleteDelayedTopicOperations(topic)
}
}
new UpdateMetadataResponse(Errors.NONE.code)
} else {
new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code)
}
requestChannel.sendResponse(new Response(request, updateMetadataResponse))
}
def handleControlledShutdownRequest(request: RequestChannel.Request) {
// ensureTopicExists is only for client-facing requests.
// We can't apply that check here because the controller sends this request as an advisory to all
// brokers so that they stop serving data to clients for the topic being deleted.
val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]
authorizeClusterAction(request)
val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId)
val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
Errors.NONE.code, partitionsRemaining)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
}
/**
* Handle an offset commit request
*/
def handleOffsetCommitRequest(request: RequestChannel.Request) {
val header = request.header
val offsetCommitRequest = request.body.asInstanceOf[OffsetCommitRequest]
// reject the request if not authorized to the group
if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))) {
val errorCode = new JShort(Errors.GROUP_AUTHORIZATION_FAILED.code)
val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
(topicPartition, errorCode)
}.toMap
val response = new OffsetCommitResponse(results.asJava)
requestChannel.sendResponse(new RequestChannel.Response(request, response))
} else {
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
case (topicPartition, _) => {
val authorizedForDescribe = authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
val exists = metadataCache.contains(topicPartition.topic)
if (!authorizedForDescribe && exists)
debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning UNKNOWN_TOPIC_OR_PARTITION")
authorizedForDescribe && exists
}
}
val (authorizedTopics, unauthorizedForReadTopics) = existingAndAuthorizedForDescribeTopics.partition {
case (topicPartition, _) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic))
}
// the callback for sending an offset commit response
def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) {
val combinedCommitStatus = commitStatus.mapValues(new JShort(_)) ++
unauthorizedForReadTopics.mapValues(_ => new JShort(Errors.TOPIC_AUTHORIZATION_FAILED.code)) ++
nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))
if (isDebugEnabled)
combinedCommitStatus.foreach { case (topicPartition, errorCode) =>
if (errorCode != Errors.NONE.code) {
debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}")
}
}
val response = new OffsetCommitResponse(combinedCommitStatus.asJava)
requestChannel.sendResponse(new RequestChannel.Response(request, response))
}
if (authorizedTopics.isEmpty)
sendResponseCallback(Map.empty)
else if (header.apiVersion == 0) {
// for version 0, always store offsets in ZK
val responseInfo = authorizedTopics.map {
case (topicPartition, partitionData) =>
val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic)
try {
if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize)
(topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
else {
zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString)
(topicPartition, Errors.NONE.code)
}
} catch {
case e: Throwable => (topicPartition, Errors.forException(e).code)
}
}
sendResponseCallback(responseInfo)
} else {
// for version 1 and beyond, store offsets in the offset manager
// compute the retention time based on the request version:
// if the request is v1 or the retention time is not specified by the user, use the default retention
val offsetRetention =
if (header.apiVersion <= 1 ||
offsetCommitRequest.retentionTime == OffsetCommitRequest.DEFAULT_RETENTION_TIME)
coordinator.offsetConfig.offsetsRetentionMs
else
offsetCommitRequest.retentionTime
// commit timestamp is always set to now.
// "default" expiration timestamp is now + retention (and retention may be overridden if v2).
// the expire timestamp is computed differently for v1 and v2:
// - if v1 and no explicit commit timestamp is provided, use the default expiration timestamp
// - if v1 and an explicit commit timestamp is provided, compute the expiration from that timestamp
// - if v2, use the default expiration timestamp
val currentTimestamp = time.milliseconds
val defaultExpireTimestamp = offsetRetention + currentTimestamp
val partitionData = authorizedTopics.mapValues { partitionData =>
val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
new OffsetAndMetadata(
offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
commitTimestamp = currentTimestamp,
expireTimestamp = {
if (partitionData.timestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP)
defaultExpireTimestamp
else
offsetRetention + partitionData.timestamp
}
)
}
// call the coordinator to handle the offset commit
coordinator.handleCommitOffsets(
offsetCommitRequest.groupId,
offsetCommitRequest.memberId,
offsetCommitRequest.generationId,
partitionData,
sendResponseCallback)
}
}
}
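/**
* Returns true if the session may perform the operation on the resource. Note that `forall` on an
* empty Option is vacuously true, so if no Authorizer is configured every request is authorized.
*/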
private def authorize(session: Session, operation: Operation, resource: Resource): Boolean =
authorizer.forall(_.authorize(session, operation, resource))
/**
* Handle a produce request
*/
def handleProducerRequest(request: RequestChannel.Request) {
val produceRequest = request.body.asInstanceOf[ProduceRequest]
val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) =>
authorize(request.session, Describe, new Resource(auth.Topic, tp.topic)) && metadataCache.contains(tp.topic)
}
val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
case (tp, _) => authorize(request.session, Write, new Resource(auth.Topic, tp.topic))
}
// the callback for sending a produce response
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
val mergedResponseStatus = responseStatus ++
unauthorizedForWriteRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++
nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION))
var errorInResponse = false
mergedResponseStatus.foreach { case (topicPartition, status) =>
if (status.error != Errors.NONE) {
errorInResponse = true
debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
request.header.correlationId,
request.header.clientId,
topicPartition,
status.error.exceptionName))
}
}
def produceResponseCallback(delayTimeMs: Int) {
if (produceRequest.acks == 0) {
// no response is expected when acks == 0; however, if there is any error in handling the request,
// the server closes the connection so that the producer client knows that some error has happened
// and refreshes its metadata
if (errorInResponse) {
val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
topicPartition -> status.error.exceptionName
}.mkString(", ")
info(
s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
s"from client id ${request.header.clientId} with ack=0\n" +
s"Topic and partition to exceptions: $exceptionsSummary"
)
requestChannel.closeConnection(request.processor, request)
} else {
requestChannel.noOperation(request.processor, request)
}
} else {
val respBody = request.header.apiVersion match {
case 0 => new ProduceResponse(mergedResponseStatus.asJava)
case version@(1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
// This case shouldn't happen unless a new version of ProducerRequest is added without
// updating this part of the code to handle it properly.
case version => throw new IllegalArgumentException(s"Version `$version` of ProduceRequest is not handled. Code must be updated.")
}
requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
}
}
// When this callback is triggered, the remote API call has completed
request.apiRemoteCompleteTimeMs = time.milliseconds
quotas.produce.recordAndMaybeThrottle(
request.session.sanitizedUser,
request.header.clientId,
numBytesAppended,
produceResponseCallback)
}
if (authorizedRequestInfo.isEmpty)
sendResponseCallback(Map.empty)
else {
val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId
// call the replica manager to append messages to the replicas
replicaManager.appendRecords(
produceRequest.timeout.toLong,
produceRequest.acks,
internalTopicsAllowed,
authorizedRequestInfo,
sendResponseCallback)
// if the request is put into the purgatory, the purgatory holds a reference to it and it cannot be
// garbage collected; we clear the partition records here, in order to let GC reclaim their memory,
// since they have already been appended to the log
produceRequest.clearPartitionRecords()
}
}
/**
* Handle a fetch request
*/
def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = request.body.asInstanceOf[FetchRequest]
val versionId = request.header.apiVersion
val clientId = request.header.clientId
val (clusterAuthorizedTopics, clusterUnauthorizedTopics) =
if (fetchRequest.isFromFollower() && !authorize(request.session, ClusterAction, Resource.ClusterResource)) {
(Seq.empty, fetchRequest.fetchData.asScala.toSeq)
} else {
(fetchRequest.fetchData.asScala.toSeq, Seq.empty)
}
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = clusterAuthorizedTopics.partition {
case (tp, _) => authorize(request.session, Describe, new Resource(auth.Topic, tp.topic)) && metadataCache.contains(tp.topic)
}
val (authorizedRequestInfo, unauthorizedForReadRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
case (tp, _) => authorize(request.session, Read, new Resource(auth.Topic, tp.topic))
}
val clusterUnauthorizedPartitionData = clusterUnauthorizedTopics.map {
case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.CLUSTER_AUTHORIZATION_FAILED.code, FetchResponse.INVALID_HIGHWATERMARK, MemoryRecords.EMPTY))
}
val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map {
case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, FetchResponse.INVALID_HIGHWATERMARK, MemoryRecords.EMPTY))
}
val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map {
case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, FetchResponse.INVALID_HIGHWATERMARK, MemoryRecords.EMPTY))
}
// the callback for sending a fetch response
def sendResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) {
val convertedPartitionData = {
responsePartitionData.map { case (tp, data) =>
// We only do down-conversion when:
// 1. The message format version configured for the topic is using a magic value > 0, and
// 2. The message set contains messages whose magic value is > 0
// This keeps message format conversion to a minimum: it only occurs when the new message format is
// in use for the topic and we see an old (v0/v1) fetch request.
// Note that if the message format is later changed from a higher version back to a lower one, this
// check might break, because some messages in the new format may already have been delivered to
// pre-0.10.0.0 consumers without down-conversion.
val convertedData = if (versionId <= 1 && replicaManager.getMagic(tp).exists(_ > Record.MAGIC_VALUE_V0) &&
!data.records.hasMatchingShallowMagic(Record.MAGIC_VALUE_V0)) {
trace(s"Down converting message to V0 for fetch request from $clientId")
FetchPartitionData(data.error, data.hw, data.records.toMessageFormat(Record.MAGIC_VALUE_V0))
} else data
tp -> new FetchResponse.PartitionData(convertedData.error.code, convertedData.hw, convertedData.records)
}
}
val mergedPartitionData = convertedPartitionData ++ unauthorizedForReadPartitionData ++ nonExistingOrUnauthorizedForDescribePartitionData ++ clusterUnauthorizedPartitionData
val fetchedPartitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]()
mergedPartitionData.foreach { case (topicPartition, data) =>
if (data.errorCode != Errors.NONE.code)
debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
s"on partition $topicPartition failed due to ${Errors.forCode(data.errorCode).exceptionName}")
fetchedPartitionData.put(topicPartition, data)
// record the bytes out metrics only when the response is being sent
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesOutRate.mark(data.records.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.records.sizeInBytes)
}
val response = new FetchResponse(versionId, fetchedPartitionData, 0)
def fetchResponseCallback(delayTimeMs: Int) {
trace(s"Sending fetch response to client $clientId of " +
s"${convertedPartitionData.map { case (_, v) => v.records.sizeInBytes }.sum} bytes")
val fetchResponse = if (delayTimeMs > 0) new FetchResponse(versionId, fetchedPartitionData, delayTimeMs) else response
requestChannel.sendResponse(new RequestChannel.Response(request, fetchResponse))
}
// When this callback is triggered, the remote API call has completed
request.apiRemoteCompleteTimeMs = time.milliseconds
if (fetchRequest.isFromFollower) {
// We've already evaluated against the quota and are good to go. Just need to record it now.
val responseSize = sizeOfThrottledPartitions(versionId, fetchRequest, mergedPartitionData, quotas.leader)
quotas.leader.record(responseSize)
fetchResponseCallback(0)
} else {
quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, clientId, response.sizeOf, fetchResponseCallback)
}
}
if (authorizedRequestInfo.isEmpty)
sendResponseCallback(Seq.empty)
else {
// call the replica manager to fetch messages from the local replica
replicaManager.fetchMessages(
fetchRequest.maxWait.toLong,
fetchRequest.replicaId,
fetchRequest.minBytes,
fetchRequest.maxBytes,
versionId <= 2,
authorizedRequestInfo,
replicationQuota(fetchRequest),
sendResponseCallback)
}
}
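/**
* Computes the size of the response restricted to partitions subject to the replication quota, so
* that only throttled partitions are recorded against the leader's replication quota.
*/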
private def sizeOfThrottledPartitions(versionId: Short,
fetchRequest: FetchRequest,
mergedPartitionData: Seq[(TopicPartition, FetchResponse.PartitionData)],
quota: ReplicationQuotaManager): Int = {
val partitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]()
mergedPartitionData.foreach { case (tp, data) =>
if (quota.isThrottled(tp))
partitionData.put(tp, data)
}
FetchResponse.sizeOf(versionId, partitionData)
}
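/**
* Follower fetches are subject to the leader replication quota; consumer fetches are unbounded
* here, since client fetch throttling is applied separately via quotas.fetch.
*/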
def replicationQuota(fetchRequest: FetchRequest): ReplicaQuota =
if (fetchRequest.isFromFollower) quotas.leader else UnboundedQuota
/**
* Handle an offset request
*/
def handleOffsetRequest(request: RequestChannel.Request) {
val version = request.header.apiVersion()
val mergedResponseMap =
if (version == 0)
handleOffsetRequestV0(request)
else
handleOffsetRequestV1(request)
val response = new ListOffsetResponse(mergedResponseMap.asJava, version)
requestChannel.sendResponse(new RequestChannel.Response(request, response))
}
private def handleOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = {
val correlationId = request.header.correlationId
val clientId = request.header.clientId
val offsetRequest = request.body.asInstanceOf[ListOffsetRequest]
val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.offsetData.asScala.partition {
case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
}
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, List[JLong]().asJava)
)
val responseMap = authorizedRequestInfo.map {case (topicPartition, partitionData) =>
try {
// ensure leader exists
val localReplica = if (offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
replicaManager.getLeaderReplicaIfLocal(topicPartition)
else
replicaManager.getReplicaOrException(topicPartition)
val offsets = {
val allOffsets = fetchOffsets(replicaManager.logManager,
topicPartition,
partitionData.timestamp,
partitionData.maxNumOffsets)
if (offsetRequest.replicaId != ListOffsetRequest.CONSUMER_REPLICA_ID) {
allOffsets
} else {
val hw = localReplica.highWatermark.messageOffset
if (allOffsets.exists(_ > hw))
hw +: allOffsets.dropWhile(_ > hw)
else
allOffsets
}
}
(topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE.code, offsets.map(new JLong(_)).asJava))
} catch {
// NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special-cased since
// these errors are typically transient and there is no value in logging the entire stack trace
case e @ ( _ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException) =>
debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
correlationId, clientId, topicPartition, e.getMessage))
(topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava))
case e: Throwable =>
error("Error while responding to offset request", e)
(topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava))
}
}
responseMap ++ unauthorizedResponseStatus
}
private def handleOffsetRequestV1(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = {
val correlationId = request.header.correlationId
val clientId = request.header.clientId
val offsetRequest = request.body.asInstanceOf[ListOffsetRequest]
val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.partitionTimestamps.asScala.partition {
case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
}
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => {
new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET)
})
val responseMap = authorizedRequestInfo.map { case (topicPartition, timestamp) =>
if (offsetRequest.duplicatePartitions().contains(topicPartition)) {
debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
s"failed because the partition is duplicated in the request.")
(topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST.code,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET))
} else {
try {
val fromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID
// ensure leader exists
val localReplica = if (offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
replicaManager.getLeaderReplicaIfLocal(topicPartition)
else
replicaManager.getReplicaOrException(topicPartition)
val found = {
if (fromConsumer && timestamp == ListOffsetRequest.LATEST_TIMESTAMP)
TimestampOffset(Record.NO_TIMESTAMP, localReplica.highWatermark.messageOffset)
else {
def allowed(timestampOffset: TimestampOffset): Boolean =
!fromConsumer || timestampOffset.offset <= localReplica.highWatermark.messageOffset
fetchOffsetForTimestamp(replicaManager.logManager, topicPartition, timestamp) match {
case Some(timestampOffset) if allowed(timestampOffset) => timestampOffset
case _ => TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)
}
}
}
(topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE.code, found.timestamp, found.offset))
} catch {
// NOTE: these exceptions are special-cased since the errors are typically transient or the client
// already received a clear exception, and there is no value in logging the entire stack trace
case e @ (_ : UnknownTopicOrPartitionException |
_ : NotLeaderForPartitionException |
_ : UnsupportedForMessageFormatException) =>
debug(s"Offset request with correlation id $correlationId from client $clientId on " +
s"partition $topicPartition failed due to ${e.getMessage}")
(topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET))
case e: Throwable =>
error("Error while responding to offset request", e)
(topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET))
}
}
}
responseMap ++ unauthorizedResponseStatus
}
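/**
* Legacy (v0) offset lookup. If the log does not exist locally, LATEST/EARLIEST requests yield
* offset 0 and any other timestamp yields no offsets.
*/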
def fetchOffsets(logManager: LogManager, topicPartition: TopicPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
logManager.getLog(topicPartition) match {
case Some(log) =>
fetchOffsetsBefore(log, timestamp, maxNumOffsets)
case None =>
if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP || timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
Seq(0L)
else
Nil
}
}
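/**
* Timestamp-based lookup used by v1 list-offset requests; delegates to the log's
* fetchOffsetsByTimestamp, and reports an unknown topic or partition if the log is not hosted here.
*/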
private def fetchOffsetForTimestamp(logManager: LogManager, topicPartition: TopicPartition, timestamp: Long) : Option[TimestampOffset] = {
logManager.getLog(topicPartition) match {
case Some(log) =>
log.fetchOffsetsByTimestamp(timestamp)
case None =>
throw new UnknownTopicOrPartitionException(s"$topicPartition does not exist on the broker.")
}
}
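/**
* Builds a (baseOffset, lastModified) pair per segment, plus (logEndOffset, now) when the last
* segment is non-empty, then scans backwards for the latest entry with lastModified <= timestamp
* and returns up to maxNumOffsets offsets from there, in descending order. For example, with
* segments at base offsets 0, 100, 200 and logEndOffset 250, a LATEST_TIMESTAMP request with
* maxNumOffsets = 2 returns Seq(250, 200).
*/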
private[server] def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
// Snapshot the segment list to avoid race conditions with concurrent log activity. `toBuffer` is
// faster than most alternatives and provides constant-time access, and unlike `toArray` it is safe
// to use with concurrent collections.
val segments = log.logSegments.toBuffer
val lastSegmentHasSize = segments.last.size > 0
val offsetTimeArray =
if (lastSegmentHasSize)
new Array[(Long, Long)](segments.length + 1)
else
new Array[(Long, Long)](segments.length)
for (i <- segments.indices)
offsetTimeArray(i) = (segments(i).baseOffset, segments(i).lastModified)
if (lastSegmentHasSize)
offsetTimeArray(segments.length) = (log.logEndOffset, time.milliseconds)
var startIndex = -1
timestamp match {
case ListOffsetRequest.LATEST_TIMESTAMP =>
startIndex = offsetTimeArray.length - 1
case ListOffsetRequest.EARLIEST_TIMESTAMP =>
startIndex = 0
case _ =>
var isFound = false
debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
startIndex = offsetTimeArray.length - 1
while (startIndex >= 0 && !isFound) {
if (offsetTimeArray(startIndex)._2 <= timestamp)
isFound = true
else
startIndex -= 1
}
}
val retSize = maxNumOffsets.min(startIndex + 1)
val ret = new Array[Long](retSize)
for (j <- 0 until retSize) {
ret(j) = offsetTimeArray(startIndex)._1
startIndex -= 1
}
// ensure that the returned seq is in descending order of offsets
ret.toSeq.sortBy(-_)
}
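/**
* Auto-creates a topic via ZK and answers with LEADER_NOT_AVAILABLE, since leader election for the
* new partitions has not yet happened; a TopicExistsException is tolerated because another broker
* may have created the topic concurrently.
*/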
private def createTopic(topic: String,
numPartitions: Int,
replicationFactor: Int,
properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
try {
AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
.format(topic, numPartitions, replicationFactor))
new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
java.util.Collections.emptyList())
} catch {
case _: TopicExistsException => // let it go, possibly another broker created this topic
new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
java.util.Collections.emptyList())
case ex: Throwable => // Catch all to prevent unhandled errors
new MetadataResponse.TopicMetadata(Errors.forException(ex), topic, Topic.isInternal(topic),
java.util.Collections.emptyList())
}
}
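/**
* Creates the group metadata (offsets) topic, capping its replication factor at the number of
* currently alive brokers so that auto-creation can succeed on undersized clusters.
*/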
private def createGroupMetadataTopic(): MetadataResponse.TopicMetadata = {
val aliveBrokers = metadataCache.getAliveBrokers
val offsetsTopicReplicationFactor =
if (aliveBrokers.nonEmpty)
Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
else
config.offsetsTopicReplicationFactor.toInt
createTopic(Topic.GroupMetadataTopicName, config.offsetsTopicPartitions,
offsetsTopicReplicationFactor, coordinator.offsetsTopicConfigs)
}
private def getOrCreateGroupMetadataTopic(listenerName: ListenerName): MetadataResponse.TopicMetadata = {
val topicMetadata = metadataCache.getTopicMetadata(Set(Topic.GroupMetadataTopicName), listenerName)
topicMetadata.headOption.getOrElse(createGroupMetadataTopic())
}
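/**
* Returns metadata for the requested topics, auto-creating missing ones where permitted: the group
* metadata topic is always created on demand, other topics only if auto-creation is enabled.
*/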
private def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = {
val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints)
if (topics.isEmpty || topicResponses.size == topics.size) {
topicResponses
} else {
val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
if (topic == Topic.GroupMetadataTopicName) {
createGroupMetadataTopic()
} else if (config.autoCreateTopicsEnable) {
createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
} else {
new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false,
java.util.Collections.emptyList())
}
}
topicResponses ++ responsesForNonExistentTopics
}
}
/**
* Handle a topic metadata request
*/
def handleTopicMetadataRequest(request: RequestChannel.Request) {
val metadataRequest = request.body.asInstanceOf[MetadataRequest]
val requestVersion = request.header.apiVersion()
val topics =
// Handle old metadata request logic. Version 0 has no way to specify "no topics".
if (requestVersion == 0) {
if (metadataRequest.topics() == null || metadataRequest.topics().isEmpty)
metadataCache.getAllTopics()
else
metadataRequest.topics.asScala.toSet
} else {
if (metadataRequest.isAllTopics)
metadataCache.getAllTopics()
else
metadataRequest.topics.asScala.toSet
}
var (authorizedTopics, unauthorizedForDescribeTopics) =
topics.partition(topic => authorize(request.session, Describe, new Resource(auth.Topic, topic)))
var unauthorizedForCreateTopics = Set[String]()
if (authorizedTopics.nonEmpty) {
val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
if (config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
if (!authorize(request.session, Create, Resource.ClusterResource)) {
authorizedTopics --= nonExistingTopics
unauthorizedForCreateTopics ++= nonExistingTopics
}
}
}
val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic =>
new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Topic.isInternal(topic),
java.util.Collections.emptyList()))
// Do not disclose the existence of topics that are unauthorized for Describe; that is why we have
// not even checked whether they exist.
val unauthorizedForDescribeTopicMetadata =
// When all topics were requested, don't include topics unauthorized for Describe at all
if ((requestVersion == 0 && (metadataRequest.topics == null || metadataRequest.topics.isEmpty)) || metadataRequest.isAllTopics)
Set.empty[MetadataResponse.TopicMetadata]
else
unauthorizedForDescribeTopics.map(topic =>
new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList()))
// In version 0, we returned an error when brokers with replicas were unavailable,
// while in higher versions we simply don't include the broker in the returned broker list
val errorUnavailableEndpoints = requestVersion == 0
val topicMetadata =
if (authorizedTopics.isEmpty)
Seq.empty[MetadataResponse.TopicMetadata]
else
getTopicMetadata(authorizedTopics, request.listenerName, errorUnavailableEndpoints)
val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
val brokers = metadataCache.getAliveBrokers
trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","),
brokers.mkString(","), request.header.correlationId, request.header.clientId))
val responseBody = new MetadataResponse(
brokers.map(_.getNode(request.listenerName)).asJava,
clusterId,
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
completeTopicMetadata.asJava,
requestVersion
)
requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
}
/**
* Handle an offset fetch request
*/
def handleOffsetFetchRequest(request: RequestChannel.Request) {
val header = request.header
val offsetFetchRequest = request.body.asInstanceOf[OffsetFetchRequest]
def authorizeTopicDescribe(partition: TopicPartition) =
authorize(request.session, Describe, new Resource(auth.Topic, partition.topic))
val offsetFetchResponse =
// reject the request if not authorized to the group
if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId)))
offsetFetchRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED)
else {
if (header.apiVersion == 0) {
val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
.partition(authorizeTopicDescribe)
// version 0 reads offsets from ZK
val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
try {
if (!metadataCache.contains(topicPartition.topic))
(topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
else {
val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
payloadOpt match {
case Some(payload) =>
(topicPartition, new OffsetFetchResponse.PartitionData(
payload.toLong, OffsetFetchResponse.NO_METADATA, Errors.NONE))
case None =>
(topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
}
}
} catch {
case e: Throwable =>
(topicPartition, new OffsetFetchResponse.PartitionData(
OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
}
}.toMap
val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava, header.apiVersion)
} else {
// versions 1 and above read offsets from Kafka
if (offsetFetchRequest.isAllPartitions) {
val (error, allPartitionData) = coordinator.handleFetchOffsets(offsetFetchRequest.groupId)
if (error != Errors.NONE)
offsetFetchRequest.getErrorResponse(error)
else {
// clients are not allowed to see offsets for topics that are not authorized for Describe
val authorizedPartitionData = allPartitionData.filter { case (topicPartition, _) => authorizeTopicDescribe(topicPartition) }
new OffsetFetchResponse(Errors.NONE, authorizedPartitionData.asJava, header.apiVersion)
}
} else {
val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
.partition(authorizeTopicDescribe)
val (error, authorizedPartitionData) = coordinator.handleFetchOffsets(offsetFetchRequest.groupId,
Some(authorizedPartitions))
if (error != Errors.NONE)
offsetFetchRequest.getErrorResponse(error)
else {
val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava, header.apiVersion)
}
}
}
}
trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
requestChannel.sendResponse(new Response(request, offsetFetchResponse))
}
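/**
* Locates the coordinator for a group: the coordinator is the leader of the offsets topic partition
* that the group maps to via coordinator.partitionFor; the offsets topic is created on demand if it
* does not exist yet.
*/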
def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED.code, Node.noNode)
requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
} else {
val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
// get metadata (and create the topic if necessary)
val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.listenerName)
val responseBody = if (offsetsTopicMetadata.error != Errors.NONE) {
new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
} else {
val coordinatorEndpoint = offsetsTopicMetadata.partitionMetadata().asScala
.find(_.partition == partition)
.map(_.leader())
coordinatorEndpoint match {
case Some(endpoint) if !endpoint.isEmpty =>
new GroupCoordinatorResponse(Errors.NONE.code, endpoint)
case _ =>
new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
}
}
trace("Sending consumer metadata %s for correlation id %d to client %s."
.format(responseBody, request.header.correlationId, request.header.clientId))
requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
}
}
def handleDescribeGroupRequest(request: RequestChannel.Request) {
val describeRequest = request.body.asInstanceOf[DescribeGroupsRequest]
val groups = describeRequest.groupIds().asScala.map { groupId =>
if (!authorize(request.session, Describe, new Resource(Group, groupId))) {
groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
} else {
val (error, summary) = coordinator.handleDescribeGroup(groupId)
val members = summary.members.map { member =>
val metadata = ByteBuffer.wrap(member.metadata)
val assignment = ByteBuffer.wrap(member.assignment)
new DescribeGroupsResponse.GroupMember(member.memberId, member.clientId, member.clientHost, metadata, assignment)
}
groupId -> new DescribeGroupsResponse.GroupMetadata(error.code, summary.state, summary.protocolType,
summary.protocol, members.asJava)
}
}.toMap
val responseBody = new DescribeGroupsResponse(groups.asJava)
requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
}
def handleListGroupsRequest(request: RequestChannel.Request) {
val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)
} else {
val (error, groups) = coordinator.handleListGroups()
val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
new ListGroupsResponse(error.code, allGroups.asJava)
}
requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
}
def handleJoinGroupRequest(request: RequestChannel.Request) {
val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
// the callback for sending a join-group response
def sendResponseCallback(joinResult: JoinGroupResult) {
val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
val responseBody = new JoinGroupResponse(request.header.apiVersion, joinResult.errorCode, joinResult.generationId,
joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
trace("Sending join group response %s for correlation id %d to client %s."
.format(responseBody, request.header.correlationId, request.header.clientId))
requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
}
if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
val responseBody = new JoinGroupResponse(
request.header.apiVersion,
Errors.GROUP_AUTHORIZATION_FAILED.code,
JoinGroupResponse.UNKNOWN_GENERATION_ID,
JoinGroupResponse.UNKNOWN_PROTOCOL,
JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
Collections.emptyMap())
requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
} else {
// let the coordinator handle the join-group request
val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
(protocol.name, Utils.toArray(protocol.metadata))).toList
coordinator.handleJoinGroup(
joinGroupRequest.groupId,
joinGroupRequest.memberId,
request.header.clientId,
request.session.clientAddress.toString,
joinGroupRequest.rebalanceTimeout,
joinGroupRequest.sessionTimeout,
joinGroupRequest.protocolType,
protocols,
sendResponseCallback)
}
}
def handleSyncGroupRequest(request: RequestChannel.Request) {
val syncGroupRequest = request.body.asInstanceOf[SyncGroupRequest]
def sendResponseCallback(memberState: Array[Byte], errorCode: Short) {
val responseBody = new SyncGroupResponse(errorCode, ByteBuffer.wrap(memberState))
requestChannel.sendResponse(new Response(request, responseBody))
}
if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED.code)
} else {
coordinator.handleSyncGroup(
syncGroupRequest.groupId(),
syncGroupRequest.generationId(),
syncGroupRequest.memberId(),
syncGroupRequest.groupAssignment().asScala.mapValues(Utils.toArray),
sendResponseCallback
)
}
}
def handleHeartbeatRequest(request: RequestChannel.Request) {
val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest]
// the callback for sending a heartbeat response
def sendResponseCallback(errorCode: Short) {
val response = new HeartbeatResponse(errorCode)
trace("Sending heartbeat response %s for correlation id %d to client %s."
.format(response, request.header.correlationId, request.header.clientId))
requestChannel.sendResponse(new RequestChannel.Response(request, response))
}
if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
val heartbeatResponse = new HeartbeatResponse(Errors.GROUP_AUTHORIZATION_FAILED.code)
requestChannel.sendResponse(new Response(request, heartbeatResponse))
}
else {
// let the coordinator handle the heartbeat
coordinator.handleHeartbeat(
heartbeatRequest.groupId(),
heartbeatRequest.memberId(),
heartbeatRequest.groupGenerationId(),
sendResponseCallback)
}
}
def handleLeaveGroupRequest(request: RequestChannel.Request) {
val leaveGroupRequest = request.body.asInstanceOf[LeaveGroupRequest]
// the callback for sending a leave-group response
def sendResponseCallback(errorCode: Short) {
val response = new LeaveGroupResponse(errorCode)
trace("Sending leave group response %s for correlation id %d to client %s."
.format(response, request.header.correlationId, request.header.clientId))
requestChannel.sendResponse(new RequestChannel.Response(request, response))
}
if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
val leaveGroupResponse = new LeaveGroupResponse(Errors.GROUP_AUTHORIZATION_FAILED.code)
requestChannel.sendResponse(new Response(request, leaveGroupResponse))
} else {
// let the coordinator handle the leave-group request
coordinator.handleLeaveGroup(
leaveGroupRequest.groupId(),
leaveGroupRequest.memberId(),
sendResponseCallback)
}
}
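/**
* SASL handshake requests are normally consumed by the authenticator during connection setup, so a
* handshake request reaching this layer arrived outside the handshake phase and is answered with
* ILLEGAL_SASL_STATE.
*/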
def handleSaslHandshakeRequest(request: RequestChannel.Request) {
val response = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE.code, config.saslEnabledMechanisms)
requestChannel.sendResponse(new RequestChannel.Response(request, response))
}
def handleApiVersionsRequest(request: RequestChannel.Request) {
// Note that the broker returns its full list of supported ApiKeys and versions regardless of the
// current authentication state (e.g., before SASL authentication on a SASL listener; note that no
// Kafka protocol requests can take place on an SSL listener before the SSL handshake is finished).
// If this is considered to leak information about the broker version, a workaround is to use SSL
// with client authentication, which is performed at an earlier stage of the connection where the
// ApiVersionsRequest is not available.
val responseBody = if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
ApiVersionsResponse.API_VERSIONS_RESPONSE
else
ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION)
requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
}
def close() {
quotas.shutdown()
info("Shutdown complete.")
}
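/**
* Topic creation must be handled by the controller; brokers that are not the active controller
* answer every topic with NOT_CONTROLLER so the client can retry against the controller.
*/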
def handleCreateTopicsRequest(request: RequestChannel.Request) {
val createTopicsRequest = request.body.asInstanceOf[CreateTopicsRequest]
def sendResponseCallback(results: Map[String, CreateTopicsResponse.Error]): Unit = {
val responseBody = new CreateTopicsResponse(results.asJava, request.header.apiVersion)
trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
}
if (!controller.isActive) {
val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
(topic, new CreateTopicsResponse.Error(Errors.NOT_CONTROLLER, null))
}
sendResponseCallback(results)
} else if (!authorize(request.session, Create, Resource.ClusterResource)) {
val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
(topic, new CreateTopicsResponse.Error(Errors.CLUSTER_AUTHORIZATION_FAILED, null))
}
sendResponseCallback(results)
} else {
val (validTopics, duplicateTopics) = createTopicsRequest.topics.asScala.partition { case (topic, _) =>
!createTopicsRequest.duplicateTopics.contains(topic)
}
// Special handling to add duplicate topics to the response
def sendResponseWithDuplicatesCallback(results: Map[String, CreateTopicsResponse.Error]): Unit = {
val duplicatedTopicsResults =
if (duplicateTopics.nonEmpty) {
val errorMessage = s"Create topics request from client `${request.header.clientId}` contains multiple entries " +
s"for the following topics: ${duplicateTopics.keySet.mkString(",")}"
// From version 1 on, the error message can be returned in the response, so we only log it for version 0
if (request.header.apiVersion == 0)
warn(errorMessage)
duplicateTopics.keySet.map((_, new CreateTopicsResponse.Error(Errors.INVALID_REQUEST, errorMessage))).toMap
} else Map.empty
val completeResults = results ++ duplicatedTopicsResults
sendResponseCallback(completeResults)
}
adminManager.createTopics(
createTopicsRequest.timeout,
createTopicsRequest.validateOnly,
validTopics,
sendResponseWithDuplicatesCallback
)
}
}
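/**
* Topic deletion, like creation, is controller-only. Topics that do not exist or are unauthorized
* for Describe are reported as UNKNOWN_TOPIC_OR_PARTITION so as not to disclose their existence.
*/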
def handleDeleteTopicsRequest(request: RequestChannel.Request) {
val deleteTopicRequest = request.body.asInstanceOf[DeleteTopicsRequest]
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteTopicRequest.topics.asScala.partition { topic =>
authorize(request.session, Describe, new Resource(auth.Topic, topic)) && metadataCache.contains(topic)
}
val (authorizedTopics, unauthorizedForDeleteTopics) = existingAndAuthorizedForDescribeTopics.partition { topic =>
authorize(request.session, Delete, new Resource(auth.Topic, topic))
}
def sendResponseCallback(results: Map[String, Errors]): Unit = {
val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
val responseBody = new DeleteTopicsResponse(completeResults.asJava)
trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
}
if (!controller.isActive) {
val results = deleteTopicRequest.topics.asScala.map { topic =>
(topic, Errors.NOT_CONTROLLER)
}.toMap
sendResponseCallback(results)
} else {
// if there are no authorized topics, return immediately
if (authorizedTopics.isEmpty)
sendResponseCallback(Map())
else {
adminManager.deleteTopics(
deleteTopicRequest.timeout.toInt,
authorizedTopics,
sendResponseCallback
)
}
}
}
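/**
* Unlike `authorize`, this throws on failure; it is used for inter-broker APIs such as controlled
* shutdown, where the exception is turned into an error response by the top-level handler.
*/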
def authorizeClusterAction(request: RequestChannel.Request): Unit = {
if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
throw new ClusterAuthorizationException(s"Request $request is not authorized.")
}
}