blob: 63c0899ec46c4a8d0477a2368973bd86aaddc6e3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.nio.ByteBuffer
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.api.ApiUtils._
object OffsetResponse {
def readFrom(buffer: ByteBuffer): OffsetResponse = {
val correlationId = buffer.getInt
val numTopics = buffer.getInt
val pairs = (1 to numTopics).flatMap(_ => {
val topic = readShortString(buffer)
val numPartitions = buffer.getInt
(1 to numPartitions).map(_ => {
val partition = buffer.getInt
val error = buffer.getShort
val numOffsets = buffer.getInt
val offsets = (1 to numOffsets).map(_ => buffer.getLong)
(TopicAndPartition(topic, partition), PartitionOffsetsResponse(error, offsets))
})
})
OffsetResponse(correlationId, Map(pairs:_*))
}
}
case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) {
override def toString(): String = {
new String("error: " + ErrorMapping.exceptionFor(error).getClass.getName + " offsets: " + offsets.mkString)
}
}
case class OffsetResponse(correlationId: Int,
partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse])
extends RequestOrResponse() {
lazy val offsetsGroupedByTopic = partitionErrorAndOffsets.groupBy(_._1.topic)
def hasError = partitionErrorAndOffsets.values.exists(_.error != ErrorMapping.NoError)
val sizeInBytes = {
4 + /* correlation id */
4 + /* topic count */
offsetsGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
val (topic, errorAndOffsetsMap) = currTopic
foldedTopics +
shortStringLength(topic) +
4 + /* partition count */
errorAndOffsetsMap.foldLeft(0)((foldedPartitions, currPartition) => {
foldedPartitions +
4 + /* partition id */
2 + /* partition error */
4 + /* offset array length */
currPartition._2.offsets.size * 8 /* offset */
})
})
}
def writeTo(buffer: ByteBuffer) {
buffer.putInt(correlationId)
buffer.putInt(offsetsGroupedByTopic.size) // topic count
offsetsGroupedByTopic.foreach {
case((topic, errorAndOffsetsMap)) =>
writeShortString(buffer, topic)
buffer.putInt(errorAndOffsetsMap.size) // partition count
errorAndOffsetsMap.foreach {
case((TopicAndPartition(_, partition), errorAndOffsets)) =>
buffer.putInt(partition)
buffer.putShort(errorAndOffsets.error)
buffer.putInt(errorAndOffsets.offsets.size) // offset array length
errorAndOffsets.offsets.foreach(buffer.putLong(_))
}
}
}
override def describe(details: Boolean):String = { toString }
}