blob: 747d205690c69667a7e65ee7a53671f9f773699e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.nio.ByteBuffer
import kafka.utils.{nonthreadsafe, Utils}
import kafka.network.{Send, Request}
import java.nio.channels.GatheringByteChannel
import kafka.common.ErrorMapping
object OffsetRequest {
val SmallestTimeString = "smallest"
val LargestTimeString = "largest"
val LatestTime = -1L
val EarliestTime = -2L
def readFrom(buffer: ByteBuffer): OffsetRequest = {
val topic = Utils.readShortString(buffer, "UTF-8")
val partition = buffer.getInt()
val offset = buffer.getLong
val maxNumOffsets = buffer.getInt
new OffsetRequest(topic, partition, offset, maxNumOffsets)
}
def serializeOffsetArray(offsets: Array[Long]): ByteBuffer = {
val size = 4 + 8 * offsets.length
val buffer = ByteBuffer.allocate(size)
buffer.putInt(offsets.length)
for (i <- 0 until offsets.length)
buffer.putLong(offsets(i))
buffer.rewind
buffer
}
def deserializeOffsetArray(buffer: ByteBuffer): Array[Long] = {
val size = buffer.getInt
val offsets = new Array[Long](size)
for (i <- 0 until offsets.length)
offsets(i) = buffer.getLong
offsets
}
}
class OffsetRequest(val topic: String,
val partition: Int,
val time: Long,
val maxNumOffsets: Int) extends Request(RequestKeys.Offsets) {
def writeTo(buffer: ByteBuffer) {
Utils.writeShortString(buffer, topic, "UTF-8")
buffer.putInt(partition)
buffer.putLong(time)
buffer.putInt(maxNumOffsets)
}
def sizeInBytes(): Int = 2 + topic.length + 4 + 8 + 4
override def toString(): String= "OffsetRequest(topic:" + topic + ", part:" + partition + ", time:" + time +
", maxNumOffsets:" + maxNumOffsets + ")"
}
@nonthreadsafe
private[kafka] class OffsetArraySend(offsets: Array[Long]) extends Send {
private var size: Long = offsets.foldLeft(4)((sum, _) => sum + 8)
private val header = ByteBuffer.allocate(6)
header.putInt(size.asInstanceOf[Int] + 2)
header.putShort(ErrorMapping.NoError.asInstanceOf[Short])
header.rewind()
private val contentBuffer = OffsetRequest.serializeOffsetArray(offsets)
var complete: Boolean = false
def writeTo(channel: GatheringByteChannel): Int = {
expectIncomplete()
var written = 0
if(header.hasRemaining)
written += channel.write(header)
if(!header.hasRemaining && contentBuffer.hasRemaining)
written += channel.write(contentBuffer)
if(!contentBuffer.hasRemaining)
complete = true
written
}
}