| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.tomcat.util.net; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.net.SocketTimeoutException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.SelectionKey; |
| import java.nio.channels.Selector; |
| import java.util.NoSuchElementException; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.juli.logging.Log; |
| import org.apache.juli.logging.LogFactory; |
| |
| /** |
| * |
| * Thread safe non blocking selector pool |
| * @version 1.0 |
| * @since 6.0 |
| */ |
| |
| public class NioSelectorPool { |
| |
| public NioSelectorPool() { |
| } |
| |
| private static final Log log = LogFactory.getLog(NioSelectorPool.class); |
| |
| protected static final boolean SHARED = |
| Boolean.parseBoolean(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true")); |
| |
| protected NioBlockingSelector blockingSelector; |
| |
| protected volatile Selector SHARED_SELECTOR; |
| |
| protected int maxSelectors = 200; |
| protected long sharedSelectorTimeout = 30000; |
| protected int maxSpareSelectors = -1; |
| protected boolean enabled = true; |
| protected AtomicInteger active = new AtomicInteger(0); |
| protected AtomicInteger spare = new AtomicInteger(0); |
| protected ConcurrentLinkedQueue<Selector> selectors = |
| new ConcurrentLinkedQueue<>(); |
| |
| protected Selector getSharedSelector() throws IOException { |
| if (SHARED && SHARED_SELECTOR == null) { |
| synchronized ( NioSelectorPool.class ) { |
| if ( SHARED_SELECTOR == null ) { |
| synchronized (Selector.class) { |
| // Selector.open() isn't thread safe |
| // http://bugs.sun.com/view_bug.do?bug_id=6427854 |
| // Affects 1.6.0_29, fixed in 1.7.0_01 |
| SHARED_SELECTOR = Selector.open(); |
| } |
| log.info("Using a shared selector for servlet write/read"); |
| } |
| } |
| } |
| return SHARED_SELECTOR; |
| } |
| |
| public Selector get() throws IOException{ |
| if ( SHARED ) { |
| return getSharedSelector(); |
| } |
| if ( (!enabled) || active.incrementAndGet() >= maxSelectors ) { |
| if ( enabled ) active.decrementAndGet(); |
| return null; |
| } |
| Selector s = null; |
| try { |
| s = selectors.size()>0?selectors.poll():null; |
| if (s == null) { |
| synchronized (Selector.class) { |
| // Selector.open() isn't thread safe |
| // http://bugs.sun.com/view_bug.do?bug_id=6427854 |
| // Affects 1.6.0_29, fixed in 1.7.0_01 |
| s = Selector.open(); |
| } |
| } |
| else spare.decrementAndGet(); |
| |
| }catch (NoSuchElementException x ) { |
| try { |
| synchronized (Selector.class) { |
| // Selector.open() isn't thread safe |
| // http://bugs.sun.com/view_bug.do?bug_id=6427854 |
| // Affects 1.6.0_29, fixed in 1.7.0_01 |
| s = Selector.open(); |
| } |
| } catch (IOException iox) { |
| } |
| } finally { |
| if ( s == null ) active.decrementAndGet();//we were unable to find a selector |
| } |
| return s; |
| } |
| |
| |
| |
| public void put(Selector s) throws IOException { |
| if ( SHARED ) return; |
| if ( enabled ) active.decrementAndGet(); |
| if ( enabled && (maxSpareSelectors==-1 || spare.get() < Math.min(maxSpareSelectors,maxSelectors)) ) { |
| spare.incrementAndGet(); |
| selectors.offer(s); |
| } |
| else s.close(); |
| } |
| |
| public void close() throws IOException { |
| enabled = false; |
| Selector s; |
| while ( (s = selectors.poll()) != null ) s.close(); |
| spare.set(0); |
| active.set(0); |
| if (blockingSelector!=null) { |
| blockingSelector.close(); |
| } |
| if ( SHARED && getSharedSelector()!=null ) { |
| getSharedSelector().close(); |
| SHARED_SELECTOR = null; |
| } |
| } |
| |
| public void open() throws IOException { |
| enabled = true; |
| getSharedSelector(); |
| if (SHARED) { |
| blockingSelector = new NioBlockingSelector(); |
| blockingSelector.open(getSharedSelector()); |
| } |
| |
| } |
| |
| /** |
| * Performs a write using the bytebuffer for data to be written and a |
| * selector to block (if blocking is requested). If the |
| * <code>selector</code> parameter is null, and blocking is requested then |
| * it will perform a busy write that could take up a lot of CPU cycles. |
| * @param buf The buffer containing the data, we will write as long as <code>(buf.hasRemaining()==true)</code> |
| * @param socket The socket to write data to |
| * @param selector The selector to use for blocking, if null then a busy write will be initiated |
| * @param writeTimeout The timeout for this write operation in milliseconds, -1 means no timeout |
| * @param block <code>true</code> to perform a blocking write |
| * otherwise a non-blocking write will be performed |
| * @return int - returns the number of bytes written |
| * @throws EOFException if write returns -1 |
| * @throws SocketTimeoutException if the write times out |
| * @throws IOException if an IO Exception occurs in the underlying socket logic |
| */ |
| public int write(ByteBuffer buf, NioChannel socket, Selector selector, |
| long writeTimeout, boolean block) throws IOException { |
| if ( SHARED && block ) { |
| return blockingSelector.write(buf,socket,writeTimeout); |
| } |
| SelectionKey key = null; |
| int written = 0; |
| boolean timedout = false; |
| int keycount = 1; //assume we can write |
| long time = System.currentTimeMillis(); //start the timeout timer |
| try { |
| while ( (!timedout) && buf.hasRemaining() ) { |
| int cnt = 0; |
| if ( keycount > 0 ) { //only write if we were registered for a write |
| cnt = socket.write(buf); //write the data |
| if (cnt == -1) throw new EOFException(); |
| |
| written += cnt; |
| if (cnt > 0) { |
| time = System.currentTimeMillis(); //reset our timeout timer |
| continue; //we successfully wrote, try again without a selector |
| } |
| if (cnt==0 && (!block)) break; //don't block |
| } |
| if ( selector != null ) { |
| //register OP_WRITE to the selector |
| if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE); |
| else key.interestOps(SelectionKey.OP_WRITE); |
| if (writeTimeout==0) { |
| timedout = buf.hasRemaining(); |
| } else if (writeTimeout<0) { |
| keycount = selector.select(); |
| } else { |
| keycount = selector.select(writeTimeout); |
| } |
| } |
| if (writeTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=writeTimeout; |
| }//while |
| if ( timedout ) throw new SocketTimeoutException(); |
| } finally { |
| if (key != null) { |
| key.cancel(); |
| if (selector != null) selector.selectNow();//removes the key from this selector |
| } |
| } |
| return written; |
| } |
| |
| /** |
| * Performs a blocking read using the bytebuffer for data to be read and a selector to block. |
| * If the <code>selector</code> parameter is null, then it will perform a busy read that could |
| * take up a lot of CPU cycles. |
| * @param buf ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out |
| * @param socket SocketChannel - the socket to write data to |
| * @param selector Selector - the selector to use for blocking, if null then a busy read will be initiated |
| * @param readTimeout long - the timeout for this read operation in milliseconds, -1 means no timeout |
| * @return int - returns the number of bytes read |
| * @throws EOFException if read returns -1 |
| * @throws SocketTimeoutException if the read times out |
| * @throws IOException if an IO Exception occurs in the underlying socket logic |
| */ |
| public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout) throws IOException { |
| return read(buf,socket,selector,readTimeout,true); |
| } |
| /** |
| * Performs a read using the bytebuffer for data to be read and a selector to register for events should |
| * you have the block=true. |
| * If the <code>selector</code> parameter is null, then it will perform a busy read that could |
| * take up a lot of CPU cycles. |
| * @param buf ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out |
| * @param socket SocketChannel - the socket to write data to |
| * @param selector Selector - the selector to use for blocking, if null then a busy read will be initiated |
| * @param readTimeout long - the timeout for this read operation in milliseconds, -1 means no timeout |
| * @param block - true if you want to block until data becomes available or timeout time has been reached |
| * @return int - returns the number of bytes read |
| * @throws EOFException if read returns -1 |
| * @throws SocketTimeoutException if the read times out |
| * @throws IOException if an IO Exception occurs in the underlying socket logic |
| */ |
| public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException { |
| if ( SHARED && block ) { |
| return blockingSelector.read(buf,socket,readTimeout); |
| } |
| SelectionKey key = null; |
| int read = 0; |
| boolean timedout = false; |
| int keycount = 1; //assume we can write |
| long time = System.currentTimeMillis(); //start the timeout timer |
| try { |
| while ( (!timedout) ) { |
| int cnt = 0; |
| if ( keycount > 0 ) { //only read if we were registered for a read |
| cnt = socket.read(buf); |
| if (cnt == -1) throw new EOFException(); |
| read += cnt; |
| if (cnt > 0) continue; //read some more |
| if (cnt==0 && (read>0 || (!block) ) ) break; //we are done reading |
| } |
| if ( selector != null ) {//perform a blocking read |
| //register OP_WRITE to the selector |
| if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_READ); |
| else key.interestOps(SelectionKey.OP_READ); |
| if (readTimeout==0) { |
| timedout = (read==0); |
| } else if (readTimeout<0) { |
| keycount = selector.select(); |
| } else { |
| keycount = selector.select(readTimeout); |
| } |
| } |
| if (readTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=readTimeout; |
| }//while |
| if ( timedout ) throw new SocketTimeoutException(); |
| } finally { |
| if (key != null) { |
| key.cancel(); |
| if (selector != null) selector.selectNow();//removes the key from this selector |
| } |
| } |
| return read; |
| } |
| |
| public void setMaxSelectors(int maxSelectors) { |
| this.maxSelectors = maxSelectors; |
| } |
| |
| public void setMaxSpareSelectors(int maxSpareSelectors) { |
| this.maxSpareSelectors = maxSpareSelectors; |
| } |
| |
| public void setEnabled(boolean enabled) { |
| this.enabled = enabled; |
| } |
| |
| public void setSharedSelectorTimeout(long sharedSelectorTimeout) { |
| this.sharedSelectorTimeout = sharedSelectorTimeout; |
| } |
| |
| public int getMaxSelectors() { |
| return maxSelectors; |
| } |
| |
| public int getMaxSpareSelectors() { |
| return maxSpareSelectors; |
| } |
| |
| public boolean isEnabled() { |
| return enabled; |
| } |
| |
| public long getSharedSelectorTimeout() { |
| return sharedSelectorTimeout; |
| } |
| |
| public ConcurrentLinkedQueue<Selector> getSelectors() { |
| return selectors; |
| } |
| |
| public AtomicInteger getSpare() { |
| return spare; |
| } |
| } |