// Source blob: 3d483bc7518ad76f9548772522751afb4d046b78
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.nio.ByteBuffer
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.api.ApiUtils._
import kafka.network.{BoundedByteBufferSend, RequestChannel}
import kafka.network.RequestChannel.Response
/**
 * Companion of [[OffsetRequest]]: default field values, named constants and
 * the binary decoder.
 */
object OffsetRequest {
  /** Version id used when a request does not specify one. */
  val CurrentVersion: Short = 0

  /** Client id used when a request does not specify one. */
  val DefaultClientId: String = ""

  // String spellings and numeric values for the two named times.
  // NOTE(review): presumably the numeric values are what callers put into
  // PartitionOffsetRequestInfo.time -- confirm against the users of these constants.
  val SmallestTimeString: String = "smallest"
  val LargestTimeString: String = "largest"
  val LatestTime: Long = -1L
  val EarliestTime: Long = -2L

  /**
   * Decodes an [[OffsetRequest]] from `buffer`, consuming fields in exactly
   * this order: version id (short), correlation id (int), client id (short
   * string), replica id (int), topic count (int); then for every topic its
   * name (short string) and partition count (int); then for every partition
   * its id (int), time (long) and max number of offsets (int).
   *
   * @param buffer buffer positioned at the version id field; its position is
   *               advanced past everything that is read
   * @return the decoded request
   */
  def readFrom(buffer: ByteBuffer): OffsetRequest = {
    // Fixed header fields.
    val version = buffer.getShort
    val correlation = buffer.getInt
    val client = readShortString(buffer)
    val replica = buffer.getInt

    // The reads below are order-sensitive: each topic header must be followed
    // immediately by that topic's partition entries, hence the nested
    // flatMap/map instead of a flattened comprehension.
    val numTopics = buffer.getInt
    val entries = (1 to numTopics).flatMap { _ =>
      val topicName = readShortString(buffer)
      val numPartitions = buffer.getInt
      (1 to numPartitions).map { _ =>
        val partition = buffer.getInt
        val timestamp = buffer.getLong
        val offsetLimit = buffer.getInt
        TopicAndPartition(topicName, partition) -> PartitionOffsetRequestInfo(timestamp, offsetLimit)
      }
    }

    OffsetRequest(
      requestInfo = entries.toMap,
      versionId = version,
      correlationId = correlation,
      clientId = client,
      replicaId = replica)
  }
}
/**
 * Per-partition payload of an offset request.
 *
 * @param time          64-bit time value sent for the partition
 * @param maxNumOffsets 32-bit count sent for the partition
 *                      (NOTE(review): presumably an upper bound on the number
 *                      of offsets returned -- confirm against the handler)
 */
case class PartitionOffsetRequestInfo(
  time: Long,
  maxNumOffsets: Int
)
/**
 * A request carrying one [[PartitionOffsetRequestInfo]] per topic partition.
 *
 * Serialized layout produced by `writeTo` (and accounted for by `sizeInBytes`):
 * version id (short), correlation id (int), client id (short string),
 * replica id (int), topic count (int); then per topic its name (short string)
 * and partition count (int); then per partition its id (int), time (long) and
 * max number of offsets (int).
 *
 * @param requestInfo   per-partition request payload
 * @param versionId     version id written at the start of the request
 * @param correlationId id copied into the error response built by `handleError`
 * @param clientId      client id string written after the correlation id
 * @param replicaId     id compared against `Request.OrdinaryConsumerId` and
 *                      `Request.DebuggingConsumerId` to classify the sender
 */
case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo],
                         versionId: Short = OffsetRequest.CurrentVersion,
                         correlationId: Int = 0,
                         clientId: String = OffsetRequest.DefaultClientId,
                         replicaId: Int = Request.OrdinaryConsumerId)
    extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {

  /** Convenience constructor that uses the default version id and client id. */
  def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], correlationId: Int, replicaId: Int) =
    this(requestInfo, OffsetRequest.CurrentVersion, correlationId, OffsetRequest.DefaultClientId, replicaId)

  /**
   * `requestInfo` regrouped by topic name. Both `writeTo` and `sizeInBytes`
   * iterate this same value, so the bytes written always match the computed size.
   */
  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)

  /**
   * Serializes this request into `buffer` using the layout described on the class.
   *
   * @param buffer destination buffer; written at its current position
   */
  def writeTo(buffer: ByteBuffer): Unit = {
    buffer.putShort(versionId)
    buffer.putInt(correlationId)
    writeShortString(buffer, clientId)
    buffer.putInt(replicaId)
    buffer.putInt(requestInfoGroupedByTopic.size) // topic count
    requestInfoGroupedByTopic.foreach {
      case (topic, partitionInfos) =>
        writeShortString(buffer, topic)
        buffer.putInt(partitionInfos.size) // partition count
        partitionInfos.foreach {
          case (TopicAndPartition(_, partition), partitionInfo) =>
            buffer.putInt(partition)
            buffer.putLong(partitionInfo.time)
            buffer.putInt(partitionInfo.maxNumOffsets)
        }
    }
  }

  /** Number of bytes `writeTo` emits for this request. */
  def sizeInBytes: Int =
    2 + /* versionId */
    4 + /* correlationId */
    shortStringLength(clientId) +
    4 + /* replicaId */
    4 + /* topic count */
    requestInfoGroupedByTopic.foldLeft(0) { case (accumulated, (topic, partitionInfos)) =>
      accumulated +
      shortStringLength(topic) +
      4 + /* partition count */
      partitionInfos.size * (
        4 + /* partition */
        8 + /* time */
        4 /* maxNumOffsets */
      )
    }

  /** True when `replicaId` equals `Request.OrdinaryConsumerId`. */
  def isFromOrdinaryClient: Boolean = replicaId == Request.OrdinaryConsumerId

  /** True when `replicaId` equals `Request.DebuggingConsumerId`. */
  def isFromDebuggingClient: Boolean = replicaId == Request.DebuggingConsumerId

  override def toString(): String = describe(details = true)

  /**
   * Answers the request with an error response: every requested partition is
   * mapped to the error code derived from the class of `e`, with no offsets.
   *
   * @param e              the failure being reported
   * @param requestChannel channel the response is sent on
   * @param request        the channel-level request being answered
   */
  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
    // The code depends only on the exception class, so compute it once.
    val errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
    val partitionOffsetResponseMap = requestInfo.map {
      case (topicAndPartition, _) =>
        // An empty list replaces the former `null` so that code which sizes or
        // iterates the offsets while serializing the response cannot hit a
        // NullPointerException.
        // NOTE(review): assumes the offsets field of PartitionOffsetsResponse
        // is a Seq -- confirm against its declaration.
        (topicAndPartition, PartitionOffsetsResponse(errorCode, Nil))
    }
    val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
  }

  /**
   * Builds a one-line, human-readable summary of this request.
   *
   * @param details when true, the full `requestInfo` map is appended
   * @return the summary string
   */
  override def describe(details: Boolean): String = {
    val offsetRequest = new StringBuilder
    offsetRequest.append("Name: " + this.getClass.getSimpleName)
    offsetRequest.append("; Version: " + versionId)
    offsetRequest.append("; CorrelationId: " + correlationId)
    offsetRequest.append("; ClientId: " + clientId)
    offsetRequest.append("; ReplicaId: " + replicaId)
    if (details)
      offsetRequest.append("; RequestInfo: " + requestInfo.mkString(","))
    offsetRequest.toString()
  }
}