blob: 5e14987c990fe561c01dac2909f5ed21a506e038 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.nio._
import kafka.api.ApiUtils._
import kafka.network.{BoundedByteBufferSend, RequestChannel, InvalidRequestException}
import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.network.RequestChannel.Response
import kafka.utils.Logging
import collection.Set
object StopReplicaRequest extends Logging {
val CurrentVersion = 0.shortValue
val DefaultClientId = ""
val DefaultAckTimeout = 100
def readFrom(buffer: ByteBuffer): StopReplicaRequest = {
val versionId = buffer.getShort
val correlationId = buffer.getInt
val clientId = readShortString(buffer)
val controllerId = buffer.getInt
val controllerEpoch = buffer.getInt
val deletePartitions = buffer.get match {
case 1 => true
case 0 => false
case x =>
throw new InvalidRequestException("Invalid byte %d in delete partitions field. (Assuming false.)".format(x))
}
val topicPartitionPairCount = buffer.getInt
val topicPartitionPairSet = new collection.mutable.HashSet[TopicAndPartition]()
(1 to topicPartitionPairCount) foreach { _ =>
topicPartitionPairSet.add(TopicAndPartition(readShortString(buffer), buffer.getInt))
}
StopReplicaRequest(versionId, correlationId, clientId, controllerId, controllerEpoch,
deletePartitions, topicPartitionPairSet.toSet)
}
}
case class StopReplicaRequest(versionId: Short,
correlationId: Int,
clientId: String,
controllerId: Int,
controllerEpoch: Int,
deletePartitions: Boolean,
partitions: Set[TopicAndPartition])
extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
def this(deletePartitions: Boolean, partitions: Set[TopicAndPartition], controllerId: Int, controllerEpoch: Int, correlationId: Int) = {
this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId,
controllerId, controllerEpoch, deletePartitions, partitions)
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
buffer.putInt(correlationId)
writeShortString(buffer, clientId)
buffer.putInt(controllerId)
buffer.putInt(controllerEpoch)
buffer.put(if (deletePartitions) 1.toByte else 0.toByte)
buffer.putInt(partitions.size)
for (topicAndPartition <- partitions) {
writeShortString(buffer, topicAndPartition.topic)
buffer.putInt(topicAndPartition.partition)
}
}
def sizeInBytes(): Int = {
var size =
2 + /* versionId */
4 + /* correlation id */
ApiUtils.shortStringLength(clientId) +
4 + /* controller id*/
4 + /* controller epoch */
1 + /* deletePartitions */
4 /* partition count */
for (topicAndPartition <- partitions){
size += (ApiUtils.shortStringLength(topicAndPartition.topic)) +
4 /* partition id */
}
size
}
override def toString(): String = {
describe(true)
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val responseMap = partitions.map {
case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}.toMap
val errorResponse = StopReplicaResponse(correlationId, responseMap)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
}
override def describe(details: Boolean): String = {
val stopReplicaRequest = new StringBuilder
stopReplicaRequest.append("Name: " + this.getClass.getSimpleName)
stopReplicaRequest.append("; Version: " + versionId)
stopReplicaRequest.append("; CorrelationId: " + correlationId)
stopReplicaRequest.append("; ClientId: " + clientId)
stopReplicaRequest.append("; DeletePartitions: " + deletePartitions)
stopReplicaRequest.append("; ControllerId: " + controllerId)
stopReplicaRequest.append("; ControllerEpoch: " + controllerEpoch)
if(details)
stopReplicaRequest.append("; Partitions: " + partitions.mkString(","))
stopReplicaRequest.toString()
}
}