blob: aca378e342a552592cf7f7410449e1d5dfe61b07 [file] [log] [blame]
package org.apache.airavata.mft.core;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.ReentrantLock;
/**
* A Thread safe byte buffer bridging bytes from a output stream to a input stream.
* This is an alternative for {@link CircularStreamingBuffer} to avoid synchronization overhead among the read thread
* and the write thread. This has a separate input and output stream that should be utilized by two different threads.
* Bytes are copied through two internal byte arrays. Always one array is dedicated to writes and another for reads. Once
* the write buffer is full and read buffer is empty, read and write threads swap the buffers. This is the only placed where
* synchronization is enforced.
*/
public class DoubleStreamingBuffer {
/*
Size of the internal arrays
*/
private int bufferSize = 2048;
/*
Internal buffers
*/
private final byte[] buffer1 = new byte[bufferSize];
private final byte[] buffer2 = new byte[bufferSize];
private OutputStream outputStream = new DoubleStreamingBuffer.DSBOutputStream();
private InputStream inputStream = new DoubleStreamingBuffer.DSBInputStream();
private CyclicBarrier barrier = new CyclicBarrier(2);
/*
Remaining bytes in each buffer available for read. Read thread subtracts the count once read and write threads
increases the count for writes
*/
private int buf1Remain = 0;
private int buf2Remain = 0;
private ReentrantLock buffer1Lock = new ReentrantLock();
private ReentrantLock buffer2Lock = new ReentrantLock();
boolean readBuffer1 = true;
boolean doneWrite = false;
int readPoint = 0;
boolean barrierPassed = false;
private long processedBytes = 0L;
public class DSBOutputStream extends OutputStream {
@Override
public void close() throws IOException {
doneWrite = true;
if (readBuffer1) {
buffer2Lock.unlock();
} else {
buffer1Lock.unlock();
}
try {
barrier.await();
} catch (Exception e) {
throw new IOException();
}
}
@Override
public void write(int b) throws IOException {
if (!barrierPassed) {
try {
if (readBuffer1) {
buffer2Lock.lock();
} else {
buffer1Lock.lock();
}
// wait for reader to enter into read block for the first time
barrier.await();
barrierPassed = true;
} catch (Exception e) {
throw new IOException(e);
}
}
//System.out.println("Write " + readBuffer1 + " " + buf1Remain + " " + buf2Remain);
if (readBuffer1) {
if (buf2Remain < bufferSize) {
buffer2[buf2Remain] = (byte)b;
buf2Remain ++;
} else {
barrier.reset();
buffer2Lock.unlock();
buffer1Lock.lock();
try {
// Wait for reader to move into next buffer
barrier.await();
} catch (Exception e) {
throw new IOException();
}
write(b);
}
} else {
if (buf1Remain < bufferSize) {
buffer1[buf1Remain] = (byte)b;
buf1Remain++;
} else {
barrier.reset();
buffer1Lock.unlock();
buffer2Lock.lock();
try {
// Wait for reader to move into next buffer
barrier.await();
} catch (Exception e) {
throw new IOException();
}
write(b);
}
}
}
}
public class DSBInputStream extends InputStream {
@Override
public int read() throws IOException {
if (!barrierPassed) {
try {
if (readBuffer1) {
buffer1Lock.lock();
} else {
buffer2Lock.lock();
}
// wait for writer to enter into read block for the first time
barrier.await();
barrierPassed = true;
} catch (Exception e) {
throw new IOException(e);
}
}
//System.out.println("Read " + readBuffer1 + " " + buf1Remain + " " + buf2Remain);
if (readBuffer1) {
if (buf1Remain > 0) {
buf1Remain --;
//System.out.println("Readval " + (buffer1[readPoint] & 0xff));
processedBytes++;
return buffer1[readPoint++] & 0xff;
} else {
if (doneWrite && buf2Remain <= 0) {
//System.out.println("Return -1");
return -1;
}
buffer2Lock.lock();
readBuffer1 = false;
buffer1Lock.unlock();
readPoint = 0;
try {
// Wait for writer to move into next buffer
barrier.await();
} catch (Exception e) {
throw new IOException();
}
//return read();
buf2Remain --;
processedBytes++;
return buffer2[readPoint++] & 0xff;
}
} else {
if (buf2Remain > 0) {
buf2Remain --;
//System.out.println("Readval " + (buffer2[readPoint] & 0xff));
processedBytes++;
return buffer2[readPoint++] & 0xff;
} else {
if (doneWrite && buf1Remain <= 0) {
//System.out.println("Return -1");
return -1;
}
buffer1Lock.lock();
readBuffer1 = true;
buffer2Lock.unlock();
readPoint = 0;
try {
// Wait for writer to move into next buffer
barrier.await();
} catch (Exception e) {
throw new IOException();
}
//return read();
buf1Remain --;
processedBytes++;
return buffer1[readPoint++] & 0xff;
}
}
}
}
public OutputStream getOutputStream() {
return outputStream;
}
public InputStream getInputStream() {
return inputStream;
}
public long getProcessedBytes() {
return processedBytes;
}
}