WICKET-6791 Offload WebSocket push when initiated in Wicket request cycle (#435)
Use different thread executors when pushing WebSocketPushMessage in http worker thread and non-worker thread
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
index b755c18..6b47820 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
@@ -39,6 +39,9 @@
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -63,6 +66,7 @@
* are never passed to the Servlet Filters.
*/
private static boolean USING_JAVAX_WEB_SOCKET = false;
+
static
{
try
@@ -95,6 +99,7 @@
{
synchronized (application)
{
+ settings = application.getMetaData(KEY);
if (settings == null)
{
settings = new WebSocketSettings();
@@ -114,7 +119,7 @@
/**
* The executor that handles the processing of Web Socket push message broadcasts.
*/
- private Executor webSocketPushMessageExecutor = new SameThreadExecutor();
+ private Executor webSocketPushMessageExecutor = new WebSocketPushMessageExecutor();
/**
* The executor that handles broadcast of the {@link org.apache.wicket.protocol.ws.api.event.WebSocketPayload}
@@ -237,11 +242,12 @@
*/
public Executor getSendPayloadExecutor()
{
- return sendPayloadExecutor;
+ return sendPayloadExecutor;
}
/**
* Sets the filter for checking the incoming connections
+ *
* @param connectionFilter
* the filter for checking the incoming connections
* @see WebSocketConnectionFilterCollection
@@ -255,7 +261,8 @@
* @return the filter for checking the incoming connections
* @see WebSocketConnectionFilterCollection
*/
- public IWebSocketConnectionFilter getConnectionFilter() {
+ public IWebSocketConnectionFilter getConnectionFilter()
+ {
return this.connectionFilter;
}
@@ -283,7 +290,8 @@
* The active web socket connection
* @return a new instance of WebSocketRequestHandler for processing a web socket request
*/
- public WebSocketRequestHandler newWebSocketRequestHandler(Page page, IWebSocketConnection connection) {
+ public WebSocketRequestHandler newWebSocketRequestHandler(Page page, IWebSocketConnection connection)
+ {
return new WebSocketRequestHandler(page, connection);
}
@@ -302,11 +310,13 @@
return new WebSocketRequest(new ServletRequestCopy(request), filterPath);
}
- public void setFilterPrefix(final CharSequence filterPrefix) {
+ public void setFilterPrefix(final CharSequence filterPrefix)
+ {
this.filterPrefix.set(filterPrefix);
}
- public CharSequence getFilterPrefix() {
+ public CharSequence getFilterPrefix()
+ {
if (filterPrefix.get() == null)
{
if (USING_JAVAX_WEB_SOCKET)
@@ -321,11 +331,13 @@
return filterPrefix.get();
}
- public void setContextPath(final CharSequence contextPath) {
+ public void setContextPath(final CharSequence contextPath)
+ {
this.contextPath.set(contextPath);
}
- public CharSequence getContextPath() {
+ public CharSequence getContextPath()
+ {
contextPath.compareAndSet(null, RequestCycle.get().getRequest().getContextPath());
return contextPath.get();
}
@@ -335,7 +347,8 @@
this.baseUrl.set(baseUrl);
}
- public CharSequence getBaseUrl() {
+ public CharSequence getBaseUrl()
+ {
if (baseUrl.get() == null)
{
Url _baseUrl = RequestCycle.get().getUrlRenderer().getBaseUrl();
@@ -393,4 +406,66 @@
command.run();
}
}
+
+ public static class WebSocketPushMessageExecutor implements Executor
+ {
+ /**
+ * An executor that should be used when the WebSocket message is pushed
+ * from non-http worker thread.
+ */
+ private final java.util.concurrent.Executor nonHttpRequestExecutor;
+
+ /**
+ * An executor that is used when the WebSocket push is initiated in
+ * http worker thread. In this case the WebSocket processing should be
+ * off-loaded to a different thread that should wait for the page instance
+ * lock.
+ */
+ private final java.util.concurrent.Executor httpRequestExecutor;
+
+ /**
+ * For non-http worker threads pushes the WebSocket runnable in the same request.
+ * For http worker threads uses an elastic thread pool of 1-8 threads.
+ *
+ * Use {@link WebSocketPushMessageExecutor#WebSocketPushMessageExecutor(java.util.concurrent.Executor, java.util.concurrent.Executor)}
+ * for custom behavior and/or settings
+ */
+ public WebSocketPushMessageExecutor()
+ {
+ this(Runnable::run, new ThreadPoolExecutor(1, 8,
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new ThreadFactory()));
+ }
+
+ public WebSocketPushMessageExecutor(java.util.concurrent.Executor nonHttpRequestExecutor, java.util.concurrent.Executor httpRequestExecutor)
+ {
+ this.nonHttpRequestExecutor = nonHttpRequestExecutor;
+ this.httpRequestExecutor = httpRequestExecutor;
+ }
+
+ @Override
+ public void run(final Runnable command)
+ {
+ if (RequestCycle.get() != null)
+ {
+ httpRequestExecutor.execute(command);
+ }
+ else
+ {
+ nonHttpRequestExecutor.execute(command);
+ }
+ }
+ }
+
+ public static class ThreadFactory implements java.util.concurrent.ThreadFactory
+ {
+ private final AtomicInteger counter = new AtomicInteger();
+
+ @Override
+ public Thread newThread(final Runnable r)
+ {
+ return new Thread(r, "Wicket-WebSocket-HttpRequest-Thread-" + counter.getAndIncrement());
+ }
+ }
}
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java
index f83894f..df67eac 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java
@@ -56,6 +56,9 @@
webApplication.getWicketFilter().setFilterPath("");
}
+ WebSocketSettings webSocketSettings = WebSocketSettings.Holder.get(webApplication);
+ webSocketSettings.setWebSocketPushMessageExecutor(new WebSocketSettings.SameThreadExecutor());
+
socketProcessor = new TestWebSocketProcessor(wicketTester, page)
{
@Override
@@ -93,6 +96,9 @@
webApplication.getWicketFilter().setFilterPath("");
}
+ WebSocketSettings webSocketSettings = WebSocketSettings.Holder.get(webApplication);
+ webSocketSettings.setWebSocketPushMessageExecutor(new WebSocketSettings.SameThreadExecutor());
+
socketProcessor = new TestWebSocketProcessor(wicketTester, resourceName)
{
@Override