blob: a7db4dfa188c4991a9b7de3fd93677c0be2e699c [file] [log] [blame]
/*
*/
package org.apache.tomcat.lite.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
/**
* Buffered, non-blocking ByteChannel.
*
* write() data will be added to the buffer. Call startSending() to
* flush.
*
*
*
* - you can use it as a normal non-blocking ByteChannel.
* - you can call getRead
*
* Very different from MINA IoFilters, also much lower level.
*
*
* @author Costin Manolache
*/
public abstract class IOChannel implements ByteChannel, IOConnector.DataReceivedCallback,
IOConnector.DataFlushedCallback {
/**
* If this channel wraps another channel - for example a socket.
* Will be null if this is the 'root' channel - a socket, memory.
*/
protected IOChannel net;
/**
* Set with another channel layered on top of the current channel.
*/
protected IOChannel head;
protected String id;
/**
* A string that can be parsed to extract the target.
* host:port for normal sockets
*/
protected CharSequence target;
/**
* Connector that created the channel.
*/
protected IOConnector connector;
/**
* Callbacks. Will be moved if a new head is inserted.
*/
protected IOConnector.ConnectedCallback connectedCallback;
/**
* Will be called if any data is received.
* Will also be called on close. Close with lastException set indicates
* an error condition.
*/
protected IOConnector.DataReceivedCallback dataReceivedCallback;
/**
* Out data is buffered, then sent with startSending.
* This callback indicates the data has been sent. Can be used
* to implement blocking flush.
*/
protected IOConnector.DataFlushedCallback dataFlushedCallback;
// Last activity timestamp.
// TODO: update and use it ( placeholder )
public long ts;
/**
* If an async exception happens.
*/
protected Throwable lastException;
protected IOChannel() {
}
public void setConnectedCallback(IOConnector.ConnectedCallback connectedCallback) {
this.connectedCallback = connectedCallback;
}
public void setDataReceivedCallback(IOConnector.DataReceivedCallback dataReceivedCallback) {
this.dataReceivedCallback = dataReceivedCallback;
}
/**
* Callback called when the bottom ( OS ) channel has finished flushing.
*
* @param dataFlushedCallback
*/
public void setDataFlushedCallback(IOConnector.DataFlushedCallback dataFlushedCallback) {
this.dataFlushedCallback = dataFlushedCallback;
}
// Input
public abstract IOBuffer getIn();
// Output
public abstract IOBuffer getOut();
/**
* From downstream ( NET ). Pass it to the next channel.
*/
public void handleReceived(IOChannel net) throws IOException {
sendHandleReceivedCallback();
}
/**
* Called from lower layer (NET) when the last flush is
* done and all buffers have been sent to OS ( or
* intended recipient ).
*
* Will call the callback or next filter, may do additional
* processing.
*
* @throws IOException
*/
public void handleFlushed(IOChannel net) throws IOException {
sendHandleFlushedCallback();
}
private void sendHandleFlushedCallback() throws IOException {
try {
if (dataFlushedCallback != null) {
dataFlushedCallback.handleFlushed(this);
}
if (head != null) {
head.handleFlushed(this);
}
} catch (Throwable t) {
close();
if (t instanceof IOException) {
throw (IOException) t;
} else {
throw new WrappedException("Error in handleFlushed", t);
}
}
}
/**
* Notify next channel or callback that data has been received.
* Called after a lower channel gets more data ( in the IOThread
* for example ).
*
* Also called when closed stream is detected. Can be called
* to just force upper layers to check for data.
*/
public void sendHandleReceivedCallback() throws IOException {
try {
if (dataReceivedCallback != null) {
dataReceivedCallback.handleReceived(this);
}
if (head != null) {
head.handleReceived(this);
}
} catch (Throwable t) {
t.printStackTrace();
try {
close();
} catch(Throwable t2) {
t2.printStackTrace();
}
if (t instanceof IOException) {
throw (IOException) t;
} else {
throw new WrappedException(t);
}
}
}
/**
* Return last IO exception.
*
* The channel is async, exceptions can happen at any time.
* The normal callback will be called ( connected, received ), it
* should check if the channel is closed and the exception.
*/
public Throwable lastException() {
return lastException;
}
public void close() throws IOException {
shutdownOutput();
// Should it read the buffers ?
if (getIn() == null || getIn().isAppendClosed()) {
return;
} else {
getIn().close();
sendHandleReceivedCallback();
}
getIn().hasDataLock.signal(getIn());
}
public boolean isOpen() {
return getIn() != null &&
getOut() != null &&
!getIn().isAppendClosed() && !getOut().isAppendClosed();
}
public void shutdownOutput() throws IOException {
if (getOut() == null || getOut().isAppendClosed()) {
return;
} else {
getOut().close();
startSending();
}
}
public void setSink(IOChannel previous) throws IOException {
this.net = previous;
}
public IOChannel getSink() {
return net;
}
// Chaining/filtering
/**
* Called to add an filter after the current channel, for
* example set SSL on top of a socket channel.
*
* The 'next' channel will have the received/flushed callbacks
* of the current channel. The current channel's callbacks will
* be reset.
*
* "Head" is from STREAMS.
*
* @throws IOException
*/
public IOChannel setHead(IOChannel head) throws IOException {
this.head = head;
head.setSink(this);
// TODO: do we want to migrate them automatically ?
head.setDataReceivedCallback(dataReceivedCallback);
head.setDataFlushedCallback(dataFlushedCallback);
// app.setClosedCallback(closedCallback);
dataReceivedCallback = null;
dataFlushedCallback = null;
return this;
}
public IOChannel getFirst() {
IOChannel first = this;
while (true) {
if (!(first instanceof IOChannel)) {
return first;
}
IOChannel before = ((IOChannel) first).getSink();
if (before == null) {
return first;
} else {
first = before;
}
}
}
// Socket support
public void readInterest(boolean b) throws IOException {
if (net != null) {
net.readInterest(b);
}
}
// Helpers
public int read(ByteBuffer bb) throws IOException {
return getIn().read(bb);
}
public int readNonBlocking(ByteBuffer bb) throws IOException {
return getIn().read(bb);
}
public void waitFlush(long timeMs) throws IOException {
return;
}
public int readBlocking(ByteBuffer bb, long timeMs) throws IOException {
getIn().waitData(timeMs);
return getIn().read(bb);
}
/**
* Capture all output in a buffer.
*/
public BBuffer readAll(BBuffer chunk, long to)
throws IOException {
if (chunk == null) {
chunk = BBuffer.allocate();
}
while (true) {
getIn().waitData(to);
BBucket next = getIn().peekFirst();
if (getIn().isClosedAndEmpty() && next == null) {
return chunk;
}
if (next == null) {
continue; // false positive
}
chunk.append(next.array(), next.position(), next.remaining());
getIn().advance(next.remaining());
}
}
public int write(ByteBuffer bb) throws IOException {
return getOut().write(bb);
}
public void write(byte[] data) throws IOException {
getOut().append(data, 0, data.length);
}
public void write(String string) throws IOException {
write(string.getBytes());
}
/**
* Send data in out to the intended recipient.
* This is not blocking.
*/
public abstract void startSending() throws IOException;
public void setId(String id) {
this.id = id;
}
public String getId() {
return id;
}
public CharSequence getTarget() {
if (net != null) {
return net.getTarget();
}
return target;
}
public void setTarget(CharSequence target) {
this.target = target;
}
public static final String ATT_REMOTE_HOSTNAME = "RemoteHostname";
public static final String ATT_LOCAL_HOSTNAME = "LocalHostname";
public static final String ATT_REMOTE_PORT = "RemotePort";
public static final String ATT_LOCAL_PORT = "LocalPort";
public static final String ATT_LOCAL_ADDRESS = "LocalAddress";
public static final String ATT_REMOTE_ADDRESS = "RemoteAddress";
public Object getAttribute(String name) {
if (net != null) {
return net.getAttribute(name);
}
return null;
}
}