blob: c8e9628e3ea07fd60500d4fb0c63b85309986d84 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tomcat.util.net;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSessionContext;
import javax.net.ssl.X509KeyManager;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.collections.SynchronizedStack;
import org.apache.tomcat.util.compat.JreCompat;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.net.SecureNio2Channel.ApplicationBufferHandler;
import org.apache.tomcat.util.net.jsse.NioX509KeyManager;
/**
* NIO2 endpoint.
*/
public class Nio2Endpoint extends AbstractEndpoint<Nio2Channel> {
// -------------------------------------------------------------- Constants
/** Logger used by this endpoint and its inner classes. */
private static final Log log = LogFactory.getLog(Nio2Endpoint.class);
// ----------------------------------------------------------------- Fields
/**
 * Server socket "pointer". Opened in {@link #bind()} and closed, then set
 * back to null, in {@link #unbind()}.
 */
private AsynchronousServerSocketChannel serverSock = null;
/**
 * Whether sendfile is enabled for this endpoint.
 */
private boolean useSendfile = true;
/**
 * The size of the OOM parachute, i.e. the length of the byte array that
 * {@link #reclaimParachute(boolean)} allocates.
 */
private int oomParachute = 1024*1024;
/**
 * Per-thread flag that allows detecting if a completion handler completes
 * inline. Set by {@link #startInline()} and cleared by
 * {@link #endInline()}.
 */
private static ThreadLocal<Boolean> inlineCompletion = new ThreadLocal<>();
/**
 * Thread group associated with the server socket.
 */
private AsynchronousChannelGroup threadGroup = null;
/**
 * Reset to false when the endpoint starts and set to true once the close
 * task submitted by {@link #stopInternal()} has finished.
 * {@link #shutdownExecutor()} polls it before shutting the group down.
 */
private volatile boolean allClosed;
/**
 * The oom parachute, when an OOM error happens,
 * will release the data, giving the JVM instantly
 * a chunk of data to be able to recover with.
 */
private byte[] oomParachuteData = null;
/**
 * Warning text used when the parachute is gone. Declared as a constant to
 * make sure this string has already been allocated when it is needed.
 */
private static final String oomParachuteMsg =
"SEVERE:Memory usage is low, parachute is non existent, your system may start failing.";
/**
 * Time (in milliseconds) of the last parachute warning. Used by
 * {@link #checkParachute()} to log the warning at most once every ten
 * seconds.
 */
private long lastParachuteCheck = System.currentTimeMillis();
/**
 * Cache for SocketProcessor objects. Only created when useCaches is
 * enabled at the time the endpoint is started.
 */
private SynchronizedStack<SocketProcessor> processorCache;
/**
 * Bytebuffer cache, each channel holds a set of buffers (two, except for
 * SSL which holds four). Only created when useCaches is enabled at the
 * time the endpoint is started.
 */
private SynchronizedStack<Nio2Channel> nioChannels;
// ------------------------------------------------------------ Constructor
/**
 * Creates the endpoint. When Java 8 is not available (i.e. running on
 * Java 7) the insecure DHE ciphers are excluded from the default cipher
 * list.
 */
public Nio2Endpoint() {
    boolean java8Available = JreCompat.isJre8Available();
    if (java8Available) {
        // The default cipher list can be used unchanged
        return;
    }
    setCiphers(DEFAULT_CIPHERS + ":!DHE");
}
// ------------------------------------------------------------- Properties
/**
 * When true, SocketProcessor and Nio2Channel instances are kept in object
 * caches to reduce GC at the expense of additional memory use.
 */
private boolean useCaches = false;

public void setUseCaches(boolean useCaches) {
    this.useCaches = useCaches;
}

public boolean getUseCaches() {
    return this.useCaches;
}

/**
 * Priority of the poller threads.
 */
private int pollerThreadPriority = Thread.NORM_PRIORITY;

public void setPollerThreadPriority(int pollerThreadPriority) {
    this.pollerThreadPriority = pollerThreadPriority;
}

public int getPollerThreadPriority() {
    return this.pollerThreadPriority;
}

/**
 * The handler that processes accepted sockets.
 */
private Handler handler = null;

public void setHandler(Handler handler) {
    this.handler = handler;
}

public Handler getHandler() {
    return this.handler;
}

/**
 * Whether comet request handling is allowed.
 */
private boolean useComet = true;

public void setUseComet(boolean useComet) {
    this.useComet = useComet;
}

@Override
public boolean getUseComet() {
    return this.useComet;
}

/**
 * Comet timeouts are in use exactly when comet itself is in use.
 */
@Override
public boolean getUseCometTimeout() {
    return getUseComet();
}

/**
 * Polling is always supported by this endpoint.
 */
@Override
public boolean getUsePolling() {
    return true;
}

public void setSocketProperties(SocketProperties socketProperties) {
    this.socketProperties = socketProperties;
}

public void setUseSendfile(boolean useSendfile) {
    this.useSendfile = useSendfile;
}

/**
 * deferAccept is not supported by this endpoint.
 *
 * @return always false
 */
@Override
public boolean getDeferAccept() {
    return false;
}

public void setOomParachute(int oomParachute) {
    this.oomParachute = oomParachute;
}

public void setOomParachuteData(byte[] oomParachuteData) {
    this.oomParachuteData = oomParachuteData;
}

/**
 * SSL context created in bind() when SSL is enabled, null otherwise.
 */
private SSLContext sslContext = null;

public SSLContext getSSLContext() {
    return this.sslContext;
}

public void setSSLContext(SSLContext c) {
    this.sslContext = c;
}

/**
 * Cipher suites and protocols applied to every SSLEngine created by this
 * endpoint. Both are determined in bind() when SSL is enabled.
 */
private String[] enabledCiphers;
private String[] enabledProtocols;
/**
 * Returns the port the server socket is bound to.
 *
 * @return the local port, or -1 if there is no server socket, if its
 *         local address is not an {@link InetSocketAddress} or if the
 *         address cannot be obtained
 */
@Override
public int getLocalPort() {
    // Work on a local copy as the field is reset by unbind()
    AsynchronousServerSocketChannel channel = serverSock;
    if (channel == null) {
        return -1;
    }
    SocketAddress localAddress;
    try {
        localAddress = channel.getLocalAddress();
    } catch (IOException e) {
        return -1;
    }
    // instanceof is false for null, so no separate null check is needed
    if (localAddress instanceof InetSocketAddress) {
        return ((InetSocketAddress) localAddress).getPort();
    }
    return -1;
}
/**
 * Returns the cipher suites that are applied to the SSL engines created
 * by this endpoint.
 *
 * @return the enabled cipher suites; null until bind() has initialized
 *         SSL
 */
@Override
public String[] getCiphersUsed() {
return enabledCiphers;
}
// --------------------------------------------------------- OOM Parachute Methods
/**
 * Tries to re-create the OOM parachute (without forcing it) and, if that
 * is not possible, logs a warning. The warning is emitted at most once
 * every ten seconds.
 */
protected void checkParachute() {
    if (reclaimParachute(false)) {
        // Parachute is in place, nothing to report
        return;
    }
    long sinceLastWarning = System.currentTimeMillis() - lastParachuteCheck;
    if (sinceLastWarning <= 10000) {
        return;
    }
    try {
        log.fatal(oomParachuteMsg);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // Logging failed, fall back to stderr
        System.err.println(oomParachuteMsg);
    }
    lastParachuteCheck = System.currentTimeMillis();
}
/**
 * Ensures the OOM parachute is allocated if possible.
 * <p>
 * Nothing is allocated when the parachute size is not positive. Otherwise
 * the parachute is allocated when <code>force</code> is true or when the
 * JVM reports more than twice the parachute size as free memory.
 *
 * @param force allocate the parachute regardless of the amount of free
 *              memory
 * @return true if the parachute is in place when this method returns
 */
protected boolean reclaimParachute(boolean force) {
    if (oomParachuteData != null) {
        return true;
    }
    if (oomParachute > 0) {
        // Use long arithmetic: oomParachute * 2 overflows int for sizes
        // above 1 GiB, which would make the free memory check always pass
        long requiredFree = oomParachute * 2L;
        if (force || Runtime.getRuntime().freeMemory() > requiredFree) {
            oomParachuteData = new byte[oomParachute];
        }
    }
    return oomParachuteData != null;
}
/**
 * Clears the channel and processor caches (when caching is enabled) and
 * asks the handler, if any, to recycle its own resources.
 * <p>
 * The caches are only created when the endpoint is started, so they may
 * still be null here, e.g. when the endpoint is unbound without ever
 * having been started. Each cache is therefore checked before use.
 */
protected void releaseCaches() {
    if (useCaches) {
        SynchronizedStack<Nio2Channel> channels = this.nioChannels;
        if (channels != null) {
            channels.clear();
        }
        SynchronizedStack<SocketProcessor> processors = this.processorCache;
        if (processors != null) {
            processors.clear();
        }
    }
    if (handler != null) {
        handler.recycle();
    }
}
// --------------------------------------------------------- Public Methods
/**
 * Number of keepalive sockets.
 *
 * @return always -1, as this connector does not track keepalive sockets
 *         separately
 */
public int getKeepAliveCount() {
// For this connector, only the overall connection count is relevant
return -1;
}
// ----------------------------------------------- Public Lifecycle Methods
/**
 * Initialize the endpoint: create the executor if required, open and bind
 * the server socket, set up SSL when it is enabled and allocate the OOM
 * parachute.
 *
 * @throws Exception if the server socket or the SSL context cannot be set
 *                   up
 */
@Override
public void bind() throws Exception {
    // Create worker collection
    if (getExecutor() == null) {
        createExecutor();
    }
    Executor workers = getExecutor();
    if (workers instanceof ExecutorService) {
        threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService) workers);
    }
    // AsynchronousChannelGroup currently needs exclusive access to its executor service
    if (!internalExecutor) {
        log.warn(sm.getString("endpoint.nio2.exclusiveExecutor"));
    }

    serverSock = AsynchronousServerSocketChannel.open(threadGroup);
    socketProperties.setProperties(serverSock);
    InetSocketAddress bindAddress;
    if (getAddress() == null) {
        // No specific address configured, bind to the wildcard address
        bindAddress = new InetSocketAddress(getPort());
    } else {
        bindAddress = new InetSocketAddress(getAddress(), getPort());
    }
    serverSock.bind(bindAddress, getBacklog());

    // NIO2 does not allow any form of IO concurrency, so exactly one
    // acceptor thread is used
    if (acceptorThreadCount != 1) {
        acceptorThreadCount = 1;
    }

    // Initialize SSL if needed
    if (isSSLEnabled()) {
        SSLUtil sslUtil = handler.getSslImplementation().getSSLUtil(this);
        sslContext = sslUtil.createSSLContext();
        sslContext.init(wrap(sslUtil.getKeyManagers()), sslUtil.getTrustManagers(), null);
        SSLSessionContext serverSessions = sslContext.getServerSessionContext();
        if (serverSessions != null) {
            sslUtil.configureSessionContext(serverSessions);
        }
        // Determine which cipher suites and protocols to enable
        enabledCiphers = sslUtil.getEnableableCiphers(sslContext);
        enabledProtocols = sslUtil.getEnableableProtocols(sslContext);
    }

    if (oomParachute > 0) {
        reclaimParachute(true);
    }
}
/**
 * Wraps the X509 key managers so that the configured key alias is used.
 * Key managers of other types, and all key managers when no alias is
 * configured, are passed through unchanged.
 *
 * @param managers the key managers to wrap, may be null
 * @return a new array of the same length, or null if
 *         <code>managers</code> is null
 */
public KeyManager[] wrap(KeyManager[] managers) {
    if (managers == null) {
        return null;
    }
    KeyManager[] wrapped = new KeyManager[managers.length];
    int index = 0;
    for (KeyManager manager : managers) {
        if (manager instanceof X509KeyManager && getKeyAlias() != null) {
            String alias = getKeyAlias();
            // JKS keystores always convert the alias name to lower case
            if ("jks".equalsIgnoreCase(getKeystoreType())) {
                alias = alias.toLowerCase(Locale.ENGLISH);
            }
            wrapped[index] = new NioX509KeyManager((X509KeyManager) manager, alias);
        } else {
            wrapped[index] = manager;
        }
        index++;
    }
    return wrapped;
}
/**
 * Start the NIO2 endpoint: create the object caches (if enabled), the
 * executor (if none exists), the connection latch, the acceptor threads
 * and the async timeout thread. Does nothing if the endpoint is already
 * running.
 */
@Override
public void startInternal() throws Exception {
    if (running) {
        return;
    }
    allClosed = false;
    running = true;
    paused = false;

    if (useCaches) {
        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());
    }

    // Create worker collection
    if (getExecutor() == null) {
        createExecutor();
    }

    initializeConnectionLatch();
    startAcceptorThreads();

    // Async timeouts are checked by a dedicated daemon thread
    setAsyncTimeout(new AsyncTimeout());
    String timeoutThreadName = getName() + "-AsyncTimeout";
    Thread asyncTimeoutThread = new Thread(getAsyncTimeout(), timeoutThreadName);
    asyncTimeoutThread.setPriority(threadPriority);
    asyncTimeoutThread.setDaemon(true);
    asyncTimeoutThread.start();
}
/**
 * Stop the endpoint. This will cause all processing threads to stop.
 * <p>
 * The connection latch is released and the endpoint is paused if it is
 * not already. If the endpoint is running, the async timeout task is
 * stopped, the acceptor is unlocked and a task is submitted to the
 * executor that times out the waiting requests and closes the remaining
 * connections.
 */
@Override
public void stopInternal() {
releaseConnectionLatch();
if (!paused) {
pause();
}
if (running) {
running = false;
getAsyncTimeout().stop();
unlockAccept();
// Use the executor to avoid binding the main thread if something bad
// occurs and unbind will also wait for a bit for it to complete
getExecutor().execute(new Runnable() {
@Override
public void run() {
// Timeout any pending async request
for (SocketWrapper<Nio2Channel> socket : waitingRequests) {
processSocket(socket, SocketStatus.TIMEOUT, false);
}
// Then close all active connections if any remains
try {
handler.closeAll();
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
} finally {
// Lets shutdownExecutor() stop waiting, even if closeAll() failed
allClosed = true;
}
}
});
// The caches are cleared without waiting for the task submitted above
if (useCaches) {
nioChannels.clear();
processorCache.clear();
}
}
}
/**
 * Deallocate NIO memory pools, and close server socket.
 * <p>
 * The endpoint is stopped first if it is still running. The server
 * socket may be null here, e.g. if bind() failed before the socket was
 * opened or if the endpoint has already been unbound, so it is only
 * closed when present.
 *
 * @throws Exception if stopping the endpoint or closing the server socket
 *                   fails
 */
@Override
public void unbind() throws Exception {
    if (running) {
        stop();
    }
    // Close server socket
    if (serverSock != null) {
        serverSock.close();
        serverSock = null;
    }
    sslContext = null;
    // Unlike other connectors, the thread pool is tied to the server socket
    shutdownExecutor();
    releaseCaches();
}
/**
 * Shuts down the channel group (when the executor is internal) and then
 * delegates to the parent implementation.
 * <p>
 * Before the group is shut down, this method waits in 100ms steps, up to
 * the executor termination timeout, for the close task started by
 * stopInternal() to complete. Whatever is left of the timeout is then
 * used to await the termination of the group.
 */
@Override
public void shutdownExecutor() {
    if (threadGroup != null && internalExecutor) {
        try {
            long remainingMillis = getExecutorTerminationTimeoutMillis();
            while (remainingMillis > 0 && !allClosed) {
                Thread.sleep(100);
                remainingMillis -= 100;
            }
            threadGroup.shutdownNow();
            if (remainingMillis > 0) {
                threadGroup.awaitTermination(remainingMillis, TimeUnit.MILLISECONDS);
            }
        } catch (IOException e) {
            getLog().warn(sm.getString("endpoint.warn.executorShutdown", getName()), e);
        } catch (InterruptedException e) {
            // Ignore
        }
        boolean terminated = threadGroup.isTerminated();
        if (!terminated) {
            getLog().warn(sm.getString("endpoint.warn.executorShutdown", getName()));
        }
        threadGroup = null;
    }
    // Mostly to cleanup references
    super.shutdownExecutor();
}
// ------------------------------------------------------ Protected Methods
/**
 * @return the transmit buffer size taken from the socket properties
 */
public int getWriteBufSize() {
return socketProperties.getTxBufSize();
}
/**
 * @return the receive buffer size taken from the socket properties
 */
public int getReadBufSize() {
return socketProperties.getRxBufSize();
}
/**
 * @return true if sendfile is enabled for this endpoint
 */
@Override
public boolean getUseSendfile() {
return useSendfile;
}
/**
 * @return the configured size of the OOM parachute
 */
public int getOomParachute() {
return oomParachute;
}
/**
 * @return the parachute array itself (not a copy), or null if it is
 *         currently not allocated
 */
public byte[] getOomParachuteData() {
return oomParachuteData;
}
/**
 * @return a new instance of this endpoint's {@link Acceptor}
 */
@Override
protected AbstractEndpoint.Acceptor createAcceptor() {
return new Acceptor();
}
/**
 * Process the specified connection.
 * <p>
 * Applies the socket properties, obtains a channel (from the cache when
 * caching is enabled, otherwise a new one, with an SSL engine when SSL is
 * configured), associates it with a new socket wrapper and starts
 * processing: with SSL the socket is dispatched immediately so that the
 * handshake can take place, otherwise a read is started and processing
 * begins once bytes are available.
 *
 * @param socket the newly accepted socket
 * @return true if the socket was set up successfully; false if an error
 *         occurred, in which case the caller is expected to close the
 *         socket
 */
protected boolean setSocketOptions(AsynchronousSocketChannel socket) {
    // Process the connection
    try {
        socketProperties.setProperties(socket);
        Nio2Channel channel = (useCaches) ? nioChannels.pop() : null;
        if (channel == null) {
            // SSL setup
            if (sslContext != null) {
                SSLEngine engine = createSSLEngine();
                // The application buffers must be able to hold a full
                // SSL application record
                int appBufferSize = engine.getSession().getApplicationBufferSize();
                NioBufferHandler bufhandler = new NioBufferHandler(
                        Math.max(appBufferSize, socketProperties.getAppReadBufSize()),
                        Math.max(appBufferSize, socketProperties.getAppWriteBufSize()),
                        socketProperties.getDirectBuffer());
                channel = new SecureNio2Channel(engine, bufhandler, this);
            } else {
                NioBufferHandler bufhandler = new NioBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                channel = new Nio2Channel(bufhandler);
            }
        } else {
            // A cached channel keeps its buffers but needs a fresh engine
            if (sslContext != null) {
                SSLEngine engine = createSSLEngine();
                ((SecureNio2Channel) channel).setSSLEngine(engine);
            }
        }
        Nio2SocketWrapper socketWrapper = new Nio2SocketWrapper(channel);
        channel.reset(socket, socketWrapper);
        socketWrapper.setTimeout(getSocketProperties().getSoTimeout());
        socketWrapper.setKeepAliveLeft(Nio2Endpoint.this.getMaxKeepAliveRequests());
        socketWrapper.setSecure(isSSLEnabled());
        if (sslContext != null) {
            // Use the regular processing, as the first handshake needs to be done there
            processSocket(socketWrapper, SocketStatus.OPEN_READ, true);
        } else {
            // Wait until some bytes are available to start the real processing
            awaitBytes(socketWrapper);
        }
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error("", t);
        } catch (Throwable tt) {
            // Logging itself failed: it is the logging failure that has to
            // be inspected here, the original error was handled above
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}
/**
 * Creates a server mode SSL engine from the endpoint's SSL context and
 * configures client authentication, the enabled cipher suites and the
 * enabled protocols.
 *
 * @return the configured engine
 */
protected SSLEngine createSSLEngine() {
    SSLEngine sslEngine = sslContext.createSSLEngine();

    String clientAuth = getClientAuth();
    if ("false".equals(clientAuth)) {
        sslEngine.setNeedClientAuth(false);
        sslEngine.setWantClientAuth(false);
    } else if ("true".equals(clientAuth) || "yes".equals(clientAuth)) {
        sslEngine.setNeedClientAuth(true);
    } else if ("want".equals(clientAuth)) {
        sslEngine.setWantClientAuth(true);
    }
    // Any other value leaves the engine's client authentication settings
    // untouched

    sslEngine.setUseClientMode(false);
    sslEngine.setEnabledCipherSuites(enabledCiphers);
    sslEngine.setEnabledProtocols(enabledProtocols);
    configureUseServerCipherSuitesOrder(sslEngine);
    return sslEngine;
}
/**
 * Returns true if a worker thread is available for processing. This
 * implementation does not check anything and always reports a worker as
 * available.
 *
 * @return always true
 */
protected boolean isWorkerAvailable() {
return true;
}
/**
 * Processes the given socket with the given status. Delegates to
 * {@link #processSocket0(SocketWrapper, SocketStatus, boolean)} and
 * discards its result.
 *
 * @param socketWrapper the socket to process
 * @param socketStatus  the status to process the socket with
 * @param dispatch      true to hand the processing over to the executor,
 *                      false to process on the calling thread
 */
@Override
public void processSocket(SocketWrapper<Nio2Channel> socketWrapper,
SocketStatus socketStatus, boolean dispatch) {
processSocket0(socketWrapper, socketStatus, dispatch);
}
/**
 * Removes the socket from the waiting requests and processes it with a
 * SocketProcessor, taken from the cache when caching is enabled.
 *
 * @param socketWrapper the socket to process
 * @param status        the status to process the socket with
 * @param dispatch      true to run the processor on the executor (if
 *                      there is one), false to run it on the calling
 *                      thread
 * @return true if the processor was run or handed to the executor; false
 *         if the executor rejected it or another error occurred
 */
protected boolean processSocket0(SocketWrapper<Nio2Channel> socketWrapper, SocketStatus status, boolean dispatch) {
    try {
        waitingRequests.remove(socketWrapper);

        SocketProcessor processor = null;
        if (useCaches) {
            processor = processorCache.pop();
        }
        if (processor != null) {
            processor.reset(socketWrapper, status);
        } else {
            processor = new SocketProcessor(socketWrapper, status);
        }

        Executor executor = getExecutor();
        if (!dispatch || executor == null) {
            processor.run();
        } else {
            executor.execute(processor);
        }
        return true;
    } catch (RejectedExecutionException ree) {
        log.debug(sm.getString("endpoint.executor.fail", socketWrapper), ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        log.error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
}
/**
 * Closes the given socket and releases the resources associated with it.
 * <p>
 * The steps are: notify a comet socket of the status (a comet timeout
 * that is dispatched successfully ends the method without closing),
 * release the socket from the handler, close the channel (counting down
 * the connection count if it was still open) and finally close any open
 * sendfile file channel. Each step has its own error handling so that a
 * failure in one step does not prevent the following ones.
 *
 * @param socket the socket to close; nothing is done if null
 * @param status the status to report to a comet socket, may be null
 */
public void closeSocket(SocketWrapper<Nio2Channel> socket, SocketStatus status) {
if (socket == null) {
return;
}
try {
if (socket.isComet() && status != null) {
socket.setComet(false);//to avoid a loop
if (status == SocketStatus.TIMEOUT) {
if (processSocket0(socket, status, true)) {
return; // don't close on comet timeout
}
} else {
// Don't dispatch if the lines below are canceling the key
processSocket0(socket, status, false);
}
}
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
// Only reported (at error level) when debug logging is enabled
if (log.isDebugEnabled()) log.error("",e);
}
try {
handler.release(socket);
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
if (log.isDebugEnabled()) log.error("",e);
}
try {
if (socket.getSocket() != null) {
synchronized (socket.getSocket()) {
// Checked again while holding the lock on the channel; the connection
// is only counted down if the channel is still open
if (socket.getSocket() != null && socket.getSocket().isOpen()) {
countDownConnection();
socket.getSocket().close(true);
}
}
}
} catch (Exception e){
if (log.isDebugEnabled()) {
log.debug(sm.getString(
"endpoint.debug.socketCloseFail"), e);
}
}
try {
// Close the file channel of a sendfile operation that may still be open
Nio2SocketWrapper nio2Socket = (Nio2SocketWrapper) socket;
if (nio2Socket.getSendfileData() != null
&& nio2Socket.getSendfileData().fchannel != null
&& nio2Socket.getSendfileData().fchannel.isOpen()) {
nio2Socket.getSendfileData().fchannel.close();
}
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
if (log.isDebugEnabled()) log.error("",e);
}
}
/**
 * @return the logger of this endpoint
 */
@Override
protected Log getLog() {
return log;
}
// --------------------------------------------------- Acceptor Inner Class
/**
 * The acceptor thread. It loops for as long as the endpoint is running:
 * while the endpoint is paused it sleeps in 50ms steps, otherwise it
 * waits for a free connection slot, blocks until the next connection has
 * been accepted and hands the new socket to
 * {@link Nio2Endpoint#setSocketOptions(AsynchronousSocketChannel)}.
 * <p>
 * Async timeouts are not checked here.
 */
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
// Delay applied after a failed accept, managed by handleExceptionWithDelay()
int errorDelay = 0;
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!running) {
break;
}
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
countUpOrAwaitConnection();
AsynchronousSocketChannel socket = null;
try {
// Accept the next incoming connection from the server
// socket
socket = serverSock.accept().get();
} catch (Exception e) {
// No connection was accepted, give the slot back
countDownConnection();
if (running) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw e;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (running && !paused) {
// Hand this socket off to an appropriate processor
if (!setSocketOptions(socket)) {
countDownConnection();
closeSocket(socket);
}
} else {
countDownConnection();
// Close socket right away
closeSocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED;
}
}
/**
 * Closes a socket that has been accepted but is not associated with a
 * socket wrapper. A failure to close is only logged, at debug level.
 *
 * @param socket the socket to close
 */
private void closeSocket(AsynchronousSocketChannel socket) {
    try {
        socket.close();
    } catch (IOException ioe) {
        if (!log.isDebugEnabled()) {
            return;
        }
        log.debug("", ioe);
    }
}
/**
 * Socket wrapper for NIO2 channels. In addition to the state kept by the
 * parent class it holds the data of a sendfile operation and a one-shot
 * flag that records the transition to the upgraded state.
 */
public static class Nio2SocketWrapper extends SocketWrapper<Nio2Channel> {

    private SendfileData sendfileData = null;
    private boolean upgradeInit = false;

    public Nio2SocketWrapper(Nio2Channel channel) {
        super(channel);
    }

    /**
     * @return the timeout of the parent class if it is positive,
     *         otherwise <code>Long.MAX_VALUE</code>
     */
    @Override
    public long getTimeout() {
        long configured = super.getTimeout();
        if (configured > 0) {
            return configured;
        }
        return Long.MAX_VALUE;
    }

    /**
     * Records the first switch to the upgraded state before delegating to
     * the parent class.
     */
    @Override
    public void setUpgraded(boolean upgraded) {
        boolean firstUpgrade = upgraded && !isUpgraded();
        if (firstUpgrade) {
            upgradeInit = true;
        }
        super.setUpgraded(upgraded);
    }

    /**
     * Reads and clears the upgrade initialisation flag.
     *
     * @return true if the socket has been upgraded since the flag was
     *         last read
     */
    public boolean isUpgradeInit() {
        if (!upgradeInit) {
            return false;
        }
        upgradeInit = false;
        return true;
    }

    public void setSendfileData(SendfileData sf) {
        this.sendfileData = sf;
    }

    public SendfileData getSendfileData() {
        return this.sendfileData;
    }
}
// ------------------------------------------------ Application Buffer Handler
/**
 * Application buffer handler holding one read and one write buffer, both
 * allocated once when the handler is created.
 */
public static class NioBufferHandler implements ApplicationBufferHandler {

    private final ByteBuffer readbuf;
    private final ByteBuffer writebuf;

    /**
     * @param readsize  capacity of the read buffer
     * @param writesize capacity of the write buffer
     * @param direct    true to allocate direct buffers, false to allocate
     *                  heap buffers
     */
    public NioBufferHandler(int readsize, int writesize, boolean direct) {
        readbuf = direct ? ByteBuffer.allocateDirect(readsize) : ByteBuffer.allocate(readsize);
        writebuf = direct ? ByteBuffer.allocateDirect(writesize) : ByteBuffer.allocate(writesize);
    }

    @Override
    public ByteBuffer getReadBuffer() {
        return readbuf;
    }

    @Override
    public ByteBuffer getWriteBuffer() {
        return writebuf;
    }
}
// ------------------------------------------------ Handler Inner Interface
/**
 * Bare bones interface used for socket processing. Per thread data is to be
 * stored in the ThreadWithAttributes extra folders, or alternately in
 * thread local fields.
 */
public interface Handler extends AbstractEndpoint.Handler {
/**
 * Processes the given socket with the given status.
 *
 * @return the state of the socket after processing; the endpoint closes
 *         the socket when CLOSED is returned
 */
public SocketState process(SocketWrapper<Nio2Channel> socket,
SocketStatus status);
/**
 * Called by the endpoint while it closes the given socket.
 */
public void release(SocketWrapper<Nio2Channel> socket);
/**
 * Called by the endpoint when it stops, to close all active connections
 * that remain.
 */
public void closeAll();
/**
 * @return the SSL implementation from which the endpoint obtains its
 *         SSLUtil when SSL is enabled
 */
public SSLImplementation getSslImplementation();
}
/**
 * The completion handler used for the asynchronous reads started by
 * awaitBytes(). A successful read dispatches the socket for processing
 * with OPEN_READ; end of stream and failed reads dispatch it with
 * DISCONNECT.
 */
private CompletionHandler<Integer, SocketWrapper<Nio2Channel>> awaitBytes
        = new CompletionHandler<Integer, SocketWrapper<Nio2Channel>>() {

    @Override
    public synchronized void completed(Integer nBytes, SocketWrapper<Nio2Channel> attachment) {
        boolean endOfStream = nBytes.intValue() < 0;
        if (endOfStream) {
            // Treated the same way as a failed read
            failed(new ClosedChannelException(), attachment);
        } else {
            processSocket0(attachment, SocketStatus.OPEN_READ, true);
        }
    }

    @Override
    public void failed(Throwable exc, SocketWrapper<Nio2Channel> attachment) {
        processSocket0(attachment, SocketStatus.DISCONNECT, true);
    }
};
/**
 * Adds the socket to the waiting requests.
 *
 * @param socket the socket to add
 */
public void addTimeout(SocketWrapper<Nio2Channel> socket) {
waitingRequests.add(socket);
}
/**
 * Removes the socket from the waiting requests.
 *
 * @param socket the socket to remove
 * @return the result of the removal from the waiting requests
 */
public boolean removeTimeout(SocketWrapper<Nio2Channel> socket) {
return waitingRequests.remove(socket);
}
/**
 * Marks the current thread as being inside a section in which completion
 * handlers that complete inline must be detected.
 */
public static void startInline() {
    inlineCompletion.set(Boolean.TRUE);
}

/**
 * Ends the section started by {@link #startInline()} for the current
 * thread.
 */
public static void endInline() {
    inlineCompletion.set(Boolean.FALSE);
}

/**
 * @return true if the current thread is between a call to
 *         {@link #startInline()} and the matching {@link #endInline()};
 *         false otherwise, including when the flag was never set on this
 *         thread
 */
public static boolean isInline() {
    Boolean inline = inlineCompletion.get();
    return inline != null && inline.booleanValue();
}
/**
 * Starts an asynchronous read into the (cleared) read buffer of the
 * socket, using the socket timeout. The awaitBytes completion handler is
 * notified of the outcome.
 *
 * @param socket the socket to read from; nothing is done if it is null
 *               or has no channel
 */
public void awaitBytes(SocketWrapper<Nio2Channel> socket) {
    if (socket == null) {
        return;
    }
    Nio2Channel channel = socket.getSocket();
    if (channel == null) {
        return;
    }
    ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer();
    readBuffer.clear();
    channel.read(readBuffer, socket.getTimeout(), TimeUnit.MILLISECONDS, socket, awaitBytes);
}
/**
 * The completion handler used for the socket writes of a sendfile
 * operation. Each completion either writes what is left in the buffer,
 * refills the buffer from the file and writes again, or finishes the
 * operation. When the completion happens inline (see isInline()) the
 * outcome is recorded in the SendfileData for processSendfile() to pick
 * up, otherwise the socket is processed from here.
 */
private CompletionHandler<Integer, SendfileData> sendfile = new CompletionHandler<Integer, SendfileData>() {
@Override
public void completed(Integer nWrite, SendfileData attachment) {
if (nWrite.intValue() < 0) { // Reach the end of stream
failed(new EOFException(), attachment);
return;
}
attachment.pos += nWrite.intValue();
if (!attachment.buffer.hasRemaining()) {
// The buffer has been fully written; length holds what is still to be read from the file
if (attachment.length <= 0) {
// All data has now been written
attachment.socket.setSendfileData(null);
attachment.buffer.clear();
try {
attachment.fchannel.close();
} catch (IOException e) {
// Ignore
}
if (isInline()) {
// processSendfile() is still on the stack and will report DONE
attachment.doneInline = true;
} else {
// Continue according to the keep-alive state requested for the connection
switch (attachment.keepAliveState) {
case NONE: {
processSocket(attachment.socket, SocketStatus.DISCONNECT, false);
break;
}
case PIPELINED: {
processSocket(attachment.socket, SocketStatus.OPEN_READ, true);
break;
}
case OPEN: {
awaitBytes(attachment.socket);
break;
}
}
}
return;
} else {
// More data to send: refill the buffer from the file
attachment.buffer.clear();
int nRead = -1;
try {
nRead = attachment.fchannel.read(attachment.buffer);
} catch (IOException e) {
failed(e, attachment);
return;
}
if (nRead > 0) {
attachment.buffer.flip();
// Do not send more than the remaining length
if (attachment.length < attachment.buffer.remaining()) {
attachment.buffer.limit(attachment.buffer.limit() - attachment.buffer.remaining() + (int) attachment.length);
}
attachment.length -= nRead;
} else {
// Nothing could be read although data was still expected
failed(new EOFException(), attachment);
return;
}
}
}
// Write the remaining or newly read content of the buffer
attachment.socket.getSocket().write(attachment.buffer, attachment.socket.getTimeout(),
TimeUnit.MILLISECONDS, attachment, this);
}
@Override
public void failed(Throwable exc, SendfileData attachment) {
try {
attachment.fchannel.close();
} catch (IOException e) {
// Ignore
}
if (!isInline()) {
processSocket(attachment.socket, SocketStatus.ERROR, false);
} else {
// processSendfile() is still on the stack and will report ERROR
attachment.doneInline = true;
attachment.error = true;
}
}
};
/**
 * Starts sending the file described by the sendfile data of the socket.
 * The file is opened at the requested position if required, a first
 * chunk is read into the write buffer of the socket and an asynchronous
 * write is started. The sendfile completion handler takes over from
 * there.
 *
 * @param socket the socket to send the file to
 * @return DONE if the whole operation completed inline, PENDING if it
 *         continues asynchronously, ERROR if the file could not be opened
 *         or read, or if the operation failed inline
 */
public SendfileState processSendfile(Nio2SocketWrapper socket) {
    // Configure the send file data
    SendfileData sendfileData = socket.getSendfileData();
    boolean needsOpen = sendfileData.fchannel == null || !sendfileData.fchannel.isOpen();
    if (needsOpen) {
        java.nio.file.Path filePath = new File(sendfileData.fileName).toPath();
        try {
            sendfileData.fchannel = FileChannel
                    .open(filePath, StandardOpenOption.READ).position(sendfileData.pos);
        } catch (IOException e) {
            return SendfileState.ERROR;
        }
    }

    ByteBuffer writeBuffer = socket.getSocket().getBufHandler().getWriteBuffer();
    writeBuffer.clear();
    int bytesRead;
    try {
        bytesRead = sendfileData.fchannel.read(writeBuffer);
    } catch (IOException e1) {
        return SendfileState.ERROR;
    }
    if (bytesRead < 0) {
        return SendfileState.ERROR;
    }

    writeBuffer.flip();
    sendfileData.socket = socket;
    sendfileData.buffer = writeBuffer;
    sendfileData.length -= bytesRead;

    // Completions that happen on this thread during the write call are
    // recorded in the sendfile data rather than processed by the handler
    startInline();
    try {
        socket.getSocket().write(writeBuffer, socket.getTimeout(), TimeUnit.MILLISECONDS,
                sendfileData, sendfile);
    } finally {
        endInline();
    }

    if (!sendfileData.doneInline) {
        return SendfileState.PENDING;
    }
    return sendfileData.error ? SendfileState.ERROR : SendfileState.DONE;
}
// ---------------------------------------------- SocketProcessor Inner Class
/**
 * This class is the equivalent of the Worker, but will simply use in an
 * external Executor thread pool.
 * <p>
 * A processor completes the handshake of the socket if required, passes
 * the socket to the handler and closes it when the handler reports it as
 * closed. Instances are reusable: the socket and status are cleared after
 * each run and set again with {@link #reset(SocketWrapper, SocketStatus)}.
 */
protected class SocketProcessor implements Runnable {
private SocketWrapper<Nio2Channel> socket = null;
private SocketStatus status = null;
public SocketProcessor(SocketWrapper<Nio2Channel> socket, SocketStatus status) {
reset(socket,status);
}
/**
 * Prepares this processor for (another) run.
 */
public void reset(SocketWrapper<Nio2Channel> socket, SocketStatus status) {
this.socket = socket;
this.status = status;
}
/**
 * Runs the processing while holding a lock: the write thread lock of the
 * socket for OPEN_WRITE events on upgraded connections, the socket itself
 * in all other cases.
 */
@Override
public void run() {
// Upgraded connections need to allow multiple threads to access the
// connection at the same time to enable blocking IO to be used when
// NIO has been configured
if (socket.isUpgraded() &&
SocketStatus.OPEN_WRITE == status) {
synchronized (socket.getWriteThreadLock()) {
doRun();
}
} else {
synchronized (socket) {
doRun();
}
}
}
private void doRun() {
// Set when the handler reports UPGRADING; triggers a further OPEN_READ run
boolean launch = false;
try {
// 0: handshake complete, -1: handshake failed or there is no channel;
// for any other value nothing is done in this run
int handshake = -1;
try {
if (socket.getSocket() != null) {
// For STOP there is no point trying to handshake as the
// Poller has been stopped.
if (socket.getSocket().isHandshakeComplete() ||
status == SocketStatus.STOP) {
handshake = 0;
} else {
handshake = socket.getSocket().handshake();
// The handshake process reads/writes from/to the
// socket. status may therefore be OPEN_WRITE once
// the handshake completes. However, the handshake
// happens when the socket is opened so the status
// must always be OPEN_READ after it completes. It
// is OK to always set this as it is only used if
// the handshake completes.
status = SocketStatus.OPEN_READ;
}
}
} catch (IOException x) {
handshake = -1;
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.err.handshake"), x);
}
}
if (handshake == 0) {
SocketState state = SocketState.OPEN;
// Process the request from this socket
if (status == null) {
state = handler.process(socket, SocketStatus.OPEN_READ);
} else {
state = handler.process(socket, status);
}
if (state == SocketState.CLOSED) {
// Close socket and pool
socket.setComet(false);
closeSocket(socket, SocketStatus.ERROR);
// Return the channel to the cache for reuse
if (useCaches && running && !paused) {
nioChannels.push(socket.getSocket());
}
} else if (state == SocketState.UPGRADING) {
socket.setKeptAlive(true);
socket.access();
launch = true;
}
} else if (handshake == -1 ) {
closeSocket(socket, SocketStatus.DISCONNECT);
if (useCaches && running && !paused) {
nioChannels.push(socket.getSocket());
}
}
} catch (OutOfMemoryError oom) {
try {
// Release the parachute to free memory for the clean-up below
oomParachuteData = null;
log.error("", oom);
closeSocket(socket, SocketStatus.ERROR);
releaseCaches();
} catch (Throwable oomt) {
try {
System.err.println(oomParachuteMsg);
oomt.printStackTrace();
} catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
} catch (Throwable t) {
log.error(sm.getString("endpoint.processing.fail"), t);
if (socket != null) {
closeSocket(socket, SocketStatus.ERROR);
}
} finally {
if (launch) {
try {
getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
} catch (NullPointerException npe) {
// Only reported while running
if (running) {
log.error(sm.getString("endpoint.launch.fail"),
npe);
}
}
}
// Drop the references before this instance is possibly cached
socket = null;
status = null;
//return to cache
if (useCaches && running && !paused) {
processorCache.push(this);
}
}
}
}
// ----------------------------------------------- SendfileData Inner Class
/**
 * SendfileData class. Holds the description and the progress of a
 * sendfile operation.
 */
public static class SendfileData {
// File
// Name of the file to send; used to open fchannel if it is not open yet
public String fileName;
// Channel the file content is read from
public FileChannel fchannel;
// Position the channel is opened at; advanced by the number of bytes written to the socket
public long pos;
// Decremented by the number of bytes read from the file; sending is complete once it is not positive
public long length;
// KeepAlive flag
// Determines how the socket is processed once the file has been sent
public SendfileKeepAliveState keepAliveState = SendfileKeepAliveState.NONE;
// Internal use only
// Socket the file is sent to and the buffer used to send it; set by processSendfile()
private Nio2SocketWrapper socket;
private ByteBuffer buffer;
// Set when the operation ended (successfully or not) while completing inline
private boolean doneInline = false;
// Set in addition to doneInline when the operation failed while completing inline
private boolean error = false;
}
}