blob: b8f1bc3397c65e68c15ef28a14d35cb329b7e307 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tomcat.util.net;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.collections.SynchronizedQueue;
import org.apache.tomcat.util.collections.SynchronizedStack;
import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
public class NioBlockingSelector {
private static final Log log = LogFactory.getLog(NioBlockingSelector.class);
/**
 * Source of the numeric suffix used in poller thread names. Atomic so that
 * selectors opened concurrently from different threads never observe a lost
 * update and end up with the same thread name.
 */
private static final AtomicInteger threadCounter = new AtomicInteger(0);

/** Pool of reusable {@link KeyReference} holders handed to the poller by read/write. */
private final SynchronizedStack<KeyReference> keyReferenceStack =
        new SynchronizedStack<>();

/** Selector passed to {@link #open(Selector)}; the poller thread selects on it. */
protected Selector sharedSelector;

/** Background thread servicing the selector; created by {@link #open(Selector)}. */
protected BlockPoller poller;

public NioBlockingSelector() {
}

/**
 * Starts a daemon {@link BlockPoller} thread that selects on the given selector.
 *
 * @param selector the selector the poller thread will operate on
 */
public void open(Selector selector) {
    sharedSelector = selector;
    poller = new BlockPoller();
    poller.selector = sharedSelector;
    poller.setDaemon(true);
    // incrementAndGet keeps the historical numbering (first poller is "-1").
    poller.setName("NioBlockingSelector.BlockPoller-" + threadCounter.incrementAndGet());
    poller.start();
}
/**
 * Stops the poller thread, if one has been started, and forgets it.
 * Calling this when no poller exists is a no-op.
 */
public void close() {
    if (poller == null) {
        return;
    }
    // Flag the loop to exit first, then interrupt the thread.
    poller.disable();
    poller.interrupt();
    poller = null;
}
/**
* Performs a blocking write of the remaining bytes of <code>buf</code> to <code>socket</code>.
* <p>
* The method first tries to write directly to the channel. Whenever a write makes no
* progress, OP_WRITE interest is registered with the {@link BlockPoller} and the calling
* thread waits on the attachment's write latch until the poller counts it down or the wait
* ends. The timeout clock is restarted after every write that transfers at least one byte.
* <p>
* On exit, whether normal or exceptional, the OP_WRITE interest is removed from the poller
* again and, if the operation timed out, the key recorded for the poller's selector is cancelled.
* @param buf ByteBuffer - the buffer containing the data, we will write as long as <code>(buf.hasRemaining()==true)</code>
* @param socket NioChannel - the channel to write data to; it must still have a key registered with the selector of its own poller
* @param writeTimeout long - the timeout for this write operation in milliseconds; a negative value waits on the latch for up to Long.MAX_VALUE milliseconds
* @return int - returns the number of bytes written
* @throws EOFException if write returns -1
* @throws SocketTimeoutException if the write times out
* @throws IOException if the channel has no key registered with its poller's selector, or if an IO Exception occurs in the underlying socket logic
*/
public int write(ByteBuffer buf, NioChannel socket, long writeTimeout)
throws IOException {
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if ( key == null ) throw new IOException("Key no longer registered");
// Reuse a pooled KeyReference, or create one if pop() returned null.
// RunnableAdd stores the key it registers on the poller's selector in it.
KeyReference reference = keyReferenceStack.pop();
if (reference == null) {
reference = new KeyReference();
}
KeyAttachment att = (KeyAttachment) key.attachment();
int written = 0;
boolean timedout = false;
int keycount = 1; //assume we can write
long time = System.currentTimeMillis(); //start the timeout timer
try {
while ( (!timedout) && buf.hasRemaining()) {
if (keycount > 0) { //only write if we were registered for a write
int cnt = socket.write(buf); //write the data
if (cnt == -1)
throw new EOFException();
written += cnt;
if (cnt > 0) {
time = System.currentTimeMillis(); //reset our timeout timer
continue; //we successfully wrote, try again without a selector
}
}
// No progress was made: ask the poller to watch for OP_WRITE and wait on the write latch.
try {
// Only start a new latch when none is pending from a previous pass.
if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1);
poller.add(att,SelectionKey.OP_WRITE,reference);
if (writeTimeout < 0) {
att.awaitWriteLatch(Long.MAX_VALUE,TimeUnit.MILLISECONDS);
} else {
att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);
}
} catch (InterruptedException ignore) {
// Ignore
}
if ( att.getWriteLatch()!=null && att.getWriteLatch().getCount()> 0) {
//the wait ended (we got interrupted, or presumably the await timed out) but we
//haven't received notification from the poller, so skip the write on the next pass.
keycount = 0;
}else {
//latch countdown has happened
keycount = 1;
att.resetWriteLatch();
}
// The timeout is only evaluated when the latch was not released.
// NOTE(review): with writeTimeout == 0 this check is skipped, so the loop never flags a
// timeout; read() uses ">= 0" in the equivalent place — confirm the difference is intended.
if (writeTimeout > 0 && (keycount == 0))
timedout = (System.currentTimeMillis() - time) >= writeTimeout;
} //while
if (timedout)
throw new SocketTimeoutException();
} finally {
// Always withdraw the OP_WRITE interest that may have been queued above.
poller.remove(att,SelectionKey.OP_WRITE);
if (timedout && reference.key!=null) {
poller.cancelKey(reference.key);
}
// Clear the holder before returning it to the pool.
reference.key = null;
keyReferenceStack.push(reference);
}
return written;
}
/**
* Performs a blocking read from <code>socket</code> into <code>buf</code>.
* <p>
* The method first tries to read directly from the channel. If no bytes are available,
* OP_READ interest is registered with the {@link BlockPoller} and the calling thread waits on
* the attachment's read latch until the poller counts it down or the wait ends. The loop
* finishes as soon as a single read call returns at least one byte.
* <p>
* On exit, whether normal or exceptional, the OP_READ interest is removed from the poller
* again and, if the operation timed out, the key recorded for the poller's selector is cancelled.
* @param buf ByteBuffer - the buffer to read into; we will keep trying until we have read at least one byte or we timed out
* @param socket NioChannel - the channel to read data from; it must still have a key registered with the selector of its own poller
* @param readTimeout long - the timeout for this read operation in milliseconds; a negative value waits on the latch for up to Long.MAX_VALUE milliseconds
* @return int - returns the number of bytes read by the last read call
* @throws EOFException if read returns -1
* @throws SocketTimeoutException if the read times out
* @throws IOException if the channel has no key registered with its poller's selector, or if an IO Exception occurs in the underlying socket logic
*/
public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if ( key == null ) throw new IOException("Key no longer registered");
// Reuse a pooled KeyReference, or create one if pop() returned null.
// RunnableAdd stores the key it registers on the poller's selector in it.
KeyReference reference = keyReferenceStack.pop();
if (reference == null) {
reference = new KeyReference();
}
KeyAttachment att = (KeyAttachment) key.attachment();
int read = 0;
boolean timedout = false;
int keycount = 1; //assume we can read
long time = System.currentTimeMillis(); //start the timeout timer
try {
while(!timedout) {
if (keycount > 0) { //only read if we were registered for a read
read = socket.read(buf);
if (read == -1)
throw new EOFException();
if (read > 0)
break;
}
// Nothing was read: ask the poller to watch for OP_READ and wait on the read latch.
try {
// Only start a new latch when none is pending from a previous pass.
if ( att.getReadLatch()==null || att.getReadLatch().getCount()==0) att.startReadLatch(1);
poller.add(att,SelectionKey.OP_READ, reference);
if (readTimeout < 0) {
att.awaitReadLatch(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
} else {
att.awaitReadLatch(readTimeout, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException ignore) {
// Ignore
}
if ( att.getReadLatch()!=null && att.getReadLatch().getCount()> 0) {
//the wait ended (we got interrupted, or presumably the await timed out) but we
//haven't received notification from the poller, so skip the read on the next pass.
keycount = 0;
}else {
//latch countdown has happened
keycount = 1;
att.resetReadLatch();
}
// The timeout is only evaluated when the latch was not released; the timer is
// never reset in this method, so it measures time since the call started.
if (readTimeout >= 0 && (keycount == 0))
timedout = (System.currentTimeMillis() - time) >= readTimeout;
} //while
if (timedout)
throw new SocketTimeoutException();
} finally {
// Always withdraw the OP_READ interest that may have been queued above.
poller.remove(att,SelectionKey.OP_READ);
if (timedout && reference.key!=null) {
poller.cancelKey(reference.key);
}
// Clear the holder before returning it to the pool.
reference.key = null;
keyReferenceStack.push(reference);
}
return read;
}
protected static class BlockPoller extends Thread {
protected volatile boolean run = true;
protected Selector selector = null;
protected final SynchronizedQueue<Runnable> events = new SynchronizedQueue<>();
/**
 * Asks the poller loop to stop and wakes the selector so that a blocked
 * select call returns and the loop can observe the cleared flag.
 */
public void disable() {
    run = false;
    selector.wakeup();
}
protected final AtomicInteger wakeupCounter = new AtomicInteger(0);
/**
 * Queues a task that cancels the given key on the poller thread and
 * signals the poller that work is pending.
 *
 * @param key the selection key to cancel
 */
public void cancelKey(final SelectionKey key) {
    events.offer(new RunnableCancel(key));
    wakeup();
}
/**
 * Records that an event is pending and, if needed, wakes the selector.
 * The run loop sets the counter to -1 right before it blocks in select, so
 * the increment yields 0 only for the first wakeup request after that point;
 * only that request calls {@code selector.wakeup()}.
 */
public void wakeup() {
    final int pending = wakeupCounter.incrementAndGet();
    if (pending == 0) {
        selector.wakeup();
    }
}
/**
 * Cancels and detaches the given selection key, then releases the latches
 * matching {@code ops} so that a thread waiting on them wakes up.
 * Does nothing at all when {@code sk} is {@code null}.
 *
 * @param sk  the selection key to cancel, may be {@code null}
 * @param key the attachment whose read/write latches are counted down
 * @param ops the interest ops (OP_READ and/or OP_WRITE) being abandoned
 */
public void cancel(SelectionKey sk, KeyAttachment key, int ops){
    if (sk == null) {
        return;
    }
    sk.cancel();
    sk.attach(null);
    final boolean forWrite = (ops & SelectionKey.OP_WRITE) != 0;
    final boolean forRead = (ops & SelectionKey.OP_READ) != 0;
    if (forWrite) {
        countDown(key.getWriteLatch());
    }
    if (forRead) {
        countDown(key.getReadLatch());
    }
}
/**
 * Queues a request to add {@code ops} to the interest set of the channel
 * behind {@code key} on the poller's selector, then signals the poller.
 * The request is silently dropped when the attachment, its socket, or the
 * underlying channel is {@code null}.
 *
 * @param key the attachment identifying the channel
 * @param ops the interest ops to add
 * @param ref holder that receives the selection key if a new registration is made
 */
public void add(final KeyAttachment key, final int ops, final KeyReference ref) {
    final NioChannel socket = (key == null) ? null : key.getSocket();
    final SocketChannel ch = (socket == null) ? null : socket.getIOChannel();
    if (ch != null) {
        events.offer(new RunnableAdd(ch, key, ops, ref));
        wakeup();
    }
}
/**
 * Queues a request to drop {@code ops} from the interest set of the channel
 * behind {@code key} on the poller's selector, then signals the poller.
 * The request is silently dropped when the attachment, its socket, or the
 * underlying channel is {@code null}.
 *
 * @param key the attachment identifying the channel
 * @param ops the interest ops to remove
 */
public void remove(final KeyAttachment key, final int ops) {
    final NioChannel socket = (key == null) ? null : key.getSocket();
    final SocketChannel ch = (socket == null) ? null : socket.getIOChannel();
    if (ch != null) {
        events.offer(new RunnableRemove(ch, key, ops));
        wakeup();
    }
}
/**
 * Runs the tasks that were already queued when this method was entered.
 *
 * @return {@code true} if the queue was non-empty on entry
 */
public boolean events() {
    /* Only the tasks present at entry are executed; anything queued while
     * we are running is left for the next call.
     *
     * Running a task can cause a worker thread to enqueue further tasks
     * (for example, a worker woken by a RunnableAdd that hit an invalid
     * SelectionKey may immediately queue another RunnableAdd). Draining
     * until the queue is empty could therefore keep this loop busy for a
     * long time and stall the poller loop.
     */
    final int pending = events.size();
    int processed = 0;
    while (processed < pending) {
        final Runnable task = events.poll();
        if (task == null) {
            break;
        }
        task.run();
        processed++;
    }
    return pending > 0;
}
/**
* Main loop of the poller thread. While the run flag is set, each pass runs the queued
* tasks, performs a select on the selector, and counts down the read/write latches of every
* key that was selected. When the loop ends, the task queue is cleared and the selector is
* closed.
*/
@Override
public void run() {
while (run) {
try {
// Apply queued add/remove/cancel requests before selecting.
events();
int keyCount = 0;
try {
int i = wakeupCounter.get();
if (i>0)
// Wakeups were requested since the counter was last reset: do not block.
keyCount = selector.selectNow();
else {
// -1 marks "about to block in select": the next wakeup() increments the
// counter to 0 and therefore calls selector.wakeup().
wakeupCounter.set(-1);
keyCount = selector.select(1000);
}
wakeupCounter.set(0);
if (!run) break;
}catch ( NullPointerException x ) {
//sun bug 5076772 on windows JDK 1.5
if (selector==null) throw x;
if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
continue;
} catch ( CancelledKeyException x ) {
//sun bug 5076772 on windows JDK 1.5
if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
continue;
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("",x);
continue;
}
Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (run && iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
// NOTE(review): cancel() and RunnableRemove detach the attachment with attach(null);
// if such a key were ever selected, the dereference below would throw a
// NullPointerException that the outer catch logs — confirm this cannot happen.
KeyAttachment attachment = (KeyAttachment)sk.attachment();
try {
attachment.access();
iterator.remove();
// Stop watching the ops that just became ready; read()/write() re-add them when needed.
sk.interestOps(sk.interestOps() & (~sk.readyOps()));
if ( sk.isReadable() ) {
countDown(attachment.getReadLatch());
}
if (sk.isWritable()) {
countDown(attachment.getWriteLatch());
}
}catch (CancelledKeyException ckx) {
// The key is no longer usable: cancel it and release both latches so that
// any thread waiting on them is woken.
sk.cancel();
countDown(attachment.getReadLatch());
countDown(attachment.getWriteLatch());
}
}//while
}catch ( Throwable t ) {
log.error("",t);
}
}
// The loop has ended: discard tasks that were never run.
events.clear();
// If using a shared selector, the NioSelectorPool will also try and
// close the selector. Try and avoid the ClosedSelectorException
// although because multiple threads are involved there is always
// the possibility of an Exception here.
if (selector.isOpen()) {
try {
// Cancels all remaining keys
selector.selectNow();
}catch( Exception ignore ) {
if (log.isDebugEnabled())log.debug("",ignore);
}
}
try {
selector.close();
}catch( Exception ignore ) {
if (log.isDebugEnabled())log.debug("",ignore);
}
}
/**
 * Null-safe latch release: counts the latch down once if it exists.
 *
 * @param latch the latch to count down, may be {@code null}
 */
public void countDown(CountDownLatch latch) {
    if (latch != null) {
        latch.countDown();
    }
}
/**
 * Task executed on the poller thread that adds interest ops for a channel
 * on the poller's selector, registering the channel first if it has no key
 * there yet.
 */
private class RunnableAdd implements Runnable {
    private final SocketChannel ch;
    private final KeyAttachment key;
    private final int ops;
    private final KeyReference ref;

    /**
     * @param ch  the channel to register or update
     * @param key the attachment carrying the latches the caller waits on
     * @param ops the interest ops to add
     * @param ref holder that receives the key when a new registration is made
     */
    public RunnableAdd(SocketChannel ch, KeyAttachment key, int ops, KeyReference ref) {
        this.ch = ch;
        this.key = key;
        this.ops = ops;
        this.ref = ref;
    }

    @Override
    public void run() {
        SelectionKey sk = ch.keyFor(selector);
        try {
            if (sk == null) {
                // First use of this channel on the blocking selector.
                sk = ch.register(selector, ops, key);
                ref.key = sk;
            } else if (!sk.isValid()) {
                cancel(sk, key, ops);
            } else {
                sk.interestOps(sk.interestOps() | ops);
            }
        } catch (CancelledKeyException cx) {
            cancel(sk, key, ops);
        } catch (ClosedChannelException cx) {
            // Only register() throws this, so there is no key to cancel. The
            // previous cancel(null, key, ops) call was a no-op (cancel ignores
            // a null key), which left the caller waiting on its latch for the
            // whole timeout. Release the latches so it wakes up and runs into
            // the closed channel itself.
            releaseLatches();
        }
    }

    /** Counts down the latches that correspond to the requested ops. */
    private void releaseLatches() {
        if (SelectionKey.OP_WRITE == (ops & SelectionKey.OP_WRITE)) {
            countDown(key.getWriteLatch());
        }
        if (SelectionKey.OP_READ == (ops & SelectionKey.OP_READ)) {
            countDown(key.getReadLatch());
        }
    }
}
/**
 * Task executed on the poller thread that withdraws interest ops for a
 * channel from the poller's selector and releases the matching latches.
 */
private class RunnableRemove implements Runnable {
    private final SocketChannel channel;
    private final KeyAttachment attachment;
    private final int ops;

    /**
     * @param ch  the channel whose interest set is reduced
     * @param key the attachment carrying the latches to release
     * @param ops the interest ops to remove
     */
    public RunnableRemove(SocketChannel ch, KeyAttachment key, int ops) {
        this.channel = ch;
        this.attachment = key;
        this.ops = ops;
    }

    @Override
    public void run() {
        final SelectionKey sk = channel.keyFor(selector);
        try {
            if (sk == null) {
                // Not registered on this selector: only the latches need releasing.
                releaseLatches();
                return;
            }
            if (!sk.isValid()) {
                // An invalid key is discarded without touching the latches.
                discard(sk);
                return;
            }
            sk.interestOps(sk.interestOps() & (~ops));
            releaseLatches();
            if (sk.interestOps() == 0) {
                // Nothing left to watch for on this key.
                discard(sk);
            }
        } catch (CancelledKeyException cx) {
            if (sk != null) {
                discard(sk);
            }
        }
    }

    /** Counts down the latches that correspond to the removed ops. */
    private void releaseLatches() {
        if ((ops & SelectionKey.OP_WRITE) != 0) {
            countDown(attachment.getWriteLatch());
        }
        if ((ops & SelectionKey.OP_READ) != 0) {
            countDown(attachment.getReadLatch());
        }
    }

    /** Cancels the key and drops its attachment. */
    private void discard(SelectionKey sk) {
        sk.cancel();
        sk.attach(null);
    }
}
/**
 * Task that cancels a single selection key when run.
 */
public static class RunnableCancel implements Runnable {
    private final SelectionKey target;

    /**
     * @param key the selection key to cancel when this task runs
     */
    public RunnableCancel(SelectionKey key) {
        this.target = key;
    }

    @Override
    public void run() {
        target.cancel();
    }
}
}
/**
 * Mutable holder for the selection key that the poller registers on behalf
 * of a blocking read or write. If a holder is garbage collected while it
 * still refers to a valid key, the finalizer logs a warning and cancels it.
 */
public static class KeyReference {
    SelectionKey key = null;

    @Override
    public void finalize() {
        if (key == null || !key.isValid()) {
            return;
        }
        log.warn("Possible key leak, cancelling key in the finalizer.");
        try {
            key.cancel();
        } catch (Exception ignore) {
            // Best effort only: nothing useful can be done at finalization time.
        }
    }
}
}