blob: 9ec80becd98841c9f47a979eae791ee0ba6a6d42 [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tomcat.lite.io;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.Channel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.tomcat.lite.io.NioChannel.NioChannelCallback;
/**
* Abstract NIO/APR to avoid some of the complexity and allow more code
* sharing and experiments.
*
* SelectorThread provides non-blocking methods for read/write and generates
* callbacks using SelectorCallback. It has no buffers of its own.
*
* This is non-blocking, non-buffering and uses callbacks.
*
* @author Costin Manolache
*/
public class NioThread implements Runnable {
// ----------- IO handling -----------
protected long inactivityTimeout = 5000;
protected Thread selectorThread;
static Logger log = Logger.getLogger("NIO");
Selector selector;
// will be processed in the selector thread
List<NioChannel> readInterest = new ArrayList<NioChannel>();
List<NioChannel> writeInterest = new ArrayList<NioChannel>();
List<NioChannel> connectAcceptInterest = new ArrayList<NioChannel>();
List<NioChannel> updateCallback = new ArrayList<NioChannel>();
List<NioChannel> closeInterest = new LinkedList<NioChannel>();
List<Runnable> runnableInterest = new ArrayList<Runnable>();
// Statistics
AtomicInteger opened = new AtomicInteger();
AtomicInteger closed = new AtomicInteger();
AtomicInteger loops = new AtomicInteger();
AtomicInteger callbackCount = new AtomicInteger();
AtomicLong callbackTotalTime = new AtomicLong();
long maxCallbackTime = 0;
// actives are also stored in the Selector. This is only updated in the main
// thread
public ArrayList<NioChannel> active = new ArrayList<NioChannel>();
public static boolean debug = false;
boolean debugWakeup = false;
boolean running = true;
long lastWakeup = System.currentTimeMillis(); // last time we woke
long nextWakeup; // next scheduled wakeup
// Normally select will wait for the next time event - if it's
// too far in future, maxSleep will override it.
private long maxSleep = 600000;
long sleepTime = maxSleep;
// Never sleep less than minSleep. This defines the resulution for
// time events.
private long minSleep = 100;
boolean daemon = false;
// TODO: trace log - record all events with timestamps, replay
public NioThread(String name, boolean daemon) {
try {
selectorThread = (name == null) ? new Thread(this) :
new Thread(this, name);
selector = Selector.open();
// TODO: start it on-demand, close it when not in use
selectorThread.setDaemon(daemon);
this.daemon = daemon;
selectorThread.start();
} catch(IOException e) {
throw new RuntimeException(e);
}
}
/**
* Opened sockets, waiting for something ( close at least )
*/
public int getOpen() {
return opened.get();
}
/**
* Closed - we're done with them.
*/
public int getClosed() {
return closed.get();
}
public int getActive() {
return active.size();
}
public int getCallbacks() {
return callbackCount.get();
}
public long getMaxCallbackTime() {
return maxCallbackTime;
}
public long getAvgCallbackTime() {
int cnt = callbackCount.get();
if (cnt == 0) {
return 0;
}
return callbackTotalTime.get() / cnt;
}
/**
* How many times we looped
*/
public int getLoops() {
return loops.get();
}
public long getLastWakeup() {
return lastWakeup;
}
public long getTimeSinceLastWakeup() {
return System.currentTimeMillis() - lastWakeup;
}
/**
* Close all resources, stop accepting, stop the thread.
* The actual stop will happen in background.
*/
public void stop() {
running = false;
if (debug) {
log.info("Selector thread stop " + this);
}
selector.wakeup();
}
public void run() {
int sloops = 0;
if (debug) {
log.info("Start NIO thread, daemon=" + daemon);
}
while (running) {
// if we want timeouts - set here.
try {
loops.incrementAndGet();
// Check if new requests were added
processPending();
// Timers
long now = System.currentTimeMillis();
if (nextWakeup < now) {
// We don't want to iterate on every I/O
updateSleepTimeAndProcessTimeouts(now);
}
int selected = selector.select(sleepTime);
lastWakeup = System.currentTimeMillis();
long slept = lastWakeup - now;
if (debugWakeup && selected == 0) {
if (sleepTime < maxSleep - 1000) { // short wakeup
log.info("Wakeup " + selected + " " + slept
+ " " + sleepTime);
}
}
if (slept < 10 && selected == 0) {
if (sloops > 50) {
sloops = 0;
log.severe("Looping !");
resetSelector();
}
sloops++;
}
// handle events for existing req first.
if (selected != 0) {
sloops = 0;
int callbackCnt = 0;
Set<SelectionKey> sel = selector.selectedKeys();
Iterator<SelectionKey> i = sel.iterator();
while (i.hasNext()) {
callbackCnt++;
long beforeCallback = System.currentTimeMillis();
SelectionKey sk = i.next();
i.remove();
boolean valid = sk.isValid();
int readyOps = (valid) ? sk.readyOps() : 0;
NioChannel ch = (NioChannel) sk.attachment();
if (debugWakeup) {
log.info("Wakeup selCnt=" + selected + " slept=" + (lastWakeup - now) +
" ready: " + readyOps + " v=" +
sk.isValid() + " ch=" + ch);
}
if (ch == null) {
log.severe("Missing channel");
sk.cancel();
continue;
}
if (ch.selKey != sk) {
// if (ch.selKey != null) { // null if closed
log.severe("Invalid state, selKey doesn't match ");
ch.selKey = sk;
}
if (ch.channel != sk.channel()) {
ch.channel = sk.channel();
log.severe("Invalid state, channel doesn't match ");
}
if (!sk.isValid()) {
if (debug) {
log.info("!isValid, closed socket " + ch);
}
ch.close();
continue;
}
try {
int ready = sk.readyOps();
// callbacks
if (sk.isValid() && sk.isAcceptable()) {
handleAccept(ch, sk);
}
if (sk.isValid() && sk.isConnectable()) {
sk.interestOps(sk.interestOps() & ~SelectionKey.OP_CONNECT);
SocketChannel sc = (SocketChannel) sk.channel();
handleConnect(ch, sc);
}
if (sk.isValid() && sk.isWritable()) {
// Needs to be explicitely re-enabled by callback
// if more data.
sk.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE);
ch.writeInterest = false;
handleDataWriteable(ch);
}
if (sk.isValid() && sk.isReadable()) {
// Leave readable interest !
handleReadable(ch);
}
long callbackTime =
System.currentTimeMillis() - beforeCallback;
if (callbackTime > 250) {
log.warning("Callback too long ! ops=" + ready +
" time=" + callbackTime + " ch=" + ch +
" " + callbackCnt);
}
if (callbackTime > maxCallbackTime) {
maxCallbackTime = callbackTime;
}
callbackCount.incrementAndGet();
this.callbackTotalTime.addAndGet(callbackTime);
} catch (Throwable t) {
log.log(Level.SEVERE, "SelectorThread: Channel error, closing", t);
ch.lastException = t;
ch.close();
}
}
// All at once
sel.clear();
}
} catch (Throwable e) {
log.log(Level.SEVERE, "SelectorThread: Error in select", e);
}
} // while(running)
log.info("SelectorThread done");
}
private void log(String msg, int selected, long slept, SelectionKey sk, int readyOps) {
log.info(msg + " " + selected
+ " " + slept
+ " ready: " + readyOps + " "
+ sk.readyOps() + " " + sk);
}
private void resetSelector() throws IOException, ClosedChannelException {
// Let's close all sockets - one is bad, but we can't do much.
Set<SelectionKey> keys = selector.keys();
//Set<SelectionKey> keys = selector.keys();
ArrayList<NioChannel> oldCh = new ArrayList<NioChannel>();
ArrayList<Integer> interests = new ArrayList<Integer>();
for (SelectionKey k : keys) {
NioChannel cd = (NioChannel) k.attachment();
interests.add(k.interestOps());
oldCh.add(cd);
k.cancel();
}
selector.close();
selector = Selector.open();
for (int i = 0; i < oldCh.size(); i++) {
NioChannel selectorData = oldCh.get(i);
if (selectorData == null) {
continue;
}
int interest = interests.get(i);
if (selectorData.channel instanceof ServerSocketChannel) {
ServerSocketChannel socketChannel =
(ServerSocketChannel) selectorData.channel;
selectorData.selKey = socketChannel.register(selector, SelectionKey.OP_ACCEPT);
} else {
SocketChannel socketChannel =
(SocketChannel) selectorData.channel;
if (interest != 0) {
selectorData.selKey = socketChannel.register(selector,
interest);
}
}
}
}
private void handleReadable(NioChannel ch) throws IOException {
ch.lastReadResult = 0;
if (ch.callback != null) {
ch.callback.handleReadable(ch);
}
if (ch.lastReadResult != 0 && ch.readInterest && !ch.inClosed) {
log.warning("LOOP: read interest" +
" after incomplete read");
ch.close();
}
}
private void handleDataWriteable(NioChannel ch) throws IOException {
ch.lastWriteResult = 0;
if (ch.callback != null) {
ch.callback.handleWriteable(ch);
}
if (ch.lastWriteResult > 0 && ch.writeInterest) {
log.warning("SelectorThread: write interest" +
" after incomplete write, LOOP");
}
}
private void handleConnect(NioChannel ch, SocketChannel sc)
throws IOException, SocketException {
try {
if (!sc.finishConnect()) {
log.warning("handleConnected - finishConnect returns false");
}
ch.sel = this;
//sc.socket().setSoLinger(true, 0);
if (debug) {
log.info("connected() " + ch + " isConnected()=" + sc.isConnected() + " " +
sc.isConnectionPending());
}
readInterest(ch, true);
} catch (Throwable t) {
close(ch, t);
}
try {
if (ch.callback != null) {
ch.callback.handleConnected(ch);
}
} catch(Throwable t1) {
log.log(Level.WARNING, "Error in connect callback", t1);
}
}
private void handleAccept(NioChannel ch, SelectionKey sk)
throws IOException, ClosedChannelException {
SelectableChannel selc = sk.channel();
ServerSocketChannel ssc=(ServerSocketChannel)selc;
SocketChannel sockC = ssc.accept();
sockC.configureBlocking(false);
NioChannel acceptedChannel = new NioChannel(this);
acceptedChannel.selKey = sockC.register(selector,
SelectionKey.OP_READ,
acceptedChannel);
acceptedChannel.channel = sockC;
synchronized (active) {
active.add(acceptedChannel);
}
// Find the callback for the new socket
if (ch.callback != null) {
// TODO: use future !
try {
ch.callback.handleConnected(acceptedChannel);
} catch (Throwable t) {
log.log(Level.SEVERE, "SelectorThread: Channel error, closing ", t);
acceptedChannel.lastException = t;
acceptedChannel.close();
}
}
//sk.interestOps(sk.interestOps() | SelectionKey.OP_ACCEPT);
if (debug) {
log.info("handleAccept " + ch);
}
}
public void shutdownOutput(NioChannel ch) throws IOException {
Channel channel = ch.channel;
if (channel instanceof SocketChannel) {
SocketChannel sc = (SocketChannel) channel;
if (sc.isOpen() && sc.isConnected()) {
if (debug) {
log.info("Half shutdown " + ch);
}
sc.socket().shutdownOutput(); // TCP end to the other side
}
}
}
/**
* Called from the IO thread
*/
private void closeIOThread(NioChannel ch, boolean remove) {
SelectionKey sk = (SelectionKey) ch.selKey;
Channel channel = ch.channel;
try {
synchronized(closeInterest) {
if (ch.closeCalled) {
if (debug) {
log.severe("Close called 2x ");
}
return;
}
ch.closeCalled = true;
int o = opened.decrementAndGet();
if (debug) {
log.info("-------------> close: " + ch + " t=" + ch.lastException);
}
if (sk != null) {
if (sk.isValid()) {
sk.interestOps(0);
}
sk.cancel();
ch.selKey = null;
}
if (channel instanceof SocketChannel) {
SocketChannel sc = (SocketChannel) channel;
if (sc.isConnected()) {
if (debug) {
log.info("Close socket, opened=" + o);
}
try {
sc.socket().shutdownInput();
} catch(IOException io1) {
}
try {
sc.socket().shutdownOutput(); // TCP end to the other side
} catch(IOException io1) {
}
sc.socket().close();
}
}
channel.close();
closed.incrementAndGet();
if (ch.callback != null) {
ch.callback.handleClosed(ch);
}
// remove from active - false only if already removed
if (remove) {
synchronized (active) {
boolean removed = active.remove(ch);
}
}
}
} catch (IOException ex2) {
log.log(Level.SEVERE, "SelectorThread: Error closing socket ", ex2);
}
}
// --------------- Socket op abstractions ------------
public int readNonBlocking(NioChannel selectorData, ByteBuffer bb)
throws IOException {
try {
int off = bb.position();
int done = 0;
done = ((SocketChannel) selectorData.channel).read(bb);
if (debug) {
log.info("-------------readNB rd=" + done + " bb.limit=" +
bb.limit() + " pos=" + bb.position() + " " + selectorData);
}
if (done > 0) {
if (debug) {
if (!bb.isDirect()) {
String s = new String(bb.array(), off,
bb.position() - off);
log.info("Data:\n" + s);
} else {
log.info("Data: " + bb.toString());
}
}
selectorData.zeroReads = 0;
} else if (done < 0) {
if (debug) {
log.info("SelectorThread: EOF while reading " + selectorData);
}
} else {
// need more...
if (selectorData.lastReadResult == 0) {
selectorData.zeroReads++;
if (selectorData.zeroReads > 6) {
log.severe("LOOP 0 reading ");
selectorData.lastException = new IOException("Polling read");
selectorData.close();
return -1;
}
}
}
selectorData.lastReadResult = done;
return done;
} catch(IOException ex) {
if (debug) {
log.info("readNB error rd=" + -1 + " bblen=" +
(bb.limit() - bb.position()) + " " + selectorData + " " + ex);
}
// common case: other side closed the connection. No need for trace
if (ex.toString().indexOf("Connection reset by peer") < 0) {
ex.printStackTrace();
}
selectorData.lastException = ex;
selectorData.close();
return -1;
}
}
/**
* May be called from any thread
*/
public int writeNonBlocking(NioChannel selectorData, ByteBuffer bb)
throws IOException {
try {
if (debug) {
log.info("writeNB pos=" + bb.position() + " len=" +
(bb.limit() - bb.position()) + " " + selectorData);
if (!bb.isDirect()) {
String s = new String(bb.array(), bb.position(),
bb.limit() - bb.position());
log.info("Data:\n" + s);
}
}
if (selectorData.writeInterest) {
// writeInterest will be false after a callback, if it is
// set it means we want to wait for the callback.
if (debug) {
log.info("Prevent writeNB when writeInterest is set");
}
return 0;
}
int done = 0;
done = ((SocketChannel) selectorData.channel).write(bb);
selectorData.lastWriteResult = done;
return done;
} catch(IOException ex) {
if (debug) {
log.info("writeNB error pos=" + bb.position() + " len=" +
(bb.limit() - bb.position()) + " " + selectorData + " " +
ex);
}
//ex.printStackTrace();
selectorData.lastException = ex;
selectorData.close();
throw ex;
// return -1;
}
}
public int getPort(NioChannel sd, boolean remote) {
SocketChannel socketChannel = (SocketChannel) sd.channel;
if (remote) {
return socketChannel.socket().getPort();
} else {
return socketChannel.socket().getLocalPort();
}
}
public InetAddress getAddress(NioChannel sd, boolean remote) {
SocketChannel socketChannel = (SocketChannel) sd.channel;
if (remote) {
return socketChannel.socket().getInetAddress();
} else {
return socketChannel.socket().getLocalAddress();
}
}
/**
*/
public void connect(String host, int port, NioChannelCallback cstate)
throws IOException {
connect(new InetSocketAddress(host, port), cstate);
}
public void connect(SocketAddress sa, NioChannelCallback cstate)
throws IOException {
connect(sa, cstate, null);
}
public void connect(SocketAddress sa, NioChannelCallback cstate,
NioChannel filter)
throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
NioChannel selectorData = new NioChannel(this);
selectorData.sel = this;
selectorData.callback = cstate;
selectorData.channel = socketChannel;
selectorData.channel = socketChannel; // no key
socketChannel.connect(sa);
opened.incrementAndGet();
synchronized (connectAcceptInterest) {
connectAcceptInterest.add(selectorData);
}
selector.wakeup();
}
// TODO
public void configureSocket(ByteChannel ch,
boolean noDelay) throws IOException {
SocketChannel sockC = (SocketChannel) ch;
sockC.socket().setTcpNoDelay(noDelay);
}
// TODO
public void setSocketOptions(NioChannel selectorData,
int linger,
boolean tcpNoDelay,
int socketTimeout)
throws IOException {
SocketChannel socketChannel =
(SocketChannel) selectorData.channel;
Socket socket = socketChannel.socket();
if(linger >= 0 )
socket.setSoLinger( true, linger);
if( tcpNoDelay )
socket.setTcpNoDelay(tcpNoDelay);
if( socketTimeout > 0 )
socket.setSoTimeout( socketTimeout );
}
/**
* Can be called from multiple threads or multiple times.
*/
public int close(NioChannel selectorData, Throwable exception) throws IOException {
synchronized (closeInterest) {
if (exception != null) {
selectorData.lastException = exception;
}
selectorData.readInterest = false;
if (isSelectorThread()) {
closeIOThread(selectorData, true);
return 0;
}
if (!selectorData.inClosed) {
closeInterest.add(selectorData);
}
}
selector.wakeup();
return 0;
}
public void acceptor(NioChannelCallback cstate,
int port,
InetAddress inet,
int backlog,
int serverTimeout)
throws IOException
{
ServerSocketChannel ssc=ServerSocketChannel.open();
ServerSocket serverSocket = ssc.socket();
SocketAddress sa = null;
if (inet == null) {
sa = new InetSocketAddress( port );
} else {
sa = new InetSocketAddress(inet, port);
}
if (backlog > 0) {
serverSocket.bind( sa , backlog);
} else {
serverSocket.bind(sa);
}
if( serverTimeout >= 0 ) {
serverSocket.setSoTimeout( serverTimeout );
}
ssc.configureBlocking(false);
NioChannel selectorData = new NioChannel(this);
selectorData.channel = ssc; // no key yet
selectorData.callback = cstate;
// key will be set in pending
// TODO: add SSL here
synchronized (connectAcceptInterest) {
connectAcceptInterest.add(selectorData);
}
selector.wakeup();
}
public void runInSelectorThread(Runnable cb) throws IOException {
if (isSelectorThread()) {
cb.run();
} else {
synchronized (runnableInterest) {
runnableInterest.add(cb);
}
selector.wakeup();
}
}
/**
* Example config:
*
* www stream tcp wait USER PATH_TO_tomcatInetd.sh
*
* For a different port, you need to add it to /etc/services.
*
* 'wait' is critical - the common use of inetd is 'nowait' for
* tcp services, which doesn't make sense for java ( too slow startup
* time ). It may make sense in future with something like android VM.
*
* In 'wait' mode, inetd will pass the acceptor socket to java - so
* you can listen on port 80 and run as regular user with no special
* code and magic.
* If tomcat dies, inetd will get back the acceptor and on next connection
* restart tomcat.
*
* This also works with xinetd. It might work with Apple launchd.
*
* TODO: detect inactivity for N minutes, exist - to free resources.
*/
public void inetdAcceptor(NioChannelCallback cstate) throws IOException {
SelectorProvider sp=SelectorProvider.provider();
Channel ch=sp.inheritedChannel();
if(ch!=null ) {
log.info("Inherited: " + ch.getClass().getName());
// blocking mode
ServerSocketChannel ssc=(ServerSocketChannel)ch;
ssc.configureBlocking(false);
NioChannel selectorData = new NioChannel(this);
selectorData.channel = ssc;
selectorData.callback = cstate;
synchronized (connectAcceptInterest) {
connectAcceptInterest.add(selectorData);
}
selector.wakeup();
} else {
log.severe("No inet socket ");
throw new IOException("Invalid inheritedChannel");
}
}
// -------------- Housekeeping -------------
/**
* Same as APR connector - iterate over tasks, get
* smallest timeout
* @throws IOException
*/
void updateSleepTimeAndProcessTimeouts(long now)
throws IOException {
long min = Long.MAX_VALUE;
// TODO: test with large sets, maybe sort
synchronized (active) {
Iterator<NioChannel> activeIt = active.iterator();
while(activeIt.hasNext()) {
NioChannel selectorData = activeIt.next();
if (! selectorData.channel.isOpen()) {
if (debug) {
log.info("Found closed socket, removing " +
selectorData.channel);
}
// activeIt.remove();
// selectorData.close();
}
long t = selectorData.nextTimeEvent;
if (t == 0) {
continue;
}
if (t < now) {
// Timeout
if (debug) {
log.info("Time event " + selectorData);
}
if (selectorData.timeEvent != null) {
selectorData.timeEvent.run();
}
// TODO: make sure this is updated if it was selected
continue;
}
if (t < min) {
min = t;
}
}
}
long nextSleep = min - now;
if (nextSleep > maxSleep) {
sleepTime = maxSleep;
} else if (nextSleep < minSleep) {
sleepTime = minSleep;
} else {
sleepTime = nextSleep;
}
nextWakeup = now + sleepTime;
}
/**
* Request a callback whenever data can be written.
* When the callback is invoked, the write interest is removed ( to avoid
* looping ). If the write() operation doesn't complete, you must call
* writeInterest - AND stop writing, some implementations will throw
* exception. write() will actually attempt to detect this and avoid the
* error.
*
* @param sc
*/
public void writeInterest(NioChannel selectorData) {
// TODO: suspended ?
SelectionKey sk = (SelectionKey) selectorData.selKey;
if (!sk.isValid()) {
return;
}
selectorData.writeInterest = true;
int interest = sk.interestOps();
if ((interest & SelectionKey.OP_WRITE) != 0) {
return;
}
if (Thread.currentThread() == selectorThread) {
interest =
interest | SelectionKey.OP_WRITE;
sk.interestOps(interest);
if (debug) {
log.info("Write interest " + selectorData + " i=" + interest);
}
return;
}
if (debug) {
log.info("Pending write interest " + selectorData);
}
synchronized (writeInterest) {
writeInterest.add(selectorData);
}
selector.wakeup();
}
public void readInterest(NioChannel selectorData, boolean b) throws IOException {
if (Thread.currentThread() == selectorThread) {
selectorData.readInterest = b;
selThreadReadInterest(selectorData);
return;
}
SelectionKey sk = (SelectionKey) selectorData.selKey;
if (sk == null) {
close(selectorData, null);
return;
}
int interest = sk.interestOps();
selectorData.readInterest = b;
if (b && (interest & SelectionKey.OP_READ) != 0) {
return;
}
if (!b && (interest & SelectionKey.OP_READ) == 0) {
return;
}
// Schedule the interest update.
synchronized (readInterest) {
readInterest.add(selectorData);
}
if (debug) {
log.info("Registering pending read interest");
}
selector.wakeup();
}
private void selThreadReadInterest(NioChannel selectorData) throws IOException {
SelectionKey sk = (SelectionKey) selectorData.selKey;
if (sk == null) {
if (selectorData.readInterest) {
if (debug) {
log.info("Register again for read interest");
}
SocketChannel socketChannel =
(SocketChannel) selectorData.channel;
if (socketChannel.isOpen()) {
selectorData.sel = this;
selectorData.selKey =
socketChannel.register(selector,
SelectionKey.OP_READ, selectorData);
selectorData.channel = socketChannel;
}
}
return;
}
if (!sk.isValid()) {
return;
}
int interest = sk.interestOps();
if (sk != null && sk.isValid()) {
if (selectorData.readInterest) {
// if ((interest | SelectionKey.OP_READ) != 0) {
// return;
// }
interest =
interest | SelectionKey.OP_READ;
} else {
// if ((interest | SelectionKey.OP_READ) == 0) {
// return;
// }
interest =
interest & ~SelectionKey.OP_READ;
}
if (interest == 0) {
if (!selectorData.inClosed) {
new Throwable().printStackTrace();
log.warning("No interest(rd removed) " + selectorData);
}
// TODO: should we remove it ? It needs to be re-activated
// later.
sk.cancel(); //??
selectorData.selKey = null;
} else {
sk.interestOps(interest);
}
if (debug) {
log.info(((selectorData.readInterest)
? "RESUME read " : "SUSPEND read ")
+ selectorData);
}
}
}
private void processPendingConnectAccept() throws IOException {
synchronized (connectAcceptInterest) {
Iterator<NioChannel> ci = connectAcceptInterest.iterator();
while (ci.hasNext()) {
NioChannel selectorData = ci.next();
// Find host, port - initiate connection
try {
// Accept interest ?
if (selectorData.channel instanceof ServerSocketChannel) {
ServerSocketChannel socketChannel =
(ServerSocketChannel) selectorData.channel;
selectorData.sel = this;
selectorData.selKey =
socketChannel.register(selector,
SelectionKey.OP_ACCEPT, selectorData);
selectorData.channel = socketChannel;
synchronized (active) {
active.add(selectorData);
}
if (debug) {
log.info("Pending acceptor added: " + selectorData);
}
} else {
SocketChannel socketChannel =
(SocketChannel) selectorData.channel;
selectorData.sel = this;
selectorData.selKey =
socketChannel.register(selector,
SelectionKey.OP_CONNECT, selectorData);
synchronized (active) {
active.add(selectorData);
}
if (debug) {
log.info("Pending connect added: " + selectorData);
}
}
} catch (Throwable e) {
log.log(Level.SEVERE, "error registering connect/accept",
e);
}
}
connectAcceptInterest.clear();
}
}
private void processPending() throws IOException {
if (closeInterest.size() > 0) {
synchronized (closeInterest) {
List<NioChannel> closeList = new ArrayList(closeInterest);
closeInterest.clear();
Iterator<NioChannel> ci = closeList.iterator();
while (ci.hasNext()) {
try {
NioChannel selectorData = ci.next();
closeIOThread(selectorData, true);
} catch (Throwable t) {
t.printStackTrace();
}
}
}
}
processPendingConnectAccept();
processPendingReadWrite();
if (runnableInterest.size() > 0) {
synchronized (runnableInterest) {
Iterator<Runnable> ci = runnableInterest.iterator();
while (ci.hasNext()) {
Runnable cstate = ci.next();
try {
cstate.run();
} catch (Throwable t) {
t.printStackTrace();
}
if (debug) {
log.info("Run in selthread: " + cstate);
}
}
runnableInterest.clear();
}
}
//processPendingUpdateCallback();
}
private void processPendingReadWrite() throws IOException {
// Update interest
if (readInterest.size() > 0) {
synchronized (readInterest) {
Iterator<NioChannel> ci = readInterest.iterator();
while (ci.hasNext()) {
NioChannel cstate = ci.next();
selThreadReadInterest(cstate);
if (debug) {
log.info("Read interest added: " + cstate);
}
}
readInterest.clear();
}
}
if (writeInterest.size() > 0) {
synchronized (writeInterest) {
Iterator<NioChannel> ci = writeInterest.iterator();
while (ci.hasNext()) {
NioChannel cstate = ci.next();
// Fake callback - will update as side effect
handleDataWriteable(cstate);
if (debug) {
log.info("Write interest, calling dataWritable: " + cstate);
}
}
writeInterest.clear();
}
}
}
protected boolean isSelectorThread() {
return Thread.currentThread() == selectorThread;
}
public static boolean isSelectorThread(IOChannel ch) {
SocketIOChannel sc = (SocketIOChannel) ch.getFirst();
return Thread.currentThread() == sc.ch.sel.selectorThread;
}
}