| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.api |
| |
| import java.nio.ByteBuffer |
| import kafka.common.{ErrorMapping, TopicAndPartition} |
| import kafka.api.ApiUtils._ |
| import kafka.network.{BoundedByteBufferSend, RequestChannel} |
| import kafka.network.RequestChannel.Response |
| |
| |
| object OffsetRequest { |
| val CurrentVersion = 0.shortValue |
| val DefaultClientId = "" |
| |
| val SmallestTimeString = "smallest" |
| val LargestTimeString = "largest" |
| val LatestTime = -1L |
| val EarliestTime = -2L |
| |
| def readFrom(buffer: ByteBuffer): OffsetRequest = { |
| val versionId = buffer.getShort |
| val correlationId = buffer.getInt |
| val clientId = readShortString(buffer) |
| val replicaId = buffer.getInt |
| val topicCount = buffer.getInt |
| val pairs = (1 to topicCount).flatMap(_ => { |
| val topic = readShortString(buffer) |
| val partitionCount = buffer.getInt |
| (1 to partitionCount).map(_ => { |
| val partitionId = buffer.getInt |
| val time = buffer.getLong |
| val maxNumOffsets = buffer.getInt |
| (TopicAndPartition(topic, partitionId), PartitionOffsetRequestInfo(time, maxNumOffsets)) |
| }) |
| }) |
| OffsetRequest(Map(pairs:_*), versionId= versionId, clientId = clientId, correlationId = correlationId, replicaId = replicaId) |
| } |
| } |
| |
| case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int) |
| |
| case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], |
| versionId: Short = OffsetRequest.CurrentVersion, |
| correlationId: Int = 0, |
| clientId: String = OffsetRequest.DefaultClientId, |
| replicaId: Int = Request.OrdinaryConsumerId) |
| extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) { |
| |
| def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], correlationId: Int, replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, correlationId, OffsetRequest.DefaultClientId, replicaId) |
| |
| lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) |
| |
| def writeTo(buffer: ByteBuffer) { |
| buffer.putShort(versionId) |
| buffer.putInt(correlationId) |
| writeShortString(buffer, clientId) |
| buffer.putInt(replicaId) |
| |
| buffer.putInt(requestInfoGroupedByTopic.size) // topic count |
| requestInfoGroupedByTopic.foreach { |
| case((topic, partitionInfos)) => |
| writeShortString(buffer, topic) |
| buffer.putInt(partitionInfos.size) // partition count |
| partitionInfos.foreach { |
| case (TopicAndPartition(_, partition), partitionInfo) => |
| buffer.putInt(partition) |
| buffer.putLong(partitionInfo.time) |
| buffer.putInt(partitionInfo.maxNumOffsets) |
| } |
| } |
| } |
| |
| def sizeInBytes = |
| 2 + /* versionId */ |
| 4 + /* correlationId */ |
| shortStringLength(clientId) + |
| 4 + /* replicaId */ |
| 4 + /* topic count */ |
| requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => { |
| val (topic, partitionInfos) = currTopic |
| foldedTopics + |
| shortStringLength(topic) + |
| 4 + /* partition count */ |
| partitionInfos.size * ( |
| 4 + /* partition */ |
| 8 + /* time */ |
| 4 /* maxNumOffsets */ |
| ) |
| }) |
| |
| def isFromOrdinaryClient = replicaId == Request.OrdinaryConsumerId |
| def isFromDebuggingClient = replicaId == Request.DebuggingConsumerId |
| |
| override def toString(): String = { |
| describe(true) |
| } |
| |
| override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { |
| val partitionOffsetResponseMap = requestInfo.map { |
| case (topicAndPartition, partitionOffsetRequest) => |
| (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null)) |
| } |
| val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap) |
| requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) |
| } |
| |
| override def describe(details: Boolean): String = { |
| val offsetRequest = new StringBuilder |
| offsetRequest.append("Name: " + this.getClass.getSimpleName) |
| offsetRequest.append("; Version: " + versionId) |
| offsetRequest.append("; CorrelationId: " + correlationId) |
| offsetRequest.append("; ClientId: " + clientId) |
| offsetRequest.append("; ReplicaId: " + replicaId) |
| if(details) |
| offsetRequest.append("; RequestInfo: " + requestInfo.mkString(",")) |
| offsetRequest.toString() |
| } |
| } |