blob: 4eab1905594ae1a5dd20678fedbe5789aee1f266 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.util;
import org.apache.log4j.Logger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
/**
* Use a private buffer for the read/write/seek operations of the RandomAccessFile
* to reduce the number of disk operations.
*
* S C L/E
* Buffer: |----+---|
* File: +---------------D-------------------+
* S. startPosition - file position of start of the buffer.
* C. currentPosition - file position of current pointer in the buffer.
* L. validLastPosition - file position of the last byte in the buffer. (This is same as
* endPosition of the buffer. Except for the last file block).
* S-C--L--E
* (A)
* Buffer: |-+--+--|
* File: +-----------------------------------D
* E. endPosition() - end file position of the current buffer.
* D. DiskPosition - Position in the file pointed by underlying RandomAccessFile.
* When reading from the file, diskPosition aligns with startPosition. When writing to
* the file, diskPosition(D) aligns with validLastPosition(L/E).
*
* A. AvailableSpace - Space between validLastPosition(L) and EndPosition(E)
* is the available space in buffer, when writing/appending records
* into the buffer/file.
*
* Note: Based on BufferedRandomAccessFile implementation in Apache/Cassandra.
* - adopted from org.apache.cassandra.io
* Copyright: 2015-2019 The Apache Software Foundation
* Home page: http://cassandra.apache.org/
* License: http://www.apache.org/licenses/LICENSE-2.0
*/
public final class BufferedRandomAccessFile extends RandomAccessFile {
private static final Logger LOG = Logger.getLogger(BufferedRandomAccessFile.class);
static final int DEFAULT_BUFFER_SIZE = (1 << 16); // 64K buffer
static final int BUFFER_BOUNDARY_MASK = -DEFAULT_BUFFER_SIZE;
private int capacity;
private ByteBuffer dataBuffer;
private long startPosition = 0L;
private long currentPosition = 0L;
private long validLastPosition = 0L;
private long diskPosition = 0L;
private boolean isDirty = false;
private boolean isClosed = false;
private boolean isEOF = false;
/**
*
* @param file - file name
* @param mode - "r" for read only; "rw" for read write
* @throws IOException
*/
public BufferedRandomAccessFile(File file, String mode) throws IOException {
super(file, mode);
this.init(DEFAULT_BUFFER_SIZE);
}
/**
*
* @param file - file name
* @param mode - "r" for read only; "rw" for read write
* @param size - size/capacity of the buffer.
* @throws IOException
*/
public BufferedRandomAccessFile(File file, String mode, int size) throws IOException {
super(file, mode);
this.init(size);
}
/**
*
* @param name - name of the file
* @param mode - "r" for read only; "rw" for read write
* @throws IOException
*/
public BufferedRandomAccessFile(String name, String mode) throws IOException {
super(name, mode);
this.init(DEFAULT_BUFFER_SIZE);
}
/**
*
* @param name - name of the file
* @param mode - "r" for read only; "rw" for read write
* @param size - size/capacity of the buffer
* @throws FileNotFoundException
*/
public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException {
super(name, mode);
this.init(size);
}
/**
*
* @param size - capacity of the buffer
*/
private void init(int size) {
this.capacity = Math.max(DEFAULT_BUFFER_SIZE, size);
this.dataBuffer = ByteBuffer.wrap(new byte[this.capacity]);
}
/**
* Close the file, after flushing data in the buffer.
* @throws IOException
*/
@Override
public void close() throws IOException {
if (!isClosed) {
this.flush();
super.close();
this.isClosed = true;
}
}
/**
* If the file is writable, flush any bytes in the buffer that have not yet been written to disk.
* @throws IOException
*/
public void flush() throws IOException {
this.flushBuffer();
}
/**
* Flush any dirty bytes in the buffer to disk.
* @throws IOException
*/
private void flushBuffer() throws IOException {
if (this.isDirty) {
alignDiskPositionToBufferStartIfNeeded();
int len = (int) (this.currentPosition - this.startPosition);
super.write(this.dataBuffer.array(), 0, len);
this.diskPosition = this.currentPosition;
this.isDirty = false;
}
}
/**
* read ahead file contents to buffer.
* @return number of bytes filled
* @throws IOException
*/
private int fillBuffer() throws IOException {
int cnt = 0;
int bytesToRead = this.capacity;
// blocking read, until buffer is filled or EOF reached
while (bytesToRead > 0) {
int n = super.read(this.dataBuffer.array(), cnt, bytesToRead);
if (n < 0) {
break;
}
cnt += n;
bytesToRead -= n;
}
this.isEOF = (cnt < this.dataBuffer.array().length);
this.diskPosition += cnt;
return cnt;
}
/**
* If the diskPosition differs from the startPosition, flush the data in the buffer
* and realign/fill the buffer at startPosition.
* @throws IOException
*/
private void alignDiskPositionToBufferStartIfNeeded() throws IOException {
if (this.diskPosition != this.startPosition) {
super.seek(this.startPosition);
this.diskPosition = this.startPosition;
}
}
/**
* If the new seek position is in the buffer, adjust the currentPostion.
* If the new seek position is outside of the buffer, flush the contents to
* the file and reload the buffer corresponding to the position.
*
* We logically view the file as group blocks, where each block will perfectly
* fit into the buffer (except for the last block). Given a position to seek,
* we identify the block to be loaded using BUFFER_BOUNDARY_MASK.
*
* When dealing with the last block, we will have extra space between validLastPosition
* and endPosition of the buffer.
*
* @param pos - position in the file to be loaded to the buffer.
* @throws IOException
*/
@Override
public void seek(long pos) throws IOException {
if (pos >= this.validLastPosition || pos < this.startPosition) {
// seeking outside of current buffer -- flush and read
this.flushBuffer();
this.startPosition = pos & BUFFER_BOUNDARY_MASK; // start at BuffSz boundary
alignDiskPositionToBufferStartIfNeeded();
int n = this.fillBuffer();
this.validLastPosition = this.startPosition + (long) n;
} else {
// seeking inside current buffer -- no read required
if (pos < this.currentPosition) {
// if seeking backwards, flush buffer.
this.flushBuffer();
}
}
this.currentPosition = pos;
}
/**
* @return current file position
*/
@Override
public long getFilePointer() {
return this.currentPosition;
}
/**
* Returns the length of the file, depending on whether buffer has more data (to be flushed).
* @return - length of the file (including data yet to be flushed to the file).
* @throws IOException
*/
@Override
public long length() throws IOException {
return Math.max(this.currentPosition, super.length());
}
/**
* @return whether currentPosition has reached the end of valid buffer.
*/
private boolean endOfBufferReached() {
return this.currentPosition >= this.validLastPosition;
}
/**
* Load a new data block. Returns false, when EOF is reached.
* @return - whether new data block was loaded or not
* @throws IOException
*/
private boolean loadNewBlockToBuffer() throws IOException {
if (this.isEOF) {
return false;
}
// read next block into buffer
this.seek(this.currentPosition);
// if currentPosition is at start, EOF has been reached
return this.currentPosition != this.validLastPosition;
}
/**
* @return - returns a byte as an integer.
* @throws IOException
*/
@Override
public int read() throws IOException {
if (endOfBufferReached()) {
if (!loadNewBlockToBuffer()) {
return -1;
}
}
byte res = this.dataBuffer.array()[(int) (this.currentPosition - this.startPosition)];
this.currentPosition++;
return ((int) res) & 0xFF; // convert byte -> int
}
/**
* @param b - byte array into which to read data.
* @return - returns number of bytes read.
* @throws IOException
*/
@Override
public int read(byte[] b) throws IOException {
return this.read(b, 0, b.length);
}
/**
* Read specified number of bytes into given array starting at given offset.
* @param b - byte array
* @param off - start offset
* @param len - length of bytes to be read
* @return - number of bytes read.
* @throws IOException
*/
@Override
public int read(byte[] b, int off, int len) throws IOException {
if (endOfBufferReached()) {
if (!loadNewBlockToBuffer()) {
return -1;
}
}
// copy data from buffer
len = Math.min(len, (int) (this.validLastPosition - this.currentPosition));
int buffOff = (int) (this.currentPosition - this.startPosition);
System.arraycopy(this.dataBuffer.array(), buffOff, b, off, len);
this.currentPosition += len;
return len;
}
/**
* @return endPosition of the buffer. For the last file block, this may not be a valid position.
*/
private long endPosition() {
return this.startPosition + this.capacity;
}
/**
* @return - whether space is available at the end of the buffer.
*/
private boolean spaceAvailableInBuffer() {
return (this.isEOF && (this.validLastPosition < this.endPosition()));
}
/**
* write a byte to the buffer/file.
* @param v - value to be written
* @throws IOException
*/
@Override
public void write(int v) throws IOException {
byte [] b = new byte[1];
b[0] = (byte) v;
this.write(b, 0, b.length);
}
/**
* write an array of bytes to the buffer/file.
* @param b - byte array with data to be written
* @throws IOException
*/
@Override
public void write(byte[] b) throws IOException {
this.write(b, 0, b.length);
}
/**
* Write specified number of bytes into buffer/file, with given starting offset and length.
* @param b - byte array with data to be written
* @param off - starting offset.
* @param len - length of bytes to be written
* @throws IOException
*/
@Override
public void write(byte[] b, int off, int len) throws IOException {
// As all data may not fit into the buffer, more than one write would be required.
while (len > 0) {
int n = this.writeAtMost(b, off, len);
off += n;
len -= n;
this.isDirty = true;
}
}
/**
* If space is available at the end of the buffer, start using it. Otherwise,
* flush the unwritten data into the file and load the buffer corresponding to startPosition.
* @throws IOException
*/
private void expandBufferToCapacityIfNeeded() throws IOException {
if (spaceAvailableInBuffer()) {
// space available at end of buffer -- adjust validLastPosition
this.validLastPosition = this.endPosition();
} else {
loadNewBlockToBuffer();
// appending to EOF, adjust validLastPosition.
if (this.currentPosition == this.validLastPosition) {
this.validLastPosition = this.endPosition();
}
}
}
/**
* Given a byte array, offset in the array and length of bytes to be written,
* update the buffer/file.
* @param b - byte array of data to be written
* @param off - starting offset.
* @param len - length of bytes to be written
* @return - number of bytes written
* @throws IOException
*/
private int writeAtMost(byte[] b, int off, int len) throws IOException {
if (endOfBufferReached()) {
expandBufferToCapacityIfNeeded();
}
// copy data to buffer, until all data is copied or to buffer capacity.
len = Math.min(len, (int) (this.validLastPosition - this.currentPosition));
int buffOff = (int) (this.currentPosition - this.startPosition);
System.arraycopy(b, off, this.dataBuffer.array(), buffOff, len);
this.currentPosition += len;
return len;
}
}