| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.io.util; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.ByteOrder; |
| |
| import com.google.common.primitives.Ints; |
| |
| import org.apache.cassandra.io.compress.BufferType; |
| import org.apache.cassandra.io.util.Rebufferer.BufferHolder; |
| |
| public class RandomAccessReader extends RebufferingInputStream implements FileDataInput |
| { |
| // The default buffer size when the client doesn't specify it |
| public static final int DEFAULT_BUFFER_SIZE = 4096; |
| |
| // offset of the last file mark |
| private long markedPointer; |
| |
| final Rebufferer rebufferer; |
| private BufferHolder bufferHolder = Rebufferer.EMPTY; |
| |
| /** |
| * Only created through Builder |
| * |
| * @param rebufferer Rebufferer to use |
| */ |
| RandomAccessReader(Rebufferer rebufferer) |
| { |
| super(Rebufferer.EMPTY.buffer()); |
| this.rebufferer = rebufferer; |
| } |
| |
| /** |
| * Read data from file starting from current currentOffset to populate buffer. |
| */ |
| public void reBuffer() |
| { |
| if (isEOF()) |
| return; |
| |
| reBufferAt(current()); |
| } |
| |
| private void reBufferAt(long position) |
| { |
| bufferHolder.release(); |
| bufferHolder = rebufferer.rebuffer(position); |
| buffer = bufferHolder.buffer(); |
| buffer.position(Ints.checkedCast(position - bufferHolder.offset())); |
| |
| assert buffer.order() == ByteOrder.BIG_ENDIAN : "Buffer must have BIG ENDIAN byte ordering"; |
| } |
| |
| @Override |
| public long getFilePointer() |
| { |
| if (buffer == null) // closed already |
| return rebufferer.fileLength(); |
| return current(); |
| } |
| |
| protected long current() |
| { |
| return bufferHolder.offset() + buffer.position(); |
| } |
| |
| public String getPath() |
| { |
| return getChannel().filePath(); |
| } |
| |
| public ChannelProxy getChannel() |
| { |
| return rebufferer.channel(); |
| } |
| |
| @Override |
| public void reset() throws IOException |
| { |
| seek(markedPointer); |
| } |
| |
| @Override |
| public boolean markSupported() |
| { |
| return true; |
| } |
| |
| public long bytesPastMark() |
| { |
| long bytes = current() - markedPointer; |
| assert bytes >= 0; |
| return bytes; |
| } |
| |
| public DataPosition mark() |
| { |
| markedPointer = current(); |
| return new BufferedRandomAccessFileMark(markedPointer); |
| } |
| |
| public void reset(DataPosition mark) |
| { |
| assert mark instanceof BufferedRandomAccessFileMark; |
| seek(((BufferedRandomAccessFileMark) mark).pointer); |
| } |
| |
| public long bytesPastMark(DataPosition mark) |
| { |
| assert mark instanceof BufferedRandomAccessFileMark; |
| long bytes = current() - ((BufferedRandomAccessFileMark) mark).pointer; |
| assert bytes >= 0; |
| return bytes; |
| } |
| |
| /** |
| * @return true if there is no more data to read |
| */ |
| public boolean isEOF() |
| { |
| return current() == length(); |
| } |
| |
| public long bytesRemaining() |
| { |
| return length() - getFilePointer(); |
| } |
| |
| @Override |
| public int available() throws IOException |
| { |
| return Ints.saturatedCast(bytesRemaining()); |
| } |
| |
| @Override |
| public void close() |
| { |
| // close needs to be idempotent. |
| if (buffer == null) |
| return; |
| |
| bufferHolder.release(); |
| rebufferer.closeReader(); |
| buffer = null; |
| bufferHolder = null; |
| |
| //For performance reasons we don't keep a reference to the file |
| //channel so we don't close it |
| } |
| |
| @Override |
| public String toString() |
| { |
| return getClass().getSimpleName() + ':' + rebufferer.toString(); |
| } |
| |
| /** |
| * Class to hold a mark to the position of the file |
| */ |
| private static class BufferedRandomAccessFileMark implements DataPosition |
| { |
| final long pointer; |
| |
| private BufferedRandomAccessFileMark(long pointer) |
| { |
| this.pointer = pointer; |
| } |
| } |
| |
| @Override |
| public void seek(long newPosition) |
| { |
| if (newPosition < 0) |
| throw new IllegalArgumentException("new position should not be negative"); |
| |
| if (buffer == null) |
| throw new IllegalStateException("Attempted to seek in a closed RAR"); |
| |
| long bufferOffset = bufferHolder.offset(); |
| if (newPosition >= bufferOffset && newPosition < bufferOffset + buffer.limit()) |
| { |
| buffer.position((int) (newPosition - bufferOffset)); |
| return; |
| } |
| |
| if (newPosition > length()) |
| throw new IllegalArgumentException(String.format("Unable to seek to position %d in %s (%d bytes) in read-only mode", |
| newPosition, getPath(), length())); |
| reBufferAt(newPosition); |
| } |
| |
| /** |
| * Reads a line of text form the current position in this file. A line is |
| * represented by zero or more characters followed by {@code '\n'}, {@code |
| * '\r'}, {@code "\r\n"} or the end of file marker. The string does not |
| * include the line terminating sequence. |
| * <p> |
| * Blocks until a line terminating sequence has been read, the end of the |
| * file is reached or an exception is thrown. |
| * </p> |
| * @return the contents of the line or {@code null} if no characters have |
| * been read before the end of the file has been reached. |
| * @throws IOException if this file is closed or another I/O error occurs. |
| */ |
| public final String readLine() throws IOException |
| { |
| StringBuilder line = new StringBuilder(80); // Typical line length |
| boolean foundTerminator = false; |
| long unreadPosition = -1; |
| while (true) |
| { |
| int nextByte = read(); |
| switch (nextByte) |
| { |
| case -1: |
| return line.length() != 0 ? line.toString() : null; |
| case (byte) '\r': |
| if (foundTerminator) |
| { |
| seek(unreadPosition); |
| return line.toString(); |
| } |
| foundTerminator = true; |
| /* Have to be able to peek ahead one byte */ |
| unreadPosition = getPosition(); |
| break; |
| case (byte) '\n': |
| return line.toString(); |
| default: |
| if (foundTerminator) |
| { |
| seek(unreadPosition); |
| return line.toString(); |
| } |
| line.append((char) nextByte); |
| } |
| } |
| } |
| |
| public long length() |
| { |
| return rebufferer.fileLength(); |
| } |
| |
| public long getPosition() |
| { |
| return current(); |
| } |
| |
| public double getCrcCheckChance() |
| { |
| return rebufferer.getCrcCheckChance(); |
| } |
| |
| // A wrapper of the RandomAccessReader that closes the channel when done. |
| // For performance reasons RAR does not increase the reference count of |
| // a channel but assumes the owner will keep it open and close it, |
| // see CASSANDRA-9379, this thin class is just for those cases where we do |
| // not have a shared channel. |
| static class RandomAccessReaderWithOwnChannel extends RandomAccessReader |
| { |
| RandomAccessReaderWithOwnChannel(Rebufferer rebufferer) |
| { |
| super(rebufferer); |
| } |
| |
| @Override |
| public void close() |
| { |
| try |
| { |
| super.close(); |
| } |
| finally |
| { |
| try |
| { |
| rebufferer.close(); |
| } |
| finally |
| { |
| getChannel().close(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Open a RandomAccessReader (not compressed, not mmapped, no read throttling) that will own its channel. |
| * |
| * @param file File to open for reading |
| * @return new RandomAccessReader that owns the channel opened in this method. |
| */ |
| @SuppressWarnings("resource") |
| public static RandomAccessReader open(File file) |
| { |
| ChannelProxy channel = new ChannelProxy(file); |
| try |
| { |
| ChunkReader reader = new SimpleChunkReader(channel, -1, BufferType.OFF_HEAP, DEFAULT_BUFFER_SIZE); |
| Rebufferer rebufferer = reader.instantiateRebufferer(); |
| return new RandomAccessReaderWithOwnChannel(rebufferer); |
| } |
| catch (Throwable t) |
| { |
| channel.close(); |
| throw t; |
| } |
| } |
| } |