blob: 7cd3bb3de2b58d7aec32a97b43730d27486ec9cd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.fs.impl.prefetch;
import java.nio.ByteBuffer;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNegative;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkPositiveInteger;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkState;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkWithinRange;
/**
* Provides functionality related to tracking the position within a file.
*
* The file is accessed through an in memory buffer. The absolute position within
* the file is the sum of start offset of the buffer within the file and the relative
* offset of the current access location within the buffer.
*
* A file is made up of equal sized blocks. The last block may be of a smaller size.
* The size of a buffer associated with this file is typically the same as block size.
*/
public final class FilePosition {
/**
* Holds block based information about a file.
*/
private BlockData blockData;
/**
* Information about the buffer in use.
*/
private BufferData data;
/**
* Provides access to the underlying file.
*/
private ByteBuffer buffer;
/**
* Start offset of the buffer relative to the start of a file.
*/
private long bufferStartOffset;
/**
* Offset where reading starts relative to the start of a file.
*/
private long readStartOffset;
// Read stats after a seek (mostly for debugging use).
private int numSingleByteReads;
private int numBytesRead;
private int numBufferReads;
/**
* Constructs an instance of {@link FilePosition}.
*
* @param fileSize size of the associated file.
* @param blockSize size of each block within the file.
*
* @throws IllegalArgumentException if fileSize is negative.
* @throws IllegalArgumentException if blockSize is zero or negative.
*/
public FilePosition(long fileSize, int blockSize) {
checkNotNegative(fileSize, "fileSize");
if (fileSize == 0) {
checkNotNegative(blockSize, "blockSize");
} else {
checkPositiveInteger(blockSize, "blockSize");
}
this.blockData = new BlockData(fileSize, blockSize);
// The position is valid only when a valid buffer is associated with this file.
this.invalidate();
}
/**
* Associates a buffer with this file.
*
* @param bufferData the buffer associated with this file.
* @param startOffset Start offset of the buffer relative to the start of a file.
* @param readOffset Offset where reading starts relative to the start of a file.
*
* @throws IllegalArgumentException if bufferData is null.
* @throws IllegalArgumentException if startOffset is negative.
* @throws IllegalArgumentException if readOffset is negative.
* @throws IllegalArgumentException if readOffset is outside the range [startOffset, buffer end].
*/
public void setData(BufferData bufferData,
long startOffset,
long readOffset) {
checkNotNull(bufferData, "bufferData");
checkNotNegative(startOffset, "startOffset");
checkNotNegative(readOffset, "readOffset");
checkWithinRange(
readOffset,
"readOffset",
startOffset,
startOffset + bufferData.getBuffer().limit() - 1);
data = bufferData;
buffer = bufferData.getBuffer().duplicate();
bufferStartOffset = startOffset;
readStartOffset = readOffset;
setAbsolute(readOffset);
resetReadStats();
}
public ByteBuffer buffer() {
throwIfInvalidBuffer();
return buffer;
}
public BufferData data() {
throwIfInvalidBuffer();
return data;
}
/**
* Gets the current absolute position within this file.
*
* @return the current absolute position within this file.
*/
public long absolute() {
throwIfInvalidBuffer();
return bufferStartOffset + relative();
}
/**
* If the given {@code pos} lies within the current buffer, updates the current position to
* the specified value and returns true; otherwise returns false without changing the position.
*
* @param pos the absolute position to change the current position to if possible.
* @return true if the given current position was updated, false otherwise.
*/
public boolean setAbsolute(long pos) {
if (isValid() && isWithinCurrentBuffer(pos)) {
int relativePos = (int) (pos - bufferStartOffset);
buffer.position(relativePos);
return true;
} else {
return false;
}
}
/**
* Gets the current position within this file relative to the start of the associated buffer.
*
* @return the current position within this file relative to the start of the associated buffer.
*/
public int relative() {
throwIfInvalidBuffer();
return buffer.position();
}
/**
* Determines whether the given absolute position lies within the current buffer.
*
* @param pos the position to check.
* @return true if the given absolute position lies within the current buffer, false otherwise.
*/
public boolean isWithinCurrentBuffer(long pos) {
throwIfInvalidBuffer();
long bufferEndOffset = bufferStartOffset + buffer.limit() - 1;
return (pos >= bufferStartOffset) && (pos <= bufferEndOffset);
}
/**
* Gets the id of the current block.
*
* @return the id of the current block.
*/
public int blockNumber() {
throwIfInvalidBuffer();
return blockData.getBlockNumber(bufferStartOffset);
}
/**
* Determines whether the current block is the last block in this file.
*
* @return true if the current block is the last block in this file, false otherwise.
*/
public boolean isLastBlock() {
return blockData.isLastBlock(blockNumber());
}
/**
* Determines if the current position is valid.
*
* @return true if the current position is valid, false otherwise.
*/
public boolean isValid() {
return buffer != null;
}
/**
* Marks the current position as invalid.
*/
public void invalidate() {
buffer = null;
bufferStartOffset = -1;
data = null;
}
/**
* Gets the start of the current block's absolute offset.
*
* @return the start of the current block's absolute offset.
*/
public long bufferStartOffset() {
throwIfInvalidBuffer();
return bufferStartOffset;
}
/**
* Determines whether the current buffer has been fully read.
*
* @return true if the current buffer has been fully read, false otherwise.
*/
public boolean bufferFullyRead() {
throwIfInvalidBuffer();
return (bufferStartOffset == readStartOffset)
&& (relative() == buffer.limit())
&& (numBytesRead == buffer.limit());
}
public void incrementBytesRead(int n) {
numBytesRead += n;
if (n == 1) {
numSingleByteReads++;
} else {
numBufferReads++;
}
}
public int numBytesRead() {
return numBytesRead;
}
public int numSingleByteReads() {
return numSingleByteReads;
}
public int numBufferReads() {
return numBufferReads;
}
private void resetReadStats() {
numBytesRead = 0;
numSingleByteReads = 0;
numBufferReads = 0;
}
public String toString() {
StringBuilder sb = new StringBuilder();
if (buffer == null) {
sb.append("currentBuffer = null");
} else {
int pos = buffer.position();
int val;
if (pos >= buffer.limit()) {
val = -1;
} else {
val = buffer.get(pos);
}
String currentBufferState =
String.format("%d at pos: %d, lim: %d", val, pos, buffer.limit());
sb.append(String.format(
"block: %d, pos: %d (CBuf: %s)%n",
blockNumber(), absolute(),
currentBufferState));
sb.append("\n");
}
return sb.toString();
}
private void throwIfInvalidBuffer() {
checkState(buffer != null, "'buffer' must not be null");
}
}