blob: 15d0975e8fe2549785876d3847ec46d043c86d99 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.util;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import net.nicoulaj.compilecommand.annotations.DontInline;
import org.apache.cassandra.utils.FastByteOperations;
import org.apache.cassandra.utils.vint.VIntCoding;
import com.google.common.base.Preconditions;
/**
 * Rough equivalent of BufferedInputStream and DataInputStream wrapping a ByteBuffer that can be refilled
 * via {@link #reBuffer()}. Implementations provide this buffer from various channels (socket, file, memory, etc).
 *
 * Multi-byte primitives are read directly from the buffer when enough bytes remain; otherwise they are
 * assembled one byte at a time (see {@link #readPrimitiveSlowly(int)}), refilling the buffer as needed.
 *
 * RebufferingInputStream is not thread safe.
 */
public abstract class RebufferingInputStream extends InputStream implements DataInputPlus, Closeable
{
    /** The buffer all reads are served from; refilled (or replaced) by {@link #reBuffer()}. */
    protected ByteBuffer buffer;

    /**
     * @param buffer the initial buffer to read from; may be {@code null} (the ordering check is skipped in
     *               that case, presumably so subclasses can assign the buffer later), otherwise it must be
     *               big-endian
     * @throws IllegalArgumentException if a non-null buffer is not {@link ByteOrder#BIG_ENDIAN}
     */
    protected RebufferingInputStream(ByteBuffer buffer)
    {
        Preconditions.checkArgument(buffer == null || buffer.order() == ByteOrder.BIG_ENDIAN, "Buffer must have BIG ENDIAN byte ordering");
        this.buffer = buffer;
    }

    /**
     * Implementations must implement this method to refill the buffer.
     * They can expect the buffer to be empty when this method is invoked.
     * Callers in this class treat a buffer that still has nothing remaining after this call as end of stream.
     *
     * @throws IOException if the underlying source cannot be read
     */
    protected abstract void reBuffer() throws IOException;

    /**
     * Fills the whole of {@code b}; equivalent to {@code readFully(b, 0, b.length)}.
     *
     * @throws EOFException if the stream ends before {@code b.length} bytes were read
     * @throws IOException if rebuffering fails
     */
    @Override
    public void readFully(byte[] b) throws IOException
    {
        readFully(b, 0, b.length);
    }

    /**
     * Reads exactly {@code len} bytes into {@code b} starting at {@code off}.
     *
     * @throws EOFException if the stream ends before {@code len} bytes were read; any bytes read up to that
     *                      point have already been consumed and copied into {@code b}
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not describe a valid range of {@code b}
     * @throws IOException if rebuffering fails
     */
    @Override
    public void readFully(byte[] b, int off, int len) throws IOException
    {
        int read = read(b, off, len);
        if (read < len)
        {
            // read() signals "nothing read, stream exhausted" with -1; report that as 0 bytes rather
            // than leaking the sentinel into the message ("EOF after -1 bytes ...").
            throw new EOFException("EOF after " + Math.max(read, 0) + " bytes out of " + len);
        }
    }

    /**
     * Copies up to {@code len} bytes into {@code b} starting at {@code off}, refilling the buffer whenever it
     * runs dry. Unlike the minimal {@link InputStream} contract, this keeps going until either {@code len}
     * bytes have been copied or a refill yields no data.
     *
     * @return the number of bytes copied; {@code 0} if {@code len} is zero; {@code -1} if no byte could be
     *         copied because the stream is exhausted
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not describe a valid range of {@code b}
     * @throws IOException if rebuffering fails
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException
    {
        // avoid int overflow (compare against b.length - off instead of computing off + len)
        if (off < 0 || off > b.length || len < 0 || len > b.length - off)
            throw new IndexOutOfBoundsException();

        if (len == 0)
            return 0;

        int copied = 0;
        while (copied < len)
        {
            int position = buffer.position();
            int remaining = buffer.limit() - position;
            if (remaining == 0)
            {
                reBuffer();
                // reBuffer() may have moved the position and/or limit, so re-read both
                position = buffer.position();
                remaining = buffer.limit() - position;
                if (remaining == 0)
                    return copied == 0 ? -1 : copied;
            }
            int toCopy = Math.min(len - copied, remaining);
            FastByteOperations.copy(buffer, position, b, off + copied, toCopy);
            buffer.position(position + toCopy);
            copied += toCopy;
        }

        return copied;
    }

    /**
     * Assembles a big-endian value from {@code bytes} individual {@link #readByte()} calls. Used by the
     * primitive readers when the buffer holds fewer bytes than the primitive needs, so the value may span
     * a refill.
     *
     * @param bytes number of bytes to consume (callers in this class pass 2, 4 or 8)
     * @return the bytes read, most significant first, packed into the low-order bits of a long
     * @throws EOFException if the stream ends before {@code bytes} bytes were read
     * @throws IOException if rebuffering fails
     */
    @DontInline
    protected long readPrimitiveSlowly(int bytes) throws IOException
    {
        long result = 0;
        for (int i = 0; i < bytes; i++)
            result = (result << 8) | (readByte() & 0xFFL);
        return result;
    }

    /**
     * Skips up to {@code n} bytes, refilling the buffer as required.
     *
     * @param n number of bytes to skip; negative values skip nothing
     * @return the number of bytes actually skipped, which is less than {@code n} only if the stream ended
     * @throws IOException if rebuffering fails
     */
    @Override
    public int skipBytes(int n) throws IOException
    {
        if (n < 0)
            return 0;

        int requested = n;
        int position = buffer.position(), limit = buffer.limit(), remaining;
        while ((remaining = limit - position) < n)
        {
            // consume everything left in the current buffer, then ask for more
            n -= remaining;
            buffer.position(limit);
            reBuffer();
            position = buffer.position();
            limit = buffer.limit();
            // nothing new arrived: report how much was skipped before the stream ran out
            if (position == limit)
                return requested - n;
        }
        buffer.position(position + n);
        return requested;
    }

    /**
     * @return {@code true} if the next byte is non-zero
     * @throws EOFException if the stream is exhausted
     */
    @Override
    public boolean readBoolean() throws IOException
    {
        return readByte() != 0;
    }

    /**
     * Reads one byte, refilling the buffer first if it is empty.
     *
     * @throws EOFException if the buffer is still empty after a refill
     * @throws IOException if rebuffering fails
     */
    @Override
    public byte readByte() throws IOException
    {
        if (!buffer.hasRemaining())
        {
            reBuffer();
            if (!buffer.hasRemaining())
                throw new EOFException();
        }

        return buffer.get();
    }

    /**
     * @return the next byte as a value in the range 0-255
     * @throws EOFException if the stream is exhausted
     */
    @Override
    public int readUnsignedByte() throws IOException
    {
        return readByte() & 0xff;
    }

    /**
     * Reads a 2-byte short, directly from the buffer when at least 2 bytes remain and byte-by-byte otherwise.
     *
     * @throws EOFException if the stream ends before 2 bytes were read
     */
    @Override
    public short readShort() throws IOException
    {
        if (buffer.remaining() >= 2)
            return buffer.getShort();
        else
            return (short) readPrimitiveSlowly(2);
    }

    /**
     * @return the next 2 bytes as a value in the range 0-65535
     * @throws EOFException if the stream ends before 2 bytes were read
     */
    @Override
    public int readUnsignedShort() throws IOException
    {
        return readShort() & 0xFFFF;
    }

    /**
     * Reads a 2-byte char, directly from the buffer when at least 2 bytes remain and byte-by-byte otherwise.
     *
     * @throws EOFException if the stream ends before 2 bytes were read
     */
    @Override
    public char readChar() throws IOException
    {
        if (buffer.remaining() >= 2)
            return buffer.getChar();
        else
            return (char) readPrimitiveSlowly(2);
    }

    /**
     * Reads a 4-byte int, directly from the buffer when at least 4 bytes remain and byte-by-byte otherwise.
     *
     * @throws EOFException if the stream ends before 4 bytes were read
     */
    @Override
    public int readInt() throws IOException
    {
        if (buffer.remaining() >= 4)
            return buffer.getInt();
        else
            return (int) readPrimitiveSlowly(4);
    }

    /**
     * Reads an 8-byte long, directly from the buffer when at least 8 bytes remain and byte-by-byte otherwise.
     *
     * @throws EOFException if the stream ends before 8 bytes were read
     */
    @Override
    public long readLong() throws IOException
    {
        if (buffer.remaining() >= 8)
            return buffer.getLong();
        else
            return readPrimitiveSlowly(8);
    }

    /**
     * Reads an unsigned vint via {@link #readUnsignedVInt()} and passes it through
     * {@link VIntCoding#decodeZigZag64(long)} to obtain the signed value.
     */
    public long readVInt() throws IOException
    {
        return VIntCoding.decodeZigZag64(readUnsignedVInt());
    }

    /**
     * Reads an unsigned variable-length integer.
     *
     * When at least 9 bytes remain in the buffer (the first byte plus a full 8-byte long), the value is
     * decoded in place with a single absolute {@code getLong}; otherwise decoding is delegated to
     * {@link VIntCoding#readUnsignedVInt(java.io.DataInput)}, which reads through this stream.
     */
    public long readUnsignedVInt() throws IOException
    {
        //If 9 bytes aren't available use the slow path in VIntCoding
        if (buffer.remaining() < 9)
            return VIntCoding.readUnsignedVInt(this);

        byte firstByte = buffer.get();

        //Bail out early if this is one byte, necessary or it fails later
        if (firstByte >= 0)
            return firstByte;

        int extraBytes = VIntCoding.numberOfExtraBytesToRead(firstByte);

        int position = buffer.position();
        int extraBits = extraBytes * 8;

        // absolute read: does not advance the position, so only the bytes actually used are consumed below
        long retval = buffer.getLong(position);
        if (buffer.order() == ByteOrder.LITTLE_ENDIAN)
            retval = Long.reverseBytes(retval);
        buffer.position(position + extraBytes);

        // truncate the bytes we read in excess of those we needed
        retval >>>= 64 - extraBits;
        // remove the non-value bits from the first byte
        firstByte &= VIntCoding.firstByteValueMask(extraBytes);
        // shift the first byte up to its correct position
        retval |= (long) firstByte << extraBits;
        return retval;
    }

    /**
     * Reads a 4-byte float, directly from the buffer when at least 4 bytes remain and byte-by-byte otherwise.
     *
     * @throws EOFException if the stream ends before 4 bytes were read
     */
    @Override
    public float readFloat() throws IOException
    {
        if (buffer.remaining() >= 4)
            return buffer.getFloat();
        else
            return Float.intBitsToFloat((int)readPrimitiveSlowly(4));
    }

    /**
     * Reads an 8-byte double, directly from the buffer when at least 8 bytes remain and byte-by-byte otherwise.
     *
     * @throws EOFException if the stream ends before 8 bytes were read
     */
    @Override
    public double readDouble() throws IOException
    {
        if (buffer.remaining() >= 8)
            return buffer.getDouble();
        else
            return Double.longBitsToDouble(readPrimitiveSlowly(8));
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public String readLine() throws IOException
    {
        throw new UnsupportedOperationException();
    }

    /**
     * Reads a string by delegating to {@link DataInputStream#readUTF(java.io.DataInput)} with this stream
     * as the source.
     */
    @Override
    public String readUTF() throws IOException
    {
        return DataInputStream.readUTF(this);
    }

    /**
     * Reads a single byte following the {@link InputStream#read()} convention.
     *
     * @return the next byte as a value in the range 0-255, or {@code -1} at end of stream
     * @throws IOException if rebuffering fails for a reason other than end of stream
     */
    @Override
    public int read() throws IOException
    {
        try
        {
            return readUnsignedByte();
        }
        catch (EOFException ignored)
        {
            // Deliberately swallowed: InputStream.read() reports end of stream as -1, not as an exception.
            return -1;
        }
    }

    /**
     * Not supported.
     *
     * @throws IOException always
     */
    @Override
    public void reset() throws IOException
    {
        throw new IOException("mark/reset not supported");
    }

    /**
     * @return {@code false}; mark/reset is not supported
     */
    @Override
    public boolean markSupported()
    {
        return false;
    }
}