blob: abe67a574ecf689aedc037be85a1ad8b015d3b5c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.nio.ByteBuffer
import kafka.utils.Logging
import kafka.common.{ErrorMapping, TopicAndPartition}
object OffsetCommitResponse extends Logging {
val CurrentVersion: Short = 1
def readFrom(buffer: ByteBuffer): OffsetCommitResponse = {
val correlationId = buffer.getInt
val topicCount = buffer.getInt
val pairs = (1 to topicCount).flatMap(_ => {
val topic = ApiUtils.readShortString(buffer)
val partitionCount = buffer.getInt
(1 to partitionCount).map(_ => {
val partitionId = buffer.getInt
val error = buffer.getShort
(TopicAndPartition(topic, partitionId), error)
})
})
OffsetCommitResponse(Map(pairs:_*), correlationId)
}
}
/**
* Single constructor for both version 0 and 1 since they have the same format.
*/
case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
correlationId: Int = 0)
extends RequestOrResponse() {
lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)
def hasError = commitStatus.exists{ case (topicAndPartition, errorCode) => errorCode != ErrorMapping.NoError }
def writeTo(buffer: ByteBuffer) {
buffer.putInt(correlationId)
buffer.putInt(commitStatusGroupedByTopic.size)
commitStatusGroupedByTopic.foreach { case(topic, statusMap) =>
ApiUtils.writeShortString(buffer, topic)
buffer.putInt(statusMap.size) // partition count
statusMap.foreach { case(topicAndPartition, errorCode) =>
buffer.putInt(topicAndPartition.partition)
buffer.putShort(errorCode)
}
}
}
override def sizeInBytes =
4 + /* correlationId */
4 + /* topic count */
commitStatusGroupedByTopic.foldLeft(0)((count, partitionStatusMap) => {
val (topic, partitionStatus) = partitionStatusMap
count +
ApiUtils.shortStringLength(topic) +
4 + /* partition count */
partitionStatus.size * ( 4 /* partition */ + 2 /* error code */)
})
override def describe(details: Boolean):String = { toString }
}