More robust solution to the problem of blocking writes not be closed when the web application stops. Futures used for blocking writes are registered with the session and the session completes them with an exception if they are outstanding when the session closes.
git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1524687 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/java/org/apache/tomcat/websocket/FutureToSendHandler.java b/java/org/apache/tomcat/websocket/FutureToSendHandler.java
index 678d68e..7871400 100644
--- a/java/org/apache/tomcat/websocket/FutureToSendHandler.java
+++ b/java/org/apache/tomcat/websocket/FutureToSendHandler.java
@@ -31,12 +31,19 @@
class FutureToSendHandler implements Future<Void>, SendHandler {
private final CountDownLatch latch = new CountDownLatch(1);
+ private final WsSession wsSession;
private volatile SendResult result = null;
+ public FutureToSendHandler(WsSession wsSession) {
+ this.wsSession = wsSession;
+ }
+
+
// --------------------------------------------------------- SendHandler
@Override
public void onResult(SendResult result) {
+
this.result = result;
latch.countDown();
}
@@ -64,7 +71,12 @@
@Override
public Void get() throws InterruptedException,
ExecutionException {
- latch.await();
+ try {
+ wsSession.registerFuture(this);
+ latch.await();
+ } finally {
+ wsSession.unregisterFuture(this);
+ }
if (result.getException() != null) {
throw new ExecutionException(result.getException());
}
@@ -75,7 +87,14 @@
public Void get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
- boolean retval = latch.await(timeout, unit);
+ boolean retval = false;
+ try {
+ wsSession.registerFuture(this);
+ retval = latch.await(timeout, unit);
+ } finally {
+ wsSession.unregisterFuture(this);
+
+ }
if (retval == false) {
throw new TimeoutException();
}
diff --git a/java/org/apache/tomcat/websocket/LocalStrings.properties b/java/org/apache/tomcat/websocket/LocalStrings.properties
index 8565ad3..300a767 100644
--- a/java/org/apache/tomcat/websocket/LocalStrings.properties
+++ b/java/org/apache/tomcat/websocket/LocalStrings.properties
@@ -71,12 +71,19 @@
wsSession.duplicateHandlerBinary=A binary message handler has already been configured
wsSession.duplicateHandlerPong=A pong message handler has already been configured
wsSession.duplicateHandlerText=A text message handler has already been configured
-wsSession.sendCloseFail=Failed to send close message to remote endpoint
wsSession.invalidHandlerTypePong=A pong message handler must implement MessageHandler.Basic
+wsSession.messageFailed=Unable to write the complete message as the WebSocket connection has been closed
+wsSession.sendCloseFail=Failed to send close message to remote endpoint
wsSession.removeHandlerFailed=Unable to remove the handler [{0}] as it was not registered with this session
wsSession.unknownHandler=Unable to add the message handler [{0}] as it was for the unrecognised type [{1}]
wsSession.unknownHandlerType=Unable to add the message handler [{0}] as it was wrapped as the unrecognised type [{1}]
+# Note the following message is used as a close reason in a WebSocket control
+# frame and therefore must be 123 bytes (not characters) or less in length.
+# Messages are encoded using UTF-8 where a single character may be encoded in
+# as many as 4 bytes.
+wsWebSocketContainer.shutdown=The web application is stopping
+
wsWebSocketContainer.asynchronousChannelGroupFail=Unable to create dedicated AsynchronousChannelGroup for WebSocket clients which is required to prevent memory leaks in complex class loader environments like J2EE containers
wsWebSocketContainer.asynchronousSocketChannelFail=Unable to open a connection to the server
wsWebSocketContainer.defaultConfiguratorFaill=Failed to create the default configurator
@@ -90,4 +97,5 @@
wsWebSocketContainer.missingAnnotation=Cannot use POJO class [{0}] as it is not annotated with @ClientEndpoint
wsWebSocketContainer.pathNoHost=No host was specified in URI
wsWebSocketContainer.pathWrongScheme=The scheme [{0}] is not supported
+wsWebSocketContainer.sessionCloseFail=Session with ID [{0}] did not close cleanly
wsWebSocketContainer.sslEngineFail=Unable to create SSLEngine to support SSL/TLS connections
diff --git a/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java b/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
index 7ea7e54..4823556 100644
--- a/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
+++ b/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
@@ -119,7 +119,7 @@
public Future<Void> sendBytesByFuture(ByteBuffer data) {
- FutureToSendHandler f2sh = new FutureToSendHandler();
+ FutureToSendHandler f2sh = new FutureToSendHandler(wsSession);
sendBytesByCompletion(data, f2sh);
return f2sh;
}
@@ -156,7 +156,7 @@
public Future<Void> sendStringByFuture(String text) {
- FutureToSendHandler f2sh = new FutureToSendHandler();
+ FutureToSendHandler f2sh = new FutureToSendHandler(wsSession);
sendStringByCompletion(text, f2sh);
return f2sh;
}
@@ -191,7 +191,7 @@
// trigger a session close and depending on timing the client
// session may close before we can read the timeout.
long timeout = getBlockingSendTimeout();
- FutureToSendHandler f2sh = new FutureToSendHandler();
+ FutureToSendHandler f2sh = new FutureToSendHandler(wsSession);
TextMessageSendHandler tmsh = new TextMessageSendHandler(f2sh, part,
last, encoder, encoderBuffer, this);
tmsh.write();
@@ -213,7 +213,7 @@
// trigger a session close and depending on timing the client
// session may close before we can read the timeout.
long timeout = getBlockingSendTimeout();
- FutureToSendHandler f2sh = new FutureToSendHandler();
+ FutureToSendHandler f2sh = new FutureToSendHandler(wsSession);
startMessage(opCode, payload, last, f2sh);
try {
if (timeout == -1) {
@@ -448,7 +448,7 @@
}
public Future<Void> sendObjectByFuture(Object obj) {
- FutureToSendHandler f2sh = new FutureToSendHandler();
+ FutureToSendHandler f2sh = new FutureToSendHandler(wsSession);
sendObjectByCompletion(obj, f2sh);
return f2sh;
}
diff --git a/java/org/apache/tomcat/websocket/WsSession.java b/java/org/apache/tomcat/websocket/WsSession.java
index a6f0655..825ea47 100644
--- a/java/org/apache/tomcat/websocket/WsSession.java
+++ b/java/org/apache/tomcat/websocket/WsSession.java
@@ -38,6 +38,7 @@
import javax.websocket.MessageHandler;
import javax.websocket.PongMessage;
import javax.websocket.RemoteEndpoint;
+import javax.websocket.SendResult;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
@@ -92,6 +93,7 @@
Constants.DEFAULT_BUFFER_SIZE;
private volatile long maxIdleTimeout = 0;
private volatile long lastActive = System.currentTimeMillis();
+ private Map<FutureToSendHandler,FutureToSendHandler> futures = new ConcurrentHashMap<>();
/**
* Creates a new WebSocket session for communication between the two
@@ -415,6 +417,12 @@
state = State.CLOSED;
}
+
+ IOException ioe = new IOException(sm.getString("wsSession.messageFailed"));
+ SendResult sr = new SendResult(ioe);
+ for (FutureToSendHandler f2sh : futures.keySet()) {
+ f2sh.onResult(sr);
+ }
}
@@ -510,6 +518,25 @@
}
}
+
+ /**
+ * Make the session aware of a {@link FutureToSendHandler} that will need to
+ * be forcibly closed if the session closes before the
+ * {@link FutureToSendHandler} completes.
+ */
+ protected void registerFuture(FutureToSendHandler f2sh) {
+ futures.put(f2sh, f2sh);
+ }
+
+
+ /**
+ * Remove a {@link FutureToSendHandler} from the set of tracked instances.
+ */
+ protected void unregisterFuture(FutureToSendHandler f2sh) {
+ futures.remove(f2sh);
+ }
+
+
@Override
public URI getRequestURI() {
checkState();
diff --git a/java/org/apache/tomcat/websocket/WsWebSocketContainer.java b/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
index 709022e..025be44 100644
--- a/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
+++ b/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
@@ -55,6 +55,8 @@
import javax.net.ssl.TrustManagerFactory;
import javax.websocket.ClientEndpoint;
import javax.websocket.ClientEndpointConfig;
+import javax.websocket.CloseReason;
+import javax.websocket.CloseReason.CloseCodes;
import javax.websocket.DeploymentException;
import javax.websocket.Endpoint;
import javax.websocket.Extension;
@@ -740,6 +742,27 @@
this.defaultAsyncTimeout = timeout;
}
+
+ /**
+ * Cleans up the resources still in use by WebSocket sessions created from
+ * this container. This includes closing sessions and cancelling
+ * {@link Future}s associated with blocking read/writes.
+ */
+ public void destroy() {
+ CloseReason cr = new CloseReason(
+ CloseCodes.GOING_AWAY, sm.getString("wsWebSocketContainer.shutdown"));
+
+ for (WsSession session : sessions.keySet()) {
+ try {
+ session.close(cr);
+ } catch (IOException ioe) {
+ log.debug(sm.getString(
+ "wsWebSocketContainer.sessionCloseFail", session.getId()), ioe);
+ }
+ }
+ }
+
+
// ----------------------------------------------- BackgroundProcess methods
@Override
diff --git a/java/org/apache/tomcat/websocket/server/WsContextListener.java b/java/org/apache/tomcat/websocket/server/WsContextListener.java
index 9a75f36..27ea702 100644
--- a/java/org/apache/tomcat/websocket/server/WsContextListener.java
+++ b/java/org/apache/tomcat/websocket/server/WsContextListener.java
@@ -21,10 +21,11 @@
import javax.servlet.ServletContextListener;
/**
- * In normal usage, this {@link ServletContextListener} is not required as the
- * {@link WsSci} performs all the necessary bootstrap. If the {@link WsSci} is
- * disabled, this listener must be added manually to every
- * {@link javax.servlet.ServletContext} that uses WebSocket to bootstrap the
+ * In normal usage, this {@link ServletContextListener} does not need to be
+ * explicitly configured as the {@link WsSci} performs all the necessary
+ * bootstrap and installs this listener in the {@link ServletContext}. If the
+ * {@link WsSci} is disabled, this listener must be added manually to every
+ * {@link ServletContext} that uses WebSocket to bootstrap the
* {@link WsServerContainer} correctly.
*/
public class WsContextListener implements ServletContextListener {
@@ -35,12 +36,16 @@
// Don't trigger WebSocket initialization if a WebSocket Server
// Container is already present
if (sc.getAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE) == null) {
- WsSci.init(sce.getServletContext());
+ WsSci.init(sce.getServletContext(), false);
}
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
- // NOOP
+ ServletContext sc = sce.getServletContext();
+ Object obj = sc.getAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE);
+ if (obj instanceof WsServerContainer) {
+ ((WsServerContainer) obj).destroy();
+ }
}
}
diff --git a/java/org/apache/tomcat/websocket/server/WsSci.java b/java/org/apache/tomcat/websocket/server/WsSci.java
index 04d197e..635414e 100644
--- a/java/org/apache/tomcat/websocket/server/WsSci.java
+++ b/java/org/apache/tomcat/websocket/server/WsSci.java
@@ -44,7 +44,7 @@
public void onStartup(Set<Class<?>> clazzes, ServletContext ctx)
throws ServletException {
- WsServerContainer sc = init(ctx);
+ WsServerContainer sc = init(ctx, true);
if (clazzes == null || clazzes.size() == 0) {
return;
@@ -125,7 +125,8 @@
}
- static WsServerContainer init(ServletContext servletContext) {
+ static WsServerContainer init(ServletContext servletContext,
+ boolean initBySciMechanism) {
WsServerContainer sc = new WsServerContainer(servletContext);
@@ -133,6 +134,11 @@
Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE, sc);
servletContext.addListener(new WsSessionListener(sc));
+ // Can't register the ContextListener again if the ContextListener is
+ // calling this method
+ if (initBySciMechanism) {
+ servletContext.addListener(new WsContextListener());
+ }
return sc;
}