blob: 360d262ce6eaa0f47dd5b34904457997f1a4e79b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.cassandra.io.util;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.SyncUtil;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import static org.apache.cassandra.Util.expectEOF;
import static org.apache.cassandra.Util.expectException;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
public class BufferedRandomAccessFileTest
{
@Test
public void testReadAndWrite() throws Exception
{
SequentialWriter w = createTempFile("braf");
ChannelProxy channel = new ChannelProxy(w.getPath());
// writting string of data to the file
byte[] data = "Hello".getBytes();
w.write(data);
assertEquals(data.length, w.length());
assertEquals(data.length, w.position());
w.sync();
// reading small amount of data from file, this is handled by initial buffer
RandomAccessReader r = RandomAccessReader.open(channel);
byte[] buffer = new byte[data.length];
assertEquals(data.length, r.read(buffer));
assertTrue(Arrays.equals(buffer, data)); // we read exactly what we wrote
assertEquals(r.read(), -1); // nothing more to read EOF
assert r.bytesRemaining() == 0 && r.isEOF();
r.close();
// writing buffer bigger than page size, which will trigger reBuffer()
byte[] bigData = new byte[RandomAccessReader.DEFAULT_BUFFER_SIZE + 10];
for (int i = 0; i < bigData.length; i++)
bigData[i] = 'd';
long initialPosition = w.position();
w.write(bigData); // writing data
assertEquals(w.position(), initialPosition + bigData.length);
assertEquals(w.length(), initialPosition + bigData.length); // file size should equals to last position
w.sync();
r = RandomAccessReader.open(channel); // re-opening file in read-only mode
// reading written buffer
r.seek(initialPosition); // back to initial (before write) position
data = new byte[bigData.length];
long sizeRead = 0;
for (int i = 0; i < data.length; i++)
{
data[i] = (byte) r.read();
sizeRead++;
}
assertEquals(sizeRead, data.length); // read exactly data.length bytes
assertEquals(r.getFilePointer(), initialPosition + data.length);
assertEquals(r.length(), initialPosition + bigData.length);
assertTrue(Arrays.equals(bigData, data));
assertTrue(r.bytesRemaining() == 0 && r.isEOF()); // we are at the of the file
// test readBytes(int) method
r.seek(0);
ByteBuffer fileContent = ByteBufferUtil.read(r, (int) w.length());
assertEquals(fileContent.limit(), w.length());
assert ByteBufferUtil.string(fileContent).equals("Hello" + new String(bigData));
// read the same buffer but using readFully(int)
data = new byte[bigData.length];
r.seek(initialPosition);
r.readFully(data);
assert r.bytesRemaining() == 0 && r.isEOF(); // we should be at EOF
assertTrue(Arrays.equals(bigData, data));
// try to read past mark (all methods should return -1)
data = new byte[10];
assertEquals(r.read(), -1);
assertEquals(r.read(data), -1);
assertEquals(r.read(data, 0, data.length), -1);
// test read(byte[], int, int)
r.seek(0);
data = new byte[20];
assertEquals(15, r.read(data, 0, 15));
assertTrue(new String(data).contains("Hellodddddddddd"));
for (int i = 16; i < data.length; i++)
{
assert data[i] == 0;
}
w.finish();
r.close();
channel.close();
}
@Test
public void testReadAndWriteOnCapacity() throws IOException
{
File tmpFile = File.createTempFile("readtest", "bin");
SequentialWriter w = SequentialWriter.open(tmpFile);
// Fully write the file and sync..
byte[] in = generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE);
w.write(in);
ChannelProxy channel = new ChannelProxy(w.getPath());
RandomAccessReader r = RandomAccessReader.open(channel);
// Read it into a same size array.
byte[] out = new byte[RandomAccessReader.DEFAULT_BUFFER_SIZE];
r.read(out);
// Cannot read any more.
int negone = r.read();
assert negone == -1 : "We read past the end of the file, should have gotten EOF -1. Instead, " + negone;
r.close();
w.finish();
channel.close();
}
@Test
public void testLength() throws IOException
{
File tmpFile = File.createTempFile("lengthtest", "bin");
SequentialWriter w = SequentialWriter.open(tmpFile);
assertEquals(0, w.length());
// write a chunk smaller then our buffer, so will not be flushed
// to disk
byte[] lessThenBuffer = generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE / 2);
w.write(lessThenBuffer);
assertEquals(lessThenBuffer.length, w.length());
// sync the data and check length
w.sync();
assertEquals(lessThenBuffer.length, w.length());
// write more then the buffer can hold and check length
byte[] biggerThenBuffer = generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE * 2);
w.write(biggerThenBuffer);
assertEquals(biggerThenBuffer.length + lessThenBuffer.length, w.length());
w.finish();
// will use cachedlength
try (ChannelProxy channel = new ChannelProxy(tmpFile);
RandomAccessReader r = RandomAccessReader.open(channel))
{
assertEquals(lessThenBuffer.length + biggerThenBuffer.length, r.length());
}
}
@Test
public void testReadBytes() throws IOException
{
final SequentialWriter w = createTempFile("brafReadBytes");
byte[] data = new byte[RandomAccessReader.DEFAULT_BUFFER_SIZE + 10];
for (int i = 0; i < data.length; i++)
{
data[i] = 'c';
}
w.write(data);
w.sync();
final ChannelProxy channel = new ChannelProxy(w.getPath());
final RandomAccessReader r = RandomAccessReader.open(channel);
ByteBuffer content = ByteBufferUtil.read(r, (int) r.length());
// after reading whole file we should be at EOF
assertEquals(0, ByteBufferUtil.compare(content, data));
assert r.bytesRemaining() == 0 && r.isEOF();
r.seek(0);
content = ByteBufferUtil.read(r, 10); // reading first 10 bytes
assertEquals(ByteBufferUtil.compare(content, "cccccccccc".getBytes()), 0);
assertEquals(r.bytesRemaining(), r.length() - content.limit());
// trying to read more than file has right now
expectEOF(() -> ByteBufferUtil.read(r, (int) r.length() + 10));
w.finish();
r.close();
channel.close();
}
@Test
public void testSeek() throws Exception
{
SequentialWriter w = createTempFile("brafSeek");
byte[] data = generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE + 20);
w.write(data);
w.finish();
final ChannelProxy channel = new ChannelProxy(w.getPath());
final RandomAccessReader file = RandomAccessReader.open(channel);
file.seek(0);
assertEquals(file.getFilePointer(), 0);
assertEquals(file.bytesRemaining(), file.length());
file.seek(20);
assertEquals(file.getFilePointer(), 20);
assertEquals(file.bytesRemaining(), file.length() - 20);
// trying to seek past the end of the file should produce EOFException
expectException(() -> { file.seek(file.length() + 30); return null; }, IllegalArgumentException.class);
expectException(() -> { file.seek(-1); return null; }, IllegalArgumentException.class); // throws IllegalArgumentException
file.close();
channel.close();
}
@Test
public void testSkipBytes() throws IOException
{
SequentialWriter w = createTempFile("brafSkipBytes");
w.write(generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE * 2));
w.finish();
ChannelProxy channel = new ChannelProxy(w.getPath());
RandomAccessReader file = RandomAccessReader.open(channel);
file.seek(0); // back to the beginning of the file
assertEquals(file.skipBytes(10), 10);
assertEquals(file.bytesRemaining(), file.length() - 10);
int initialPosition = (int) file.getFilePointer();
// can't skip more than file size
assertEquals(file.skipBytes((int) file.length() + 10), file.length() - initialPosition);
assertEquals(file.getFilePointer(), file.length());
assert file.bytesRemaining() == 0 && file.isEOF();
file.seek(0);
// skipping negative amount should return 0
assertEquals(file.skipBytes(-1000), 0);
assertEquals(file.getFilePointer(), 0);
assertEquals(file.bytesRemaining(), file.length());
file.close();
channel.close();
}
@Test
public void testGetFilePointer() throws IOException
{
final SequentialWriter w = createTempFile("brafGetFilePointer");
assertEquals(w.position(), 0); // initial position should be 0
w.write(generateByteArray(20));
assertEquals(w.position(), 20); // position 20 after writing 20 bytes
w.sync();
ChannelProxy channel = new ChannelProxy(w.getPath());
RandomAccessReader r = RandomAccessReader.open(channel);
// position should change after skip bytes
r.seek(0);
r.skipBytes(15);
assertEquals(r.getFilePointer(), 15);
r.read();
assertEquals(r.getFilePointer(), 16);
r.read(new byte[4]);
assertEquals(r.getFilePointer(), 20);
w.finish();
r.close();
channel.close();
}
@Test
public void testGetPath() throws IOException
{
SequentialWriter file = createTempFile("brafGetPath");
assert file.getPath().contains("brafGetPath");
file.finish();
}
@Test
public void testIsEOF() throws IOException
{
for (int bufferSize : Arrays.asList(1, 2, 3, 5, 8, 64)) // smaller, equal, bigger buffer sizes
{
final byte[] target = new byte[32];
// single too-large read
for (final int offset : Arrays.asList(0, 8))
{
File file1 = writeTemporaryFile(new byte[16]);
try (final ChannelProxy channel = new ChannelProxy(file1);
final RandomAccessReader file = new RandomAccessReader.Builder(channel)
.bufferSize(bufferSize)
.build())
{
expectEOF(() -> { file.readFully(target, offset, 17); return null; });
}
}
// first read is ok but eventually EOFs
for (final int n : Arrays.asList(1, 2, 4, 8))
{
File file1 = writeTemporaryFile(new byte[16]);
try (final ChannelProxy channel = new ChannelProxy(file1);
final RandomAccessReader file = new RandomAccessReader.Builder(channel).bufferSize(bufferSize).build())
{
expectEOF(() -> {
while (true)
file.readFully(target, 0, n);
});
}
}
}
}
@Test
public void testNotEOF() throws IOException
{
try (final RandomAccessReader reader = RandomAccessReader.open(writeTemporaryFile(new byte[1])))
{
assertEquals(1, reader.read(new byte[2]));
}
}
@Test
public void testBytesRemaining() throws IOException
{
SequentialWriter w = createTempFile("brafBytesRemaining");
int toWrite = RandomAccessReader.DEFAULT_BUFFER_SIZE + 10;
w.write(generateByteArray(toWrite));
w.sync();
ChannelProxy channel = new ChannelProxy(w.getPath());
RandomAccessReader r = RandomAccessReader.open(channel);
assertEquals(r.bytesRemaining(), toWrite);
for (int i = 1; i <= r.length(); i++)
{
r.read();
assertEquals(r.bytesRemaining(), r.length() - i);
}
r.seek(0);
r.skipBytes(10);
assertEquals(r.bytesRemaining(), r.length() - 10);
w.finish();
r.close();
channel.close();
}
@Test
public void testBytesPastMark() throws IOException
{
File tmpFile = File.createTempFile("overflowtest", "bin");
tmpFile.deleteOnExit();
// Create the BRAF by filename instead of by file.
try (final RandomAccessReader r = RandomAccessReader.open(new File(tmpFile.getPath())))
{
assert tmpFile.getPath().equals(r.getPath());
// Create a mark and move the rw there.
final DataPosition mark = r.mark();
r.reset(mark);
// Expect this call to succeed.
r.bytesPastMark(mark);
}
}
@Test
public void testClose() throws IOException
{
final SequentialWriter w = createTempFile("brafClose");
byte[] data = generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE + 20);
w.write(data);
w.finish();
final RandomAccessReader r = RandomAccessReader.open(new File(w.getPath()));
r.close(); // closing to test read after close
expectException(() -> r.read(), NullPointerException.class);
//Used to throw ClosedChannelException, but now that it extends BDOSP it just NPEs on the buffer
//Writing to a BufferedOutputStream that is closed generates no error
//Going to allow the NPE to throw to catch as a bug any use after close. Notably it won't throw NPE for a
//write of a 0 length, but that is kind of a corner case
expectException(() -> { w.write(generateByteArray(1)); return null; }, NullPointerException.class);
try (RandomAccessReader copy = RandomAccessReader.open(new File(r.getPath())))
{
ByteBuffer contents = ByteBufferUtil.read(copy, (int) copy.length());
assertEquals(contents.limit(), data.length);
assertEquals(ByteBufferUtil.compare(contents, data), 0);
}
}
@Test
public void testMarkAndReset() throws IOException
{
SequentialWriter w = createTempFile("brafTestMark");
w.write(new byte[30]);
w.finish();
ChannelProxy channel = new ChannelProxy(w.getPath());
RandomAccessReader file = RandomAccessReader.open(channel);
file.seek(10);
DataPosition mark = file.mark();
file.seek(file.length());
assertTrue(file.isEOF());
file.reset();
assertEquals(file.bytesRemaining(), 20);
file.seek(file.length());
assertTrue(file.isEOF());
file.reset(mark);
assertEquals(file.bytesRemaining(), 20);
file.seek(file.length());
assertEquals(file.bytesPastMark(), 20);
assertEquals(file.bytesPastMark(mark), 20);
file.reset(mark);
assertEquals(file.bytesPastMark(), 0);
file.close();
channel.close();
}
@Test(expected = AssertionError.class)
public void testAssertionErrorWhenBytesPastMarkIsNegative() throws IOException
{
try (SequentialWriter w = createTempFile("brafAssertionErrorWhenBytesPastMarkIsNegative"))
{
w.write(new byte[30]);
w.flush();
try (ChannelProxy channel = new ChannelProxy(w.getPath());
RandomAccessReader r = RandomAccessReader.open(channel))
{
r.seek(10);
r.mark();
r.seek(0);
r.bytesPastMark();
}
}
}
@Test
public void testReadOnly() throws IOException
{
SequentialWriter file = createTempFile("brafReadOnlyTest");
byte[] data = new byte[20];
for (int i = 0; i < data.length; i++)
data[i] = 'c';
file.write(data);
file.sync(); // flushing file to the disk
// read-only copy of the file, with fixed file length
final RandomAccessReader copy = RandomAccessReader.open(new File(file.getPath()));
copy.seek(copy.length());
assertTrue(copy.bytesRemaining() == 0 && copy.isEOF());
// can't seek past the end of the file for read-only files
expectException(() -> { copy.seek(copy.length() + 1); return null; }, IllegalArgumentException.class);
copy.seek(0);
copy.skipBytes(5);
assertEquals(copy.bytesRemaining(), 15);
assertEquals(copy.getFilePointer(), 5);
assertTrue(!copy.isEOF());
copy.seek(0);
ByteBuffer contents = ByteBufferUtil.read(copy, (int) copy.length());
assertEquals(contents.limit(), copy.length());
assertTrue(ByteBufferUtil.compare(contents, data) == 0);
copy.seek(0);
int count = 0;
while (!copy.isEOF())
{
assertEquals((byte) copy.read(), 'c');
count++;
}
assertEquals(count, copy.length());
copy.seek(0);
byte[] content = new byte[10];
copy.read(content);
assertEquals(new String(content), "cccccccccc");
file.finish();
copy.close();
}
@Test (expected=IllegalArgumentException.class)
public void testSetNegativeLength() throws IOException, IllegalArgumentException
{
File tmpFile = File.createTempFile("set_negative_length", "bin");
try (SequentialWriter file = SequentialWriter.open(tmpFile))
{
file.truncate(-8L);
}
}
private SequentialWriter createTempFile(String name) throws IOException
{
File tempFile = File.createTempFile(name, null);
tempFile.deleteOnExit();
return SequentialWriter.open(tempFile);
}
private File writeTemporaryFile(byte[] data) throws IOException
{
File f = File.createTempFile("BRAFTestFile", null);
f.deleteOnExit();
FileOutputStream fout = new FileOutputStream(f);
fout.write(data);
SyncUtil.sync(fout);
fout.close();
return f;
}
private byte[] generateByteArray(int length)
{
byte[] arr = new byte[length];
for (int i = 0; i < length; i++)
arr[i] = 'a';
return arr;
}
}