blob: 71fab6116f9dc80938793c12f9048a5bdff682aa [file] [log] [blame]
package org.apache.cassandra.io;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import static org.junit.Assert.*;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.SequentialWriter;
public class RandomAccessReaderTest
{
@Test
public void testReadFully() throws IOException
{
final File f = File.createTempFile("testReadFully", "1");
final String expected = "The quick brown fox jumps over the lazy dog";
SequentialWriter writer = SequentialWriter.open(f);
writer.write(expected.getBytes());
writer.finish();
assert f.exists();
ChannelProxy channel = new ChannelProxy(f);
RandomAccessReader reader = RandomAccessReader.open(channel);
assertEquals(f.getAbsolutePath(), reader.getPath());
assertEquals(expected.length(), reader.length());
byte[] b = new byte[expected.length()];
reader.readFully(b);
assertEquals(expected, new String(b));
assertTrue(reader.isEOF());
assertEquals(0, reader.bytesRemaining());
reader.close();
channel.close();
}
@Test
public void testReadBytes() throws IOException
{
File f = File.createTempFile("testReadBytes", "1");
final String expected = "The quick brown fox jumps over the lazy dog";
SequentialWriter writer = SequentialWriter.open(f);
writer.write(expected.getBytes());
writer.finish();
assert f.exists();
ChannelProxy channel = new ChannelProxy(f);
RandomAccessReader reader = RandomAccessReader.open(channel);
assertEquals(f.getAbsolutePath(), reader.getPath());
assertEquals(expected.length(), reader.length());
ByteBuffer b = reader.readBytes(expected.length());
assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
assertTrue(reader.isEOF());
assertEquals(0, reader.bytesRemaining());
reader.close();
channel.close();
}
@Test
public void testReset() throws IOException
{
File f = File.createTempFile("testMark", "1");
final String expected = "The quick brown fox jumps over the lazy dog";
final int numIterations = 10;
SequentialWriter writer = SequentialWriter.open(f);
for (int i = 0; i < numIterations; i++)
writer.write(expected.getBytes());
writer.finish();
assert f.exists();
ChannelProxy channel = new ChannelProxy(f);
RandomAccessReader reader = RandomAccessReader.open(channel);
assertEquals(expected.length() * numIterations, reader.length());
ByteBuffer b = reader.readBytes(expected.length());
assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
assertFalse(reader.isEOF());
assertEquals((numIterations - 1) * expected.length(), reader.bytesRemaining());
FileMark mark = reader.mark();
assertEquals(0, reader.bytesPastMark());
assertEquals(0, reader.bytesPastMark(mark));
for (int i = 0; i < (numIterations - 1); i++)
{
b = reader.readBytes(expected.length());
assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
}
assertTrue(reader.isEOF());
assertEquals(expected.length() * (numIterations -1), reader.bytesPastMark());
assertEquals(expected.length() * (numIterations - 1), reader.bytesPastMark(mark));
reader.reset(mark);
assertEquals(0, reader.bytesPastMark());
assertEquals(0, reader.bytesPastMark(mark));
assertFalse(reader.isEOF());
for (int i = 0; i < (numIterations - 1); i++)
{
b = reader.readBytes(expected.length());
assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
}
reader.reset();
assertEquals(0, reader.bytesPastMark());
assertEquals(0, reader.bytesPastMark(mark));
assertFalse(reader.isEOF());
for (int i = 0; i < (numIterations - 1); i++)
{
b = reader.readBytes(expected.length());
assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
}
assertTrue(reader.isEOF());
reader.close();
channel.close();
}
@Test
public void testSeekSingleThread() throws IOException, InterruptedException
{
testSeek(1);
}
@Test
public void testSeekMultipleThreads() throws IOException, InterruptedException
{
testSeek(10);
}
private void testSeek(int numThreads) throws IOException, InterruptedException
{
final File f = File.createTempFile("testMark", "1");
final String[] expected = new String[10];
int len = 0;
for (int i = 0; i < expected.length; i++)
{
expected[i] = UUID.randomUUID().toString();
len += expected[i].length();
}
final int totalLength = len;
SequentialWriter writer = SequentialWriter.open(f);
for (int i = 0; i < expected.length; i++)
writer.write(expected[i].getBytes());
writer.finish();
assert f.exists();
final ChannelProxy channel = new ChannelProxy(f);
final Runnable worker = new Runnable() {
@Override
public void run()
{
try
{
RandomAccessReader reader = RandomAccessReader.open(channel);
assertEquals(totalLength, reader.length());
ByteBuffer b = reader.readBytes(expected[0].length());
assertEquals(expected[0], new String(b.array(), Charset.forName("UTF-8")));
assertFalse(reader.isEOF());
assertEquals(totalLength - expected[0].length(), reader.bytesRemaining());
long filePointer = reader.getFilePointer();
for (int i = 1; i < expected.length; i++)
{
b = reader.readBytes(expected[i].length());
assertEquals(expected[i], new String(b.array(), Charset.forName("UTF-8")));
}
assertTrue(reader.isEOF());
reader.seek(filePointer);
assertFalse(reader.isEOF());
for (int i = 1; i < expected.length; i++)
{
b = reader.readBytes(expected[i].length());
assertEquals(expected[i], new String(b.array(), Charset.forName("UTF-8")));
}
assertTrue(reader.isEOF());
reader.close();
}
catch (Exception ex)
{
ex.printStackTrace();
fail(ex.getMessage());
}
}
};
if(numThreads == 1)
{
worker.run();
return;
}
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numThreads; i++)
executor.submit(worker);
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
channel.close();
}
}