blob: 751269b20c83c522cfccfa16310889ebdba7ce28 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.util;
import java.io.*;
import java.nio.ByteBuffer;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.utils.ByteBufferUtil;
public class RandomAccessReader extends AbstractDataInput implements FileDataInput
{
// default buffer size, 64Kb
public static final int DEFAULT_BUFFER_SIZE = 65536;
public static final int BUFFER_SIZE = Integer.getInteger("cassandra.rar_buffer_size", DEFAULT_BUFFER_SIZE);
// buffer which will cache file blocks
protected ByteBuffer buffer;
// `bufferOffset` is the offset of the beginning of the buffer
// `markedPointer` folds the offset of the last file mark
protected long bufferOffset, markedPointer;
protected final ChannelProxy channel;
// this can be overridden at construction to a value shorter than the true length of the file;
// if so, it acts as an imposed limit on reads, rather than a convenience property
private final long fileLength;
protected final PoolingSegmentedFile owner;
protected RandomAccessReader(ChannelProxy channel, int bufferSize, long overrideLength, BufferType bufferType, PoolingSegmentedFile owner)
{
this.channel = channel.sharedCopy();
this.owner = owner;
// allocating required size of the buffer
if (bufferSize <= 0)
throw new IllegalArgumentException("bufferSize must be positive");
// we can cache file length in read-only mode
fileLength = overrideLength <= 0 ? channel.size() : overrideLength;
buffer = allocateBuffer(bufferSize, bufferType);
buffer.limit(0);
}
protected ByteBuffer allocateBuffer(int bufferSize, BufferType bufferType)
{
int size = (int) Math.min(fileLength, bufferSize);
return bufferType.allocate(size);
}
public static RandomAccessReader open(ChannelProxy channel, long overrideSize, PoolingSegmentedFile owner)
{
return open(channel, BUFFER_SIZE, overrideSize, owner);
}
public static RandomAccessReader open(File file)
{
try (ChannelProxy channel = new ChannelProxy(file))
{
return open(channel);
}
}
public static RandomAccessReader open(ChannelProxy channel)
{
return open(channel, -1L);
}
public static RandomAccessReader open(ChannelProxy channel, long overrideSize)
{
return open(channel, BUFFER_SIZE, overrideSize, null);
}
@VisibleForTesting
static RandomAccessReader open(ChannelProxy channel, int bufferSize, PoolingSegmentedFile owner)
{
return open(channel, bufferSize, -1L, owner);
}
private static RandomAccessReader open(ChannelProxy channel, int bufferSize, long overrideSize, PoolingSegmentedFile owner)
{
return new RandomAccessReader(channel, bufferSize, overrideSize, BufferType.ON_HEAP, owner);
}
@VisibleForTesting
static RandomAccessReader open(SequentialWriter writer)
{
try (ChannelProxy channel = new ChannelProxy(writer.getPath()))
{
return open(channel, BUFFER_SIZE, null);
}
}
public ChannelProxy getChannel()
{
return channel;
}
/**
* Read data from file starting from current currentOffset to populate buffer.
*/
protected void reBuffer()
{
bufferOffset += buffer.position();
buffer.clear();
assert bufferOffset < fileLength;
long position = bufferOffset;
long limit = bufferOffset;
while (buffer.hasRemaining() && limit < fileLength)
{
int n = channel.read(buffer, position);
if (n < 0)
break;
position += n;
limit = bufferOffset + buffer.position();
}
if (limit > fileLength)
buffer.position((int)(fileLength - bufferOffset));
buffer.flip();
}
@Override
public long getFilePointer()
{
return current();
}
protected long current()
{
return bufferOffset + (buffer == null ? 0 : buffer.position());
}
public String getPath()
{
return channel.filePath();
}
public int getTotalBufferSize()
{
//This may NPE so we make a ref
//https://issues.apache.org/jira/browse/CASSANDRA-7756
ByteBuffer ref = buffer;
return ref != null ? ref.capacity() : 0;
}
public void reset()
{
seek(markedPointer);
}
public long bytesPastMark()
{
long bytes = current() - markedPointer;
assert bytes >= 0;
return bytes;
}
public FileMark mark()
{
markedPointer = current();
return new BufferedRandomAccessFileMark(markedPointer);
}
public void reset(FileMark mark)
{
assert mark instanceof BufferedRandomAccessFileMark;
seek(((BufferedRandomAccessFileMark) mark).pointer);
}
public long bytesPastMark(FileMark mark)
{
assert mark instanceof BufferedRandomAccessFileMark;
long bytes = current() - ((BufferedRandomAccessFileMark) mark).pointer;
assert bytes >= 0;
return bytes;
}
/**
* @return true if there is no more data to read
*/
public boolean isEOF()
{
return getFilePointer() == length();
}
public long bytesRemaining()
{
return length() - getFilePointer();
}
@Override
public void close()
{
if (owner == null || buffer == null)
{
// The buffer == null check is so that if the pool owner has deallocated us, calling close()
// will re-call deallocate rather than recycling a deallocated object.
// I'd be more comfortable if deallocate didn't have to handle being idempotent like that,
// but RandomAccessFile.close will call AbstractInterruptibleChannel.close which will
// re-call RAF.close -- in this case, [C]RAR.close since we are overriding that.
deallocate();
}
else
{
owner.recycle(this);
}
}
public void deallocate()
{
//make idempotent
if (buffer == null)
return;
bufferOffset += buffer.position();
FileUtils.clean(buffer);
buffer = null; // makes sure we don't use this after it's ostensibly closed
channel.close();
}
@Override
public String toString()
{
return getClass().getSimpleName() + "(" + "filePath='" + channel + "')";
}
/**
* Class to hold a mark to the position of the file
*/
protected static class BufferedRandomAccessFileMark implements FileMark
{
final long pointer;
public BufferedRandomAccessFileMark(long pointer)
{
this.pointer = pointer;
}
}
@Override
public void seek(long newPosition)
{
if (newPosition < 0)
throw new IllegalArgumentException("new position should not be negative");
if (newPosition >= length()) // it is save to call length() in read-only mode
{
if (newPosition > length())
throw new IllegalArgumentException(String.format("Unable to seek to position %d in %s (%d bytes) in read-only mode",
newPosition, getPath(), length()));
buffer.limit(0);
bufferOffset = newPosition;
return;
}
if (newPosition >= bufferOffset && newPosition < bufferOffset + buffer.limit())
{
buffer.position((int) (newPosition - bufferOffset));
return;
}
// Set current location to newPosition and clear buffer so reBuffer calculates from newPosition
bufferOffset = newPosition;
buffer.clear();
reBuffer();
assert current() == newPosition;
}
// -1 will be returned if there is nothing to read; higher-level methods like readInt
// or readFully (from RandomAccessFile) will throw EOFException but this should not
public int read()
{
if (buffer == null)
throw new AssertionError("Attempted to read from closed RAR");
if (isEOF())
return -1; // required by RandomAccessFile
if (!buffer.hasRemaining())
reBuffer();
return (int)buffer.get() & 0xff;
}
@Override
public int read(byte[] buffer)
{
return read(buffer, 0, buffer.length);
}
@Override
// -1 will be returned if there is nothing to read; higher-level methods like readInt
// or readFully (from RandomAccessFile) will throw EOFException but this should not
public int read(byte[] buff, int offset, int length)
{
if (buffer == null)
throw new AssertionError("Attempted to read from closed RAR");
if (length == 0)
return 0;
if (isEOF())
return -1;
if (!buffer.hasRemaining())
reBuffer();
int toCopy = Math.min(length, buffer.remaining());
buffer.get(buff, offset, toCopy);
return toCopy;
}
public ByteBuffer readBytes(int length) throws EOFException
{
assert length >= 0 : "buffer length should not be negative: " + length;
try
{
ByteBuffer result = ByteBuffer.allocate(length);
while (result.hasRemaining())
{
if (isEOF())
throw new EOFException();
if (!buffer.hasRemaining())
reBuffer();
ByteBufferUtil.put(buffer, result);
}
result.flip();
return result;
}
catch (EOFException e)
{
throw e;
}
catch (Exception e)
{
throw new FSReadError(e, channel.toString());
}
}
public long length()
{
return fileLength;
}
public long getPosition()
{
return bufferOffset + buffer.position();
}
public long getPositionLimit()
{
return length();
}
}